This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push:
new 08b7d66 [FLINK-38721] Support vector search for es connector. (#137)
08b7d66 is described below
commit 08b7d661c631e8d8cc3b08785dcc37be2e7aca36
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue May 19 19:46:51 2026 +0800
[FLINK-38721] Support vector search for es connector. (#137)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.github/workflows/push_pr.yml | 2 +-
.../table/ElasticsearchDynamicSource.java | 44 +-
.../ElasticsearchDynamicTableFactoryBase.java | 2 +-
.../AbstractElasticsearchVectorSearchFunction.java | 151 +++++++
.../elasticsearch/table/search/SearchMetric.java | 39 ++
.../table/search/VectorSearchUtils.java | 78 ++++
.../connector/elasticsearch/ElasticsearchUtil.java | 16 +-
flink-connector-elasticsearch7/pom.xml | 8 +
...ctory.java => Elasticsearch7Configuration.java} | 28 +-
.../table/Elasticsearch7ConnectorOptions.java | 48 +++
.../table/Elasticsearch7DynamicSource.java | 101 +++++
.../table/Elasticsearch7DynamicTableFactory.java | 101 ++++-
.../ElasticsearchRowDataVectorSearchFunction.java | 124 ++++++
.../table/Elasticsearch7VectorSearchITCase.java | 347 +++++++++++++++
.../src/test/resources/testcontainers.properties | 17 +
flink-connector-elasticsearch8/pom.xml | 40 +-
.../elasticsearch/sink/NetworkConfig.java | 6 +
.../table/Elasticsearch8Configuration.java | 12 +
.../table/Elasticsearch8ConnectorOptions.java | 17 +
.../table/Elasticsearch8DynamicSource.java | 145 +++++++
...java => Elasticsearch8DynamicTableFactory.java} | 140 +++---
.../ElasticsearchRowDataVectorSearchFunction.java | 101 +++++
.../org.apache.flink.table.factories.Factory | 3 +-
.../table/Elasticsearch8DynamicSinkITCase.java | 3 +-
.../table/Elasticsearch8VectorSearchITCase.java | 473 +++++++++++++++++++++
.../src/test/resources/testcontainers.properties | 17 +
pom.xml | 1 +
27 files changed, 1955 insertions(+), 109 deletions(-)
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index a21e7eb..676d1c0 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,7 +28,7 @@ jobs:
compile_and_test:
strategy:
matrix:
- flink: [ 2.2.1, 2.1.2 ]
+ flink: [ 2.2.1 ]
jdk: [ '11', '17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
index 83a95f9..5250b4b 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.connector.elasticsearch.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -24,20 +42,20 @@ import javax.annotation.Nullable;
* from a logical description.
*/
public class ElasticsearchDynamicSource implements LookupTableSource,
SupportsProjectionPushDown {
- private final DecodingFormat<DeserializationSchema<RowData>> format;
- private final ElasticsearchConfiguration config;
- private final int lookupMaxRetryTimes;
- private final LookupCache lookupCache;
- private final String docType;
- private final String summaryString;
- private final ElasticsearchApiCallBridge<?> apiCallBridge;
- private DataType physicalRowDataType;
+ protected final DecodingFormat<DeserializationSchema<RowData>> format;
+ protected final ElasticsearchConfiguration config;
+ protected final int maxRetryTimes;
+ protected final LookupCache lookupCache;
+ protected final String docType;
+ protected final String summaryString;
+ protected final ElasticsearchApiCallBridge<?> apiCallBridge;
+ protected DataType physicalRowDataType;
public ElasticsearchDynamicSource(
DecodingFormat<DeserializationSchema<RowData>> format,
ElasticsearchConfiguration config,
DataType physicalRowDataType,
- int lookupMaxRetryTimes,
+ int maxRetryTimes,
String summaryString,
ElasticsearchApiCallBridge<?> apiCallBridge,
@Nullable LookupCache lookupCache,
@@ -45,7 +63,7 @@ public class ElasticsearchDynamicSource implements
LookupTableSource, SupportsPr
this.format = format;
this.config = config;
this.physicalRowDataType = physicalRowDataType;
- this.lookupMaxRetryTimes = lookupMaxRetryTimes;
+ this.maxRetryTimes = maxRetryTimes;
this.summaryString = summaryString;
this.apiCallBridge = apiCallBridge;
this.lookupCache = lookupCache;
@@ -68,7 +86,7 @@ public class ElasticsearchDynamicSource implements
LookupTableSource, SupportsPr
ElasticsearchRowDataLookupFunction<?> lookupFunction =
new ElasticsearchRowDataLookupFunction<>(
this.format.createRuntimeDecoder(context,
physicalRowDataType),
- lookupMaxRetryTimes,
+ maxRetryTimes,
config.getIndex(),
docType,
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
@@ -84,7 +102,7 @@ public class ElasticsearchDynamicSource implements
LookupTableSource, SupportsPr
}
}
- private NetworkClientConfig buildNetworkClientConfig() {
+ protected NetworkClientConfig buildNetworkClientConfig() {
NetworkClientConfig.Builder builder = new
NetworkClientConfig.Builder();
if (config.getUsername().isPresent()
&&
!StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
@@ -123,7 +141,7 @@ public class ElasticsearchDynamicSource implements
LookupTableSource, SupportsPr
format,
config,
physicalRowDataType,
- lookupMaxRetryTimes,
+ maxRetryTimes,
summaryString,
apiCallBridge,
lookupCache,
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
index f223380..8a72893 100644
---
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java
@@ -165,7 +165,7 @@ abstract class ElasticsearchDynamicTableFactoryBase
}
@Nullable
- private LookupCache getLookupCache(ReadableConfig tableOptions) {
+ protected LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java
new file mode 100644
index 0000000..1478307
--- /dev/null
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base {@link VectorSearchFunction} implementation for Elasticsearch. Shared
retry loop, result
+ * decoding and null-source filtering live here; version-specific subclasses
only need to provide
+ * the client initialization and the search call.
+ */
+public abstract class AbstractElasticsearchVectorSearchFunction extends
VectorSearchFunction {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AbstractElasticsearchVectorSearchFunction.class);
+ private static final long serialVersionUID = 1L;
+
+ protected final DeserializationSchema<RowData> deserializationSchema;
+ protected final String index;
+ protected final String searchColumn;
+ protected final String[] producedNames;
+ protected final int maxRetryTimes;
+
+ protected AbstractElasticsearchVectorSearchFunction(
+ DeserializationSchema<RowData> deserializationSchema,
+ int maxRetryTimes,
+ String index,
+ String searchColumn,
+ String[] producedNames) {
+ this.deserializationSchema =
+ checkNotNull(deserializationSchema, "No DeserializationSchema
supplied.");
+ this.producedNames = checkNotNull(producedNames, "No fieldNames
supplied.");
+ this.maxRetryTimes = maxRetryTimes;
+ this.index = index;
+ this.searchColumn = searchColumn;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ doOpen(context);
+ deserializationSchema.open(null);
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ doClose();
+ } finally {
+ super.close();
+ }
+ }
+
+ @Override
+ public Collection<RowData> vectorSearch(int topK, RowData features) throws
IOException {
+ for (int retry = 0; retry <= maxRetryTimes; retry++) {
+ try {
+ SearchResult[] results = doSearch(topK, features);
+ if (results.length > 0) {
+ ArrayList<RowData> rows = new ArrayList<>(results.length);
+ for (SearchResult result : results) {
+ if (result.source == null) {
+ continue;
+ }
+ RowData row = parseSearchResult(result.source);
+ if (row == null) {
+ continue;
+ }
+ GenericRowData scoreData = new GenericRowData(1);
+ scoreData.setField(0, result.score);
+ rows.add(new JoinedRowData(row, scoreData));
+ }
+ rows.trimToSize();
+ return rows;
+ }
+ } catch (IOException e) {
+ LOG.error(String.format("Elasticsearch search error, retry
times = %d", retry), e);
+ if (retry >= maxRetryTimes) {
+ throw new FlinkRuntimeException("Execution of
Elasticsearch search failed.", e);
+ }
+ try {
+ Thread.sleep(1000L * retry);
+ } catch (InterruptedException e1) {
+ LOG.warn(
+ "Interrupted while waiting to retry failed
elasticsearch search, aborting");
+ throw new FlinkRuntimeException(e1);
+ }
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ /** Version-specific initialization (e.g., creating the underlying
Elasticsearch client). */
+ protected abstract void doOpen(FunctionContext context) throws Exception;
+
+ /** Version-specific resource release (e.g., closing the underlying
Elasticsearch client). */
+ protected abstract void doClose() throws Exception;
+
+ /** Execute a single vector search call and return raw results, excluding
nothing. */
+ protected abstract SearchResult[] doSearch(int topK, RowData features)
throws IOException;
+
+ private RowData parseSearchResult(String result) {
+ try {
+ return deserializationSchema.deserialize(result.getBytes());
+ } catch (IOException e) {
+ LOG.error("Deserialize search hit failed: " + e.getMessage());
+ return null;
+ }
+ }
+
+ /** One hit from Elasticsearch — raw JSON source plus score. */
+ protected static class SearchResult {
+ final String source;
+ final Double score;
+
+ public SearchResult(String source, Double score) {
+ this.source = source;
+ this.score = score;
+ }
+ }
+}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java
new file mode 100644
index 0000000..5e42f7c
--- /dev/null
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+/** Metric for vector search. */
+public enum SearchMetric {
+ COSINE_SIMILARITY("cosineSimilarity"),
+ L1NORM("l1norm"),
+ L2NORM("l2norm"),
+ HAMMING("hamming"),
+ DOT_PRODUCT("dotProduct");
+
+ private final String name;
+
+ SearchMetric(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java
new file mode 100644
index 0000000..54fbbaf
--- /dev/null
+++
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Shared helpers for the Elasticsearch vector search table sources. */
+public class VectorSearchUtils {
+
+ private VectorSearchUtils() {}
+
+ /**
+ * Validates the search columns declared on the given context and returns
the resolved physical
+ * column name. Elasticsearch only supports a single, non-nested
float-array column.
+ */
+ public static String resolveSearchColumn(
+ DataType physicalRowDataType,
+ VectorSearchTableSource.VectorSearchContext vectorSearchContext) {
+ int[][] searchColumns = vectorSearchContext.getSearchColumns();
+
+ if (searchColumns.length != 1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Elasticsearch only supports one search columns
now, but input search columns size is %d.",
+ searchColumns.length));
+ }
+ int[] searchColumn = searchColumns[0];
+ if (searchColumn.length != 1) {
+ throw new IllegalArgumentException(
+ "Elasticsearch doesn't support to search data using nested
columns.");
+ }
+ int searchColumnIndex = searchColumn[0];
+
+ if (searchColumnIndex < 0
+ || searchColumnIndex >=
physicalRowDataType.getChildren().size()) {
+ throw new ValidationException(
+ String.format(
+ "The specified search column with index %d doesn't
exist in schema.",
+ searchColumnIndex));
+ }
+
+ DataType searchColumnType =
physicalRowDataType.getChildren().get(searchColumnIndex);
+ if (!searchColumnType.getLogicalType().is(LogicalTypeRoot.ARRAY)
+ || !((ArrayType) searchColumnType.getLogicalType())
+ .getElementType()
+ .is(LogicalTypeRoot.FLOAT)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Elasticsearch only supports search data using
float vector now, but input search column type is %s.",
+ searchColumnType));
+ }
+
+ return ((RowType) physicalRowDataType.getLogicalType())
+ .getFieldNames()
+ .get(searchColumnIndex);
+ }
+}
diff --git
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
index f7126b7..2149bcb 100644
---
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
+++
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java
@@ -26,9 +26,11 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;
+import java.time.Duration;
import java.util.Optional;
/** Collection of utility methods for Elasticsearch tests. */
@@ -62,10 +64,16 @@ public class ElasticsearchUtil {
logLevel = "OFF";
}
- return new
ElasticsearchContainer(DockerImageName.parse(dockerImageVersion))
- .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
- .withEnv("logger.org.elasticsearch", logLevel)
- .withLogConsumer(new Slf4jLogConsumer(log));
+ ElasticsearchContainer container =
+ new
ElasticsearchContainer(DockerImageName.parse(dockerImageVersion))
+ .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+ .withEnv("logger.org.elasticsearch", logLevel)
+ .withLogConsumer(new Slf4jLogConsumer(log));
+
+ container.setWaitStrategy(
+
Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(1)));
+
+ return container;
}
/** A mock {@link DynamicTableSink.Context} for Elasticsearch tests. */
diff --git a/flink-connector-elasticsearch7/pom.xml
b/flink-connector-elasticsearch7/pom.xml
index f8cbbf4..b0b8014 100644
--- a/flink-connector-elasticsearch7/pom.xml
+++ b/flink-connector-elasticsearch7/pom.xml
@@ -165,6 +165,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
diff --git
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
similarity index 50%
copy from
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
copy to
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
index 2f6d884..8b373c7 100644
---
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
+++
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java
@@ -18,23 +18,23 @@
package org.apache.flink.connector.elasticsearch.table;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge;
-import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
-import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.elasticsearch.table.search.SearchMetric;
-/** A {@link DynamicTableSinkFactory} for discovering {@link
ElasticsearchDynamicSink}. */
-@Internal
-public class Elasticsearch7DynamicTableFactory extends
ElasticsearchDynamicTableFactoryBase {
- private static final String FACTORY_IDENTIFIER = "elasticsearch-7";
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC;
- public Elasticsearch7DynamicTableFactory() {
- super(FACTORY_IDENTIFIER, Elasticsearch7SinkBuilder::new);
+/** Elasticsearch 7 specific configuration. */
+public class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+ Elasticsearch7Configuration(ReadableConfig config) {
+ super(config);
}
- @Override
- ElasticsearchApiCallBridge<?> getElasticsearchApiCallBridge() {
- return new Elasticsearch7ApiCallBridge();
+ public int getMaxRetries() {
+ return config.get(MAX_RETRIES);
+ }
+
+ public SearchMetric getVectorSearchMetric() {
+ return config.get(VECTOR_SEARCH_METRIC);
}
}
diff --git
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java
new file mode 100644
index 0000000..56fbaf2
--- /dev/null
+++
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.elasticsearch.table.search.SearchMetric;
+
+/**
+ * Options specific for the Elasticsearch 7 connector. Public so that the
{@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.
+ */
+@PublicEvolving
+public class Elasticsearch7ConnectorOptions extends
ElasticsearchConnectorOptions {
+ private Elasticsearch7ConnectorOptions() {}
+
+ public static final ConfigOption<Integer> MAX_RETRIES =
+ ConfigOptions.key("max-retries")
+ .intType()
+ .defaultValue(3)
+ .withFallbackKeys("lookup.max-retries")
+ .withDescription(
+ "The maximum allowed retries if a lookup/search
operation fails.");
+
+ public static final ConfigOption<SearchMetric> VECTOR_SEARCH_METRIC =
+ ConfigOptions.key("vector-search.metric")
+ .enumType(SearchMetric.class)
+ .defaultValue(SearchMetric.COSINE_SIMILARITY)
+ .withDescription(
+ "The metric of vector search, by default is
cosineSimilarity.");
+}
diff --git
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
new file mode 100644
index 0000000..1f18281
--- /dev/null
+++
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a {@link
Elasticsearch7DynamicSource}
+ * from a logical description.
+ */
+public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource
+ implements VectorSearchTableSource {
+
+ public Elasticsearch7DynamicSource(
+ DecodingFormat<DeserializationSchema<RowData>> format,
+ ElasticsearchConfiguration config,
+ DataType physicalRowDataType,
+ int maxRetryTimes,
+ String summaryString,
+ ElasticsearchApiCallBridge<RestHighLevelClient> apiCallBridge,
+ @Nullable LookupCache lookupCache,
+ @Nullable String docType) {
+ super(
+ format,
+ config,
+ physicalRowDataType,
+ maxRetryTimes,
+ summaryString,
+ apiCallBridge,
+ lookupCache,
+ docType);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public DynamicTableSource copy() {
+ return new Elasticsearch7DynamicSource(
+ format,
+ config,
+ physicalRowDataType,
+ maxRetryTimes,
+ summaryString,
+ (ElasticsearchApiCallBridge<RestHighLevelClient>)
apiCallBridge,
+ lookupCache,
+ docType);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public VectorSearchRuntimeProvider getSearchRuntimeProvider(
+ VectorSearchContext vectorSearchContext) {
+
+ NetworkClientConfig networkClientConfig = buildNetworkClientConfig();
+
+ ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
+ new ElasticsearchRowDataVectorSearchFunction(
+ this.format.createRuntimeDecoder(vectorSearchContext,
physicalRowDataType),
+ this.maxRetryTimes,
+ ((Elasticsearch7Configuration)
config).getVectorSearchMetric(),
+ config.getIndex(),
+ VectorSearchUtils.resolveSearchColumn(
+ physicalRowDataType, vectorSearchContext),
+
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+ config.getHosts(),
+ networkClientConfig,
+ (ElasticsearchApiCallBridge<RestHighLevelClient>)
apiCallBridge);
+
+ return VectorSearchFunctionProvider.of(vectorSearchFunction);
+ }
+}
diff --git
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
index 2f6d884..755175f 100644
---
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
+++
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java
@@ -19,10 +19,49 @@
package org.apache.flink.connector.elasticsearch.table;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge;
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
+import static
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.elasticsearch.common.Strings.capitalize;
/** A {@link DynamicTableSinkFactory} for discovering {@link
ElasticsearchDynamicSink}. */
@Internal
@@ -34,7 +73,67 @@ public class Elasticsearch7DynamicTableFactory extends
ElasticsearchDynamicTable
}
@Override
- ElasticsearchApiCallBridge<?> getElasticsearchApiCallBridge() {
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
+        // Expose the table options through the Elasticsearch 7 specific configuration class.
+        final ReadableConfig tableOptions = helper.getOptions();
+        return new Elasticsearch7Configuration(tableOptions);
+    }
+
+    /**
+     * Builds the Elasticsearch 7 table source from the table definition in {@code context}.
+     *
+     * <p>The decoding format is discovered first, then the helper and the Elasticsearch
+     * configuration are validated before the source is instantiated.
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper factoryHelper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = factoryHelper.getOptions();
+
+        // Format discovery has to happen before validate() is called on the helper.
+        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                factoryHelper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class,
+                        org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions
+                                .FORMAT_OPTION);
+
+        final Elasticsearch7Configuration es7Config =
+                (Elasticsearch7Configuration) getConfiguration(factoryHelper);
+        factoryHelper.validate();
+        validateConfiguration(es7Config);
+
+        return new Elasticsearch7DynamicSource(
+                decodingFormat,
+                es7Config,
+                context.getPhysicalRowDataType(),
+                es7Config.getMaxRetries(),
+                capitalize(FACTORY_IDENTIFIER),
+                getElasticsearchApiCallBridge(),
+                getLookupCache(tableOptions),
+                getDocumentType(es7Config));
+    }
+
+ @Override
+ ElasticsearchApiCallBridge<RestHighLevelClient>
getElasticsearchApiCallBridge() {
return new Elasticsearch7ApiCallBridge();
}
+
+    /** Returns every option that may, but does not have to, be set on an Elasticsearch 7 table. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Stream<ConfigOption<?>> supportedOptions =
+                Stream.of(
+                        // Sink: document key and bulk flushing.
+                        KEY_DELIMITER_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_MAX_ACTIONS_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
+                        BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                        BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                        BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                        // Connection.
+                        CONNECTION_PATH_PREFIX_OPTION,
+                        CONNECTION_REQUEST_TIMEOUT,
+                        CONNECTION_TIMEOUT,
+                        SOCKET_TIMEOUT,
+                        // Format, delivery guarantee, credentials and parallelism.
+                        FORMAT_OPTION,
+                        DELIVERY_GUARANTEE_OPTION,
+                        PASSWORD_OPTION,
+                        USERNAME_OPTION,
+                        SINK_PARALLELISM,
+                        // Lookup cache.
+                        CACHE_TYPE,
+                        PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
+                        PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
+                        PARTIAL_CACHE_MAX_ROWS,
+                        PARTIAL_CACHE_CACHE_MISSING_KEY,
+                        // Retries and vector search.
+                        MAX_RETRIES,
+                        VECTOR_SEARCH_METRIC);
+        return supportedOptions.collect(Collectors.toSet());
+    }
}
diff --git
a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
new file mode 100644
index 0000000..66f19cb
--- /dev/null
+++
b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link VectorSearchFunction} implementation for Elasticsearch 7.
+ *
+ * <p>Each search sends a {@code script_score} query wrapped around {@code match_all}. The
+ * Painless script applies the configured {@link SearchMetric} function to the query vector and
+ * the search column, then adds {@code 1.0}.
+ */
+public class ElasticsearchRowDataVectorSearchFunction
+        extends AbstractElasticsearchVectorSearchFunction {
+    private static final long serialVersionUID = 1L;
+
+    /** Name of the script parameter that carries the query vector. */
+    private static final String QUERY_VECTOR = "query_vector";
+
+    private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
+    private final NetworkClientConfig networkClientConfig;
+    private final List<HttpHost> hosts;
+
+    /** Painless source of the scoring script, built once in the constructor. */
+    private final String scriptScore;
+
+    // Runtime state, created in doOpen() and therefore excluded from serialization.
+    private transient RestHighLevelClient client;
+    private transient SearchRequest searchRequest;
+    private transient SearchSourceBuilder searchSourceBuilder;
+
+    /**
+     * Creates the search function.
+     *
+     * @param deserializationSchema schema handed to the parent class
+     * @param maxRetryTimes retry budget handed to the parent class
+     * @param searchMetric metric whose {@code toString()} is used as the Painless function name
+     * @param index index to search
+     * @param searchColumn name of the vector field that is compared with the query vector
+     * @param producedNames fields requested from {@code _source}
+     * @param hosts Elasticsearch hosts to connect to
+     * @param networkClientConfig network settings used when creating the client
+     * @param callBridge bridge that creates the {@link RestHighLevelClient}
+     * @throws NullPointerException if {@code searchMetric}, {@code hosts},
+     *     {@code networkClientConfig} or {@code callBridge} is {@code null}
+     */
+    public ElasticsearchRowDataVectorSearchFunction(
+            DeserializationSchema<RowData> deserializationSchema,
+            int maxRetryTimes,
+            SearchMetric searchMetric,
+            String index,
+            String searchColumn,
+            String[] producedNames,
+            List<HttpHost> hosts,
+            NetworkClientConfig networkClientConfig,
+            ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) {
+        super(deserializationSchema, maxRetryTimes, index, searchColumn, producedNames);
+        this.networkClientConfig =
+                checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
+        this.hosts = checkNotNull(hosts, "No hosts supplied.");
+        this.callBridge = checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
+        // Validate the metric like the other arguments instead of failing with a bare NPE below.
+        checkNotNull(searchMetric, "No searchMetric supplied.");
+        // NOTE(review): the "+ 1.0" offset matches the cosineSimilarity recipe in the
+        // Elasticsearch docs, where it keeps the score non-negative. For distance functions
+        // such as l1norm/l2norm a larger value means a more distant vector, and dotProduct can
+        // be below -1, so confirm that this single formula is intended for every SearchMetric.
+        this.scriptScore =
+                String.format(
+                        "%s(params.%s, '%s') + 1.0",
+                        searchMetric.toString(), QUERY_VECTOR, searchColumn);
+    }
+
+    /** Creates the client and the request objects that are reused by every search. */
+    @Override
+    protected void doOpen(FunctionContext context) {
+        this.client = callBridge.createClient(networkClientConfig, hosts);
+
+        // Reuse searchRequest / searchSourceBuilder across invocations to avoid rebuilding them
+        // per record.
+        this.searchRequest = new SearchRequest(index);
+        this.searchSourceBuilder = new SearchSourceBuilder();
+        this.searchSourceBuilder.fetchSource(producedNames, null);
+    }
+
+    /** Closes the client if one was created; safe to call more than once. */
+    @Override
+    protected void doClose() throws IOException {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+    /**
+     * Runs one exact vector search.
+     *
+     * @param topK maximum number of hits to request
+     * @param features input row; the query vector is read from position 0 as a float array
+     * @return one {@code SearchResult} (source JSON and score) per hit that has a source
+     * @throws IOException if the search request fails
+     */
+    @Override
+    protected SearchResult[] doSearch(int topK, RowData features) throws IOException {
+        // Elasticsearch 7.x has no ANN search, so an exact script_score query is used instead.
+        Map<String, Object> params =
+                Collections.singletonMap(QUERY_VECTOR, features.getArray(0).toFloatArray());
+
+        Script script = new Script(ScriptType.INLINE, "painless", scriptScore, params);
+        ScriptScoreQueryBuilder scriptScoreQuery =
+                new ScriptScoreQueryBuilder(new MatchAllQueryBuilder(), script);
+
+        searchSourceBuilder.query(scriptScoreQuery).size(topK);
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+        SearchHit[] searchHits = searchResponse.getHits().getHits();
+
+        // getSourceAsString() converts the source to JSON on every call, so read it once per hit
+        // and drop the hits that carry no source afterwards.
+        return Stream.of(searchHits)
+                .map(
+                        hit -> {
+                            String source = hit.getSourceAsString();
+                            return source == null
+                                    ? null
+                                    : new SearchResult(source, (double) hit.getScore());
+                        })
+                .filter(result -> result != null)
+                .toArray(SearchResult[]::new);
+    }
+}
diff --git
a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
new file mode 100644
index 0000000..8f6c249
--- /dev/null
+++
b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
+import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code VECTOR_SEARCH} ITCase for the Elasticsearch 7 connector. */
+@Testcontainers
+public class Elasticsearch7VectorSearchITCase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(Elasticsearch7VectorSearchITCase.class);
+
+ // Number of slots given to the single task manager of the mini cluster below.
+ private static final int PARALLELISM = 2;
+
+ // Elasticsearch 7 container managed by the Testcontainers JUnit extension.
+ @Container
+ private static final ElasticsearchContainer ES_CONTAINER =
+ ElasticsearchUtil.createElasticsearchContainer(
+ DockerImageVersions.ELASTICSEARCH_7, LOG);
+
+ /** Returns the HTTP host address reported by the Elasticsearch test container. */
+ String getElasticsearchHttpHostAddress() {
+ return ES_CONTAINER.getHttpHostAddress();
+ }
+
+ /**
+ * Creates a new {@link RestHighLevelClient} for the test container on every call. Nothing here
+ * keeps a reference to the returned client, so the caller is responsible for closing it.
+ */
+ private RestHighLevelClient getClient() {
+ return new RestHighLevelClient(
+
RestClient.builder(HttpHost.create(getElasticsearchHttpHostAddress())));
+ }
+
+ // Mini cluster with one task manager offering PARALLELISM slots.
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ // Rows (id, content, query vector) registered as the 'values' source table in
+ // testSearchUsingFloatArray; the second vector is the negation of the first.
+ private final List<Row> inputData =
+ Arrays.asList(
+ Row.of(1L, "Spark", new Float[] {5f, 12f, 13f}),
+ Row.of(2L, "Flink", new Float[] {-5f, -12f, -13f}));
+
+ // Recreated before every test by beforeEach().
+ private TableEnvironment tEnv;
+
+ /** Gives every test its own streaming-mode {@link TableEnvironment}. */
+ @BeforeEach
+ void beforeEach() {
+ tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ }
+
+ /**
+ * Writes a single row that covers every column type of {@code esTable} through the
+ * {@code elasticsearch-7} connector, then runs {@code VECTOR_SEARCH} on column {@code f10} and
+ * compares the single result row (query columns, table columns and score) with the expected
+ * string.
+ */
+ @Test
+ public void testSearchFullTypeVectorTable() throws Exception {
+ String index = "table_with_all_supported_types";
+ createFullTypesIndex(index);
+ tEnv.executeSql(
+ "CREATE TABLE esTable ("
+ + " id BIGINT,\n"
+ + " f1 STRING,\n"
+ + " f2 BOOLEAN,\n"
+ + " f3 TINYINT,\n"
+ + " f4 SMALLINT,\n"
+ + " f5 INTEGER,\n"
+ + " f6 DATE,\n"
+ + " f7 TIMESTAMP,\n"
+ + " f8 FLOAT,\n"
+ + " f9 DOUBLE,\n"
+ + " f10 ARRAY<FLOAT>,\n"
+ + " f11 ARRAY<DOUBLE>,\n"
+ + " f12 ARRAY<INTEGER>,\n"
+ + " f13 ARRAY<BIGINT>,\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector",
"elasticsearch-7")
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s'\n",
+
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+ ES_CONTAINER.getHttpHostAddress())
+ + ")");
+
+ // Insert the one document that the search below is expected to return.
+ tEnv.fromValues(
+ row(
+ 1,
+ "ABCDE",
+ true,
+ (byte) 127,
+ (short) 257,
+ 65535,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2012-12-12T12:12:12"),
+ 11.11f,
+ 12.22d,
+ new Float[] {11.11f, 11.12f},
+ new Double[] {12.22d, 12.22d},
+ new int[] {Integer.MIN_VALUE,
Integer.MAX_VALUE},
+ new long[] {Long.MIN_VALUE, Long.MAX_VALUE}))
+ .executeInsert("esTable")
+ .await();
+
+ // Wait for Elasticsearch to index the inserted document before searching.
+ Thread.sleep(2000);
+
+ List<String> rows =
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql(
+ "WITH t(id, vector) AS (SELECT
* FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY<FLOAT>))))\n"
+ + "SELECT * FROM t,
LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n")
+ .collect())
+ .stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ // topK is 3, but the index holds a single document, so exactly one row is expected.
+ assertThat(rows)
+ .isEqualTo(
+ Collections.singletonList(
+ "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257,
65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22,
12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807],
1.767361044883728]"));
+ }
+
+ /**
+ * Inserts three labelled vectors into Elasticsearch and, for every row of {@code inputData},
+ * asks {@code VECTOR_SEARCH} for the two best matches.
+ *
+ * <p>NOTE(review): {@code metric} is only used to build the index name. It is never passed to
+ * the {@code es_table} options, so every parameterized run presumably uses the connector's
+ * default metric - confirm whether the metric option should be set in the WITH clause.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"cosineSimilarity", "l1norm", "l2norm", "hamming",
"dotProduct"})
+ void testSearchUsingFloatArray(String metric) throws Exception {
+ String index = "table_with_multiple_data_with_" + metric.toLowerCase();
+ createSimpleIndex(index);
+ tEnv.executeSql(
+ "CREATE TABLE es_table("
+ + " id BIGINT,"
+ + " label STRING,"
+ + " vector ARRAY<FLOAT>"
+ + ")\n WITH (\n"
+ + String.format("'%s'='%s',\n", "connector",
"elasticsearch-7")
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s'\n",
+
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+ ES_CONTAINER.getHttpHostAddress())
+ + ")");
+
+ tEnv.fromValues(
+ row(1L, "Batch", new Float[] {5f, 12f, 13f}),
+ row(2L, "Streaming", new Float[] {-5f, -12f, -13f}),
+ row(3L, "Big Data", new Float[] {1f, 1f, 0f}))
+ .executeInsert("es_table")
+ .await();
+
+ // Wait for Elasticsearch to index the inserted documents before searching.
+ Thread.sleep(2000);
+
+ // Query side: a 'values' table backed by inputData; its 'index' column holds the query vector.
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE src(\n"
+ + " id BIGINT PRIMARY KEY NOT ENFORCED,\n"
+ + " content STRING,\n"
+ + " index ARRAY<FLOAT>\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '%s'\n"
+ + ");\n",
+ TestValuesTableFactory.registerData(inputData)));
+ assertThat(
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql(
+ "SELECT content, label
FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector),
src.index, 2))")
+ .collect())
+ .stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .isEqualTo(
+ Arrays.asList(
+ "+I[Spark, Batch]",
+ "+I[Spark, Big Data]",
+ "+I[Flink, Streaming]",
+ "+I[Flink, Big Data]"));
+ }
+
+    /**
+     * Creates {@code index} with an explicit mapping for every column of the full-types table.
+     *
+     * <p>{@code id} and {@code f1}-{@code f9} are mapped to scalar field types; the array columns
+     * {@code f10}-{@code f13} are all mapped as two-dimensional {@code dense_vector} fields.
+     *
+     * @param index name of the index to create
+     * @throws IOException if building the mapping, creating the index or closing the client fails
+     */
+    private void createFullTypesIndex(String index) throws IOException {
+        // {field name, Elasticsearch field type}, in mapping order.
+        final String[][] scalarFields = {
+            {"id", "long"},
+            {"f1", "text"}, // string
+            {"f2", "boolean"},
+            {"f3", "byte"}, // tinyint
+            {"f4", "short"}, // smallint
+            {"f5", "integer"},
+            {"f6", "date"},
+            {"f7", "text"}, // timestamp, mapped as text
+            {"f8", "float"},
+            {"f9", "double"}
+        };
+        // Array<Float>, Array<Double>, Array<Integer>, Array<Long>.
+        final String[] vectorFields = {"f10", "f11", "f12", "f13"};
+
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+        mappingBuilder.startObject();
+        mappingBuilder.startObject("properties");
+
+        for (String[] scalarField : scalarFields) {
+            mappingBuilder.startObject(scalarField[0]);
+            mappingBuilder.field("type", scalarField[1]);
+            mappingBuilder.endObject();
+        }
+
+        for (String vectorField : vectorFields) {
+            mappingBuilder.startObject(vectorField);
+            mappingBuilder.field("type", "dense_vector");
+            mappingBuilder.field("dims", 2);
+            mappingBuilder.endObject();
+        }
+
+        mappingBuilder.endObject(); // end properties
+        mappingBuilder.endObject(); // end root
+
+        CreateIndexRequest request = new CreateIndexRequest(index);
+        request.mapping(mappingBuilder);
+
+        // getClient() builds a new client on every call; close it so its resources are released.
+        try (RestHighLevelClient client = getClient()) {
+            client.indices().create(request, RequestOptions.DEFAULT);
+        }
+    }
+
+    /**
+     * Creates {@code index} with the three-field mapping used by the float-array search test:
+     * {@code id} (long), {@code label} (text) and {@code vector} (3-dimensional dense_vector).
+     *
+     * @param index name of the index to create
+     * @throws IOException if building the mapping, creating the index or closing the client fails
+     */
+    private void createSimpleIndex(String index) throws IOException {
+        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+        mappingBuilder.startObject();
+        mappingBuilder.startObject("properties");
+
+        // id: long
+        mappingBuilder.startObject("id");
+        mappingBuilder.field("type", "long");
+        mappingBuilder.endObject();
+
+        // label: string
+        mappingBuilder.startObject("label");
+        mappingBuilder.field("type", "text");
+        mappingBuilder.endObject();
+
+        // vector: float vector
+        mappingBuilder.startObject("vector");
+        mappingBuilder.field("type", "dense_vector");
+        mappingBuilder.field("dims", 3);
+        mappingBuilder.endObject();
+
+        mappingBuilder.endObject(); // end properties
+        mappingBuilder.endObject(); // end root
+
+        CreateIndexRequest request = new CreateIndexRequest(index);
+        request.mapping(mappingBuilder);
+
+        // getClient() builds a new client on every call; close it so its resources are released.
+        try (RestHighLevelClient client = getClient()) {
+            client.indices().create(request, RequestOptions.DEFAULT);
+        }
+    }
+}
diff --git
a/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties
b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties
new file mode 100644
index 0000000..07514cc
--- /dev/null
+++
b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties
@@ -0,0 +1,17 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+ryuk.container.image = testcontainers/ryuk:0.6.0
diff --git a/flink-connector-elasticsearch8/pom.xml
b/flink-connector-elasticsearch8/pom.xml
index e86af07..9e8962f 100644
--- a/flink-connector-elasticsearch8/pom.xml
+++ b/flink-connector-elasticsearch8/pom.xml
@@ -88,6 +88,23 @@ under the License.
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-connector-elasticsearch-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <!-- Exclude the base module elasticsearch -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ <exclusion>
+
<groupId>org.elasticsearch.client</groupId>
+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- Dependency for Elasticsearch 8.x Java Client -->
<dependency>
<groupId>co.elastic.clients</groupId>
@@ -123,45 +140,46 @@ under the License.
<scope>test</scope>
</dependency>
- <!-- Elasticsearch table sink factory testing -->
+ <!-- Table API integration tests -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
+ <artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
- <!-- ArchUit test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-architecture-tests-test</artifactId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
+ <!-- Elasticsearch table sink factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-architecture-tests-production</artifactId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <!-- ArchUnit test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
+ <artifactId>flink-architecture-tests-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-loader</artifactId>
- <version>${flink.version}</version>
+
<artifactId>flink-architecture-tests-production</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_2.12</artifactId>
+ <artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
index 93ecd78..34447bf 100644
---
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java
@@ -24,6 +24,7 @@ package org.apache.flink.connector.elasticsearch.sink;
import org.apache.flink.util.function.SerializableSupplier;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -108,6 +109,11 @@ public class NetworkConfig implements Serializable {
new RestClientTransport(this.getRestClient(), new
JacksonJsonpMapper(mapper)));
}
+ /**
+ * Creates a new {@link ElasticsearchClient} on top of a {@link RestClientTransport} built from
+ * {@code getRestClient()} and a default-constructed {@link JacksonJsonpMapper}.
+ *
+ * <p>NOTE(review): the async factory directly above passes a {@code mapper} to
+ * {@code JacksonJsonpMapper}; confirm that the default mapper is intended here.
+ */
+ public ElasticsearchClient createEsSyncClient() {
+ return new ElasticsearchClient(
+ new RestClientTransport(this.getRestClient(), new
JacksonJsonpMapper()));
+ }
+
private RestClient getRestClient() {
RestClientBuilder restClientBuilder =
RestClient.builder(hosts.toArray(new HttpHost[0]))
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
index 9e3e8bf..351fa55 100644
---
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java
@@ -46,6 +46,8 @@ import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8Conne
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT;
@@ -132,6 +134,16 @@ public class Elasticsearch8Configuration {
return config.getOptional(SINK_PARALLELISM);
}
+ // --- Lookup / vector search accessors
--------------------------------------------------
+
+ /** Returns the configured value of the {@code MAX_RETRIES} option. */
+ public int getMaxRetries() {
+ return config.get(MAX_RETRIES);
+ }
+
+ /** Returns the configured value of the {@code NUM_CANDIDATES} option. */
+ public int getNumCandidates() {
+ return config.get(NUM_CANDIDATES);
+ }
+
/**
* Parse Hosts String to list.
*
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
index 1defeba..74a450b 100644
---
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java
@@ -149,4 +149,21 @@ public class Elasticsearch8ConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when
committing.");
+
+ // --- Lookup / vector search options
----------------------------------------------------
+
+ /**
+ * Option {@code max-retries}: integer, default 3, with {@code lookup.max-retries} accepted as
+ * a fallback key.
+ */
+ public static final ConfigOption<Integer> MAX_RETRIES =
+ ConfigOptions.key("max-retries")
+ .intType()
+ .defaultValue(3)
+ .withFallbackKeys("lookup.max-retries")
+ .withDescription(
+ "The maximum allowed retries if a lookup/search
operation fails.");
+
+ /** Option {@code vector-search.num-candidates}: integer, default 100. */
+ public static final ConfigOption<Integer> NUM_CANDIDATES =
+ ConfigOptions.key("vector-search.num-candidates")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "The number of candidate neighbors considered for
each shard during the vector search.");
}
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
new file mode 100644
index 0000000..23c1eb6
--- /dev/null
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.sink.NetworkConfig;
+import
org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction;
+import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import co.elastic.clients.transport.TransportUtils;
+import org.apache.http.HttpHost;
+
+import javax.net.ssl.SSLContext;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link VectorSearchTableSource} for Elasticsearch 8. It builds an {@link
+ * ElasticsearchRowDataVectorSearchFunction} from the table's decoding format and the connector
+ * configuration. Projection push-down is supported for top-level fields only (nested projection
+ * is reported as unsupported).
+ */
+public class Elasticsearch8DynamicSource
+ implements VectorSearchTableSource, SupportsProjectionPushDown {
+
+ protected final DecodingFormat<DeserializationSchema<RowData>> format;
+ protected final Elasticsearch8Configuration config;
+ private final String summaryString;
+ // Not final: replaced by applyProjection(...) when a projection is pushed down.
+ protected DataType physicalRowDataType;
+
+ /**
+ * Creates the source.
+ *
+ * @param format decoding format used to create the runtime decoder for search hits
+ * @param config connector configuration (hosts, index, credentials, timeouts, ...)
+ * @param physicalRowDataType physical row type of the table
+ * @param summaryString text returned by {@link #asSummaryString()}
+ */
+ public Elasticsearch8DynamicSource(
+ DecodingFormat<DeserializationSchema<RowData>> format,
+ Elasticsearch8Configuration config,
+ DataType physicalRowDataType,
+ String summaryString) {
+ this.format = format;
+ this.config = config;
+ this.physicalRowDataType = physicalRowDataType;
+ this.summaryString = summaryString;
+ }
+
+ /**
+ * Builds the runtime vector search function from the current (possibly projected) row type
+ * and wraps it in a {@link VectorSearchFunctionProvider}.
+ *
+ * <p>The produced field names passed to the function are the field names of {@code
+ * physicalRowDataType}; the search column is resolved by {@link VectorSearchUtils}.
+ */
+ @Override
+ public VectorSearchRuntimeProvider getSearchRuntimeProvider(
+ VectorSearchContext vectorSearchContext) {
+
+ ElasticsearchRowDataVectorSearchFunction vectorSearchFunction =
+ new ElasticsearchRowDataVectorSearchFunction(
+ format.createRuntimeDecoder(vectorSearchContext,
physicalRowDataType),
+ config.getMaxRetries(),
+ config.getNumCandidates(),
+ config.getIndex(),
+ VectorSearchUtils.resolveSearchColumn(
+ physicalRowDataType, vectorSearchContext),
+
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+ buildNetworkConfig());
+
+ return VectorSearchFunctionProvider.of(vectorSearchFunction);
+ }
+
+ /**
+ * Translates the connector configuration into a {@link NetworkConfig}.
+ *
+ * <p>Username, password, path prefix and certificate fingerprint are passed as {@code null}
+ * when they are absent or whitespace-only. Timeouts are converted to milliseconds and passed
+ * as {@code null} when not configured. The fourth and the last constructor arguments are
+ * always {@code null} here; their meaning is defined by {@code NetworkConfig}, which is not
+ * shown in this file.
+ *
+ * <p>NOTE(review): {@code (int) d.toMillis()} narrows a {@code long} to {@code int} and would
+ * silently wrap for durations longer than {@code Integer.MAX_VALUE} milliseconds.
+ *
+ * @throws IllegalArgumentException if no hosts are configured
+ */
+ private NetworkConfig buildNetworkConfig() {
+ List<HttpHost> hosts = config.getHosts();
+ checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
+
+ String username =
+ config.getUsername()
+ .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+ .orElse(null);
+ String password =
+ config.getPassword()
+ .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+ .orElse(null);
+ String pathPrefix =
+ config.getPathPrefix()
+ .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+ .orElse(null);
+
+ // The SSLContext itself is created lazily by the supplier from the CA fingerprint; only
+ // the fingerprint string is captured here.
+ SerializableSupplier<SSLContext> sslContextSupplier =
+ config.getCertificateFingerprint()
+ .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v))
+ .<SerializableSupplier<SSLContext>>map(
+ fp -> () ->
TransportUtils.sslContextFromCaFingerprint(fp))
+ .orElse(null);
+
+ return new NetworkConfig(
+ hosts,
+ username,
+ password,
+ null,
+ pathPrefix,
+ config.getConnectionRequestTimeout().map(d -> (int)
d.toMillis()).orElse(null),
+ config.getConnectionTimeout().map(d -> (int)
d.toMillis()).orElse(null),
+ config.getSocketTimeout().map(d -> (int)
d.toMillis()).orElse(null),
+ sslContextSupplier,
+ null);
+ }
+
+ /**
+ * Returns a new source with the same format, configuration and summary string, carrying the
+ * current value of {@code physicalRowDataType}.
+ */
+ @Override
+ public DynamicTableSource copy() {
+ return new Elasticsearch8DynamicSource(format, config,
physicalRowDataType, summaryString);
+ }
+
+ /** Returns the summary string supplied at construction time. */
+ @Override
+ public String asSummaryString() {
+ return summaryString;
+ }
+
+ /** Nested projection is not supported; only top-level fields can be projected. */
+ @Override
+ public boolean supportsNestedProjection() {
+ return false;
+ }
+
+ /** Replaces {@code physicalRowDataType} with the projection of {@code type}. */
+ @Override
+ public void applyProjection(int[][] projectedFields, DataType type) {
+ this.physicalRowDataType =
Projection.of(projectedFields).project(type);
+ }
+}
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
similarity index 84%
rename from
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
rename to
flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
index 3008a14..bf21cbc 100644
---
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,20 +7,19 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.flink.connector.elasticsearch.table;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -31,9 +29,13 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
@@ -63,24 +65,49 @@ import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8Conne
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES;
+import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT;
import static
org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.USERNAME_OPTION;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
-import static
org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
import static
org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
-/** Factory for creating {@link ElasticSearch8AsyncDynamicSink} . */
+/**
+ * A factory that creates both {@link Elasticsearch8DynamicSource} (vector search) and
+ * {@link ElasticSearch8AsyncDynamicSink} under the same {@code elasticsearch-8} identifier.
+ */
@Internal
-public class ElasticSearch8AsyncDynamicTableFactory extends
AsyncDynamicTableSinkFactory {
-
+public class Elasticsearch8DynamicTableFactory extends
AsyncDynamicTableSinkFactory
+ implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "elasticsearch-8";
+ /** Returns the connector identifier {@code IDENTIFIER}, i.e. {@code elasticsearch-8}. */
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ /**
+ * Creates the {@link Elasticsearch8DynamicSource} for a table definition.
+ *
+ * <p>Discovers the decoding format declared under {@code FORMAT_OPTION}, builds the connector
+ * configuration from the table options, validates both the options and the configuration, and
+ * passes the table's physical row type to the source.
+ */
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ final DecodingFormat<DeserializationSchema<RowData>> format =
+
helper.discoverDecodingFormat(DeserializationFormatFactory.class,
FORMAT_OPTION);
+
+ Elasticsearch8Configuration config = getConfiguration(helper);
+ // Validate the raw options first, then the connector-specific constraints.
+ helper.validate();
+ validateConfiguration(config);
+
+ return new Elasticsearch8DynamicSource(
+ format, config, context.getPhysicalRowDataType(),
capitalize(IDENTIFIER));
+ }
+
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex =
@@ -95,10 +122,8 @@ public class ElasticSearch8AsyncDynamicTableFactory extends
AsyncDynamicTableSin
helper.validate();
validateConfiguration(config);
- ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder
builder =
- new
ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder();
-
- return builder.setConfig(config)
+ return new
ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder()
+ .setConfig(config)
.setFormat(format)
.setPrimaryKeyLogicalTypesWithIndex(primaryKeyLogicalTypesWithIndex)
.setPhysicalRowDataType(context.getPhysicalRowDataType())
@@ -107,40 +132,6 @@ public class ElasticSearch8AsyncDynamicTableFactory
extends AsyncDynamicTableSin
.build();
}
- ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
- final String zone =
readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
-
- return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
- ? ZoneId.systemDefault()
- : ZoneId.of(zone);
- }
-
- List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context
context) {
- DataType physicalRowDataType = context.getPhysicalRowDataType();
- int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
- if (primaryKeyIndexes.length != 0) {
- DataType pkDataType =
Projection.of(primaryKeyIndexes).project(physicalRowDataType);
-
- ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
- }
-
- ResolvedSchema resolvedSchema =
context.getCatalogTable().getResolvedSchema();
- return Arrays.stream(primaryKeyIndexes)
- .mapToObj(
- index -> {
- Optional<Column> column =
resolvedSchema.getColumn(index);
- if (!column.isPresent()) {
- throw new IllegalStateException(
- String.format(
- "No primary key column found
with index '%s'.",
- index));
- }
- LogicalType logicalType =
column.get().getDataType().getLogicalType();
- return new LogicalTypeWithIndex(index,
logicalType);
- })
- .collect(Collectors.toList());
- }
-
Elasticsearch8Configuration
getConfiguration(FactoryUtil.TableFactoryHelper helper) {
return new Elasticsearch8Configuration(helper.getOptions());
}
@@ -187,9 +178,37 @@ public class ElasticSearch8AsyncDynamicTableFactory
extends AsyncDynamicTableSin
}
}
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
+ /**
+ * Resolves the time zone from {@code TableConfigOptions.LOCAL_TIME_ZONE}.
+ *
+ * @return {@link ZoneId#systemDefault()} when the option still has its default value,
+ * otherwise {@code ZoneId.of(zone)} for the configured value
+ */
+ ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) {
+ final String zone =
readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+
+ return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+ ? ZoneId.systemDefault()
+ : ZoneId.of(zone);
+ }
+
+ /**
+ * Collects the logical type and column index of every primary key column.
+ *
+ * <p>When the table declares a primary key, the projected key type is first checked by {@code
+ * ElasticsearchValidationUtils.validatePrimaryKey}. Returns an empty list when no primary key
+ * is declared.
+ *
+ * @throws IllegalStateException if a primary key index has no column in the resolved schema
+ */
+ List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context
context) {
+ DataType physicalRowDataType = context.getPhysicalRowDataType();
+ int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
+ if (primaryKeyIndexes.length != 0) {
+ DataType pkDataType =
Projection.of(primaryKeyIndexes).project(physicalRowDataType);
+ ElasticsearchValidationUtils.validatePrimaryKey(pkDataType);
+ }
+
+ ResolvedSchema resolvedSchema =
context.getCatalogTable().getResolvedSchema();
+ return Arrays.stream(primaryKeyIndexes)
+ .mapToObj(
+ index -> {
+ Optional<Column> column =
resolvedSchema.getColumn(index);
+ if (!column.isPresent()) {
+ throw new IllegalStateException(
+ String.format(
+ "No primary key column found
with index '%s'.",
+ index));
+ }
+ LogicalType logicalType =
column.get().getDataType().getLogicalType();
+ return new LogicalTypeWithIndex(index,
logicalType);
+ })
+ .collect(Collectors.toList());
}
@Override
@@ -201,16 +220,15 @@ public class ElasticSearch8AsyncDynamicTableFactory
extends AsyncDynamicTableSin
public Set<ConfigOption<?>> optionalOptions() {
return Stream.of(
KEY_DELIMITER_OPTION,
- BULK_FLUSH_MAX_SIZE_OPTION,
BULK_FLUSH_MAX_ACTIONS_OPTION,
- BULK_FLUSH_INTERVAL_OPTION,
BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION,
BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION,
+ BULK_FLUSH_MAX_SIZE_OPTION,
+ BULK_FLUSH_INTERVAL_OPTION,
CONNECTION_PATH_PREFIX_OPTION,
CONNECTION_REQUEST_TIMEOUT,
CONNECTION_TIMEOUT,
SOCKET_TIMEOUT,
- SSL_CERTIFICATE_FINGERPRINT,
FORMAT_OPTION,
DELIVERY_GUARANTEE_OPTION,
PASSWORD_OPTION,
@@ -221,7 +239,9 @@ public class ElasticSearch8AsyncDynamicTableFactory extends
AsyncDynamicTableSin
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
- MAX_RETRIES)
+ MAX_RETRIES,
+ NUM_CANDIDATES,
+ SSL_CERTIFICATE_FINGERPRINT)
.collect(Collectors.toSet());
}
@@ -234,10 +254,10 @@ public class ElasticSearch8AsyncDynamicTableFactory
extends AsyncDynamicTableSin
USERNAME_OPTION,
KEY_DELIMITER_OPTION,
BULK_FLUSH_MAX_ACTIONS_OPTION,
- BULK_FLUSH_MAX_SIZE_OPTION,
- BULK_FLUSH_INTERVAL_OPTION,
BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION,
BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION,
+ BULK_FLUSH_MAX_SIZE_OPTION,
+ BULK_FLUSH_INTERVAL_OPTION,
CONNECTION_PATH_PREFIX_OPTION,
CONNECTION_REQUEST_TIMEOUT,
CONNECTION_TIMEOUT,
diff --git
a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
new file mode 100644
index 0000000..516752a
--- /dev/null
+++
b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.elasticsearch.sink.NetworkConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.json.JsonData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link VectorSearchFunction} implementation for Elasticsearch 8.
+ *
+ * <p>Runs a kNN search through the Elasticsearch Java client and returns each hit's source JSON
+ * together with its score. {@code index}, {@code searchColumn}, {@code producedNames} and the
+ * {@code SearchResult} type are not declared in this class; presumably they come from {@link
+ * AbstractElasticsearchVectorSearchFunction} (not shown here).
+ */
+public class ElasticsearchRowDataVectorSearchFunction
+ extends AbstractElasticsearchVectorSearchFunction {
+ private static final long serialVersionUID = 1L;
+
+ private final int numCandidates;
+ private final NetworkConfig networkConfig;
+
+ // Created in doOpen(), released in doClose(); transient so it is not serialized.
+ private transient ElasticsearchClient client;
+
+ /**
+ * @param deserializationSchema forwarded to the superclass
+ * @param maxRetryTimes forwarded to the superclass
+ * @param numCandidates value passed as {@code numCandidates} of the kNN query
+ * @param index forwarded to the superclass; used as the index of the search request
+ * @param searchColumn forwarded to the superclass; used as the kNN field
+ * @param producedNames forwarded to the superclass; used as the source-filter includes
+ * @param networkConfig connection settings used to create the client; must not be null
+ */
+ public ElasticsearchRowDataVectorSearchFunction(
+ DeserializationSchema<RowData> deserializationSchema,
+ int maxRetryTimes,
+ int numCandidates,
+ String index,
+ String searchColumn,
+ String[] producedNames,
+ NetworkConfig networkConfig) {
+ super(deserializationSchema, maxRetryTimes, index, searchColumn,
producedNames);
+ this.numCandidates = numCandidates;
+ this.networkConfig = checkNotNull(networkConfig, "No networkConfig
supplied.");
+ }
+
+ /** Creates the synchronous Elasticsearch client from the network configuration. */
+ @Override
+ protected void doOpen(FunctionContext context) {
+ this.client = networkConfig.createEsSyncClient();
+ }
+
+ /** Closes the client's transport if a client was created, then clears the reference. */
+ @Override
+ protected void doClose() throws IOException {
+ if (client != null) {
+ client._transport().close();
+ client = null;
+ }
+ }
+
+ /**
+ * Executes one kNN search.
+ *
+ * @param topK value passed as {@code k} of the kNN query
+ * @param features row whose field 0 is read as a float array and used as the query vector
+ * @return one {@code SearchResult} per hit that has a non-null source, built from the source
+ * JSON string and the hit score
+ * @throws IOException if the search request fails
+ */
+ @Override
+ protected SearchResult[] doSearch(int topK, RowData features) throws
IOException {
+ // The client API takes the query vector as List<Float>, so box the primitive array.
+ List<Float> queryVector = new ArrayList<>();
+ for (float feature : features.getArray(0).toFloatArray()) {
+ queryVector.add(feature);
+ }
+
+ SearchRequest request =
+ new SearchRequest.Builder()
+ .index(index)
+ .knn(
+ kb ->
+ kb.field(searchColumn)
+ .numCandidates(numCandidates)
+ .queryVector(queryVector)
+ .k(topK))
+ .source(src -> src.filter(f ->
f.includes(Arrays.asList(producedNames))))
+ .build();
+
+ SearchResponse<JsonData> searchResponse = client.search(request,
JsonData.class);
+
+ // Hits without a source are dropped rather than emitted as empty rows.
+ return searchResponse.hits().hits().stream()
+ .filter(hit -> hit.source() != null)
+ .map(hit -> new SearchResult(hit.source().toJson().toString(),
hit.score()))
+ .toArray(SearchResult[]::new);
+ }
+}
diff --git
a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index de87735..b138912 100644
---
a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.connector.elasticsearch.table.ElasticSearch8AsyncDynamicTableFactory
-
+org.apache.flink.connector.elasticsearch.table.Elasticsearch8DynamicTableFactory
diff --git
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
index 2381b88..f65e72a 100644
---
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
+++
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java
@@ -94,8 +94,7 @@ class Elasticsearch8DynamicSinkITCase extends
Elasticsearch8DynamicSinkBaseITCas
LocalDateTime.parse("2012-12-12T12:12:12")));
String index = "writing-documents";
- ElasticSearch8AsyncDynamicTableFactory sinkFactory =
- new ElasticSearch8AsyncDynamicTableFactory();
+ Elasticsearch8DynamicTableFactory sinkFactory = new
Elasticsearch8DynamicTableFactory();
DynamicTableSink.SinkRuntimeProvider runtimeProvider =
sinkFactory
diff --git
a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
new file mode 100644
index 0000000..46e5c34
--- /dev/null
+++
b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.table;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.mapping.BooleanProperty;
+import co.elastic.clients.elasticsearch._types.mapping.ByteNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DenseVectorIndexOptions;
+import co.elastic.clients.elasticsearch._types.mapping.DenseVectorProperty;
+import co.elastic.clients.elasticsearch._types.mapping.DoubleNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.FloatNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.IntegerNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.LongNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.Property;
+import co.elastic.clients.elasticsearch._types.mapping.ShortNumberProperty;
+import co.elastic.clients.elasticsearch._types.mapping.TextProperty;
+import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@code VECTOR_SEARCH} ITCase for Elasticsearch 8. */
+@Testcontainers
+public class Elasticsearch8VectorSearchITCase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(Elasticsearch8VectorSearchITCase.class);
+
+ private static final int PARALLELISM = 2;
+
+ public static final String ELASTICSEARCH_VERSION = "8.19.0";
+ public static final DockerImageName ELASTICSEARCH_IMAGE =
+
DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch")
+ .withTag(ELASTICSEARCH_VERSION);
+ private static final String ES_CLUSTER_USERNAME = "elastic";
+ private static final String ES_CLUSTER_PASSWORD = "s3cret";
+
+ @Container
+ private static final ElasticsearchContainer ES_CONTAINER =
createElasticsearchContainer();
+
+ /**
+ * Builds the Elasticsearch test container: 2 GB JVM heap, Elasticsearch logging at ERROR,
+ * container output forwarded to this test's logger, the test password set, and a startup
+ * timeout of three minutes.
+ */
+ private static ElasticsearchContainer createElasticsearchContainer() {
+ final ElasticsearchContainer container =
+ new ElasticsearchContainer(ELASTICSEARCH_IMAGE)
+ .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+ .withEnv("logger.org.elasticsearch", "ERROR")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ container.withPassword(ES_CLUSTER_PASSWORD);
+
+ container.setWaitStrategy(
+
Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(3)));
+
+ return container;
+ }
+
+ /**
+ * Returns the hex-encoded SHA-256 digest of the container's CA certificate (computed over the
+ * certificate's encoded form).
+ *
+ * @throws IllegalArgumentException if the container exposes no CA certificate
+ */
+ private String getEsCertFingerprint() throws Exception {
+ Preconditions.checkArgument(ES_CONTAINER.caCertAsBytes().isPresent());
+ byte[] caCertBytes = ES_CONTAINER.caCertAsBytes().get();
+ X509Certificate caCert =
+ (X509Certificate)
+ CertificateFactory.getInstance("X.509")
+ .generateCertificate(new
ByteArrayInputStream(caCertBytes));
+ byte[] fingerprint =
MessageDigest.getInstance("SHA-256").digest(caCert.getEncoded());
+ return Hex.encodeHexString(fingerprint);
+ }
+
+ /**
+ * Creates a new HTTPS client for the test container, authenticated with the test credentials
+ * and trusting the container's CA.
+ *
+ * <p>Every call builds a new {@link RestClient}; nothing in this method closes it.
+ */
+ private ElasticsearchClient getClient() {
+ final CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(ES_CLUSTER_USERNAME,
ES_CLUSTER_PASSWORD));
+ RestClient restClient =
+ RestClient.builder(
+ new HttpHost(
+ ES_CONTAINER.getHost(),
+ ES_CONTAINER.getFirstMappedPort(),
+ "https"))
+ .setHttpClientConfigCallback(
+ httpClientBuilder ->
+ httpClientBuilder
+
.setDefaultCredentialsProvider(credentialsProvider)
+ .setSSLContext(
+
ES_CONTAINER.createSslContextFromCa()))
+ .build();
+ RestClientTransport transport =
+ new RestClientTransport(restClient, new JacksonJsonpMapper());
+ return new ElasticsearchClient(transport);
+ }
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ private final List<Row> inputData =
+ Arrays.asList(
+ Row.of(1L, "Spark", new Float[] {0.2718f, 0.6527f,
0.7076f}),
+ Row.of(2L, "Flink", new Float[] {-0.2718f, -0.6527f,
-0.7076f}));
+
+ private TableEnvironment tEnv;
+
+ /** Creates a fresh streaming-mode {@link TableEnvironment} before each test. */
+ @BeforeEach
+ void beforeEach() {
+ tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ }
+
+ /**
+ * Indexes a single document covering every column type declared below, then runs {@code
+ * VECTOR_SEARCH} on column {@code f10} and checks that the one returned row carries all
+ * decoded columns followed by the score.
+ */
+ @Test
+ public void testSearchFullTypeVectorTable() throws Exception {
+ String index = "table_with_all_supported_types";
+ createFullTypesIndex(index);
+
+ // Insert the test document directly through the ES client instead of a Flink sink table.
+ // NOTE(review): the original comment said the elasticsearch-8 connector doesn't support
+ // sink, but the factory in this commit still creates ElasticSearch8AsyncDynamicSink;
+ // confirm the actual reason for bypassing the sink.
+ Map<String, Object> document = new HashMap<>();
+ document.put("id", 1L);
+ document.put("f1", "ABCDE");
+ document.put("f2", true);
+ document.put("f3", (byte) 127);
+ document.put("f4", (short) 257);
+ document.put("f5", 65535);
+ document.put("f6", LocalDate.ofEpochDay(12345).toString());
+ document.put("f7", "2012-12-12 12:12:12");
+ document.put("f8", 11.11f);
+ document.put("f9", 12.22d);
+ document.put("f10", new float[] {11.11f, 11.12f});
+ document.put("f11", new double[] {12.22d, 12.22d});
+ document.put("f12", new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
+ document.put("f13", new long[] {Long.MIN_VALUE, Long.MAX_VALUE});
+
+ IndexResponse response = getClient().index(i ->
i.index(index).id("1").document(document));
+ LOG.info("Indexed document with result: {}", response.result());
+
+ // Explicitly refresh the index before searching it.
+ getClient().indices().refresh(r -> r.index(index));
+
+ String certFingerprint = getEsCertFingerprint();
+
+ tEnv.executeSql(
+ "CREATE TABLE esTable ("
+ + " id BIGINT,\n"
+ + " f1 STRING,\n"
+ + " f2 BOOLEAN,\n"
+ + " f3 TINYINT,\n"
+ + " f4 SMALLINT,\n"
+ + " f5 INTEGER,\n"
+ + " f6 DATE,\n"
+ + " f7 TIMESTAMP,\n"
+ + " f8 FLOAT,\n"
+ + " f9 DOUBLE,\n"
+ + " f10 ARRAY<FLOAT>,\n"
+ + " f11 ARRAY<DOUBLE>,\n"
+ + " f12 ARRAY<INTEGER>,\n"
+ + " f13 ARRAY<BIGINT>,\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector",
"elasticsearch-8")
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+ "https://" + ES_CONTAINER.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
+ ES_CLUSTER_USERNAME)
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
+ ES_CLUSTER_PASSWORD)
+ + String.format(
+ "'%s'='%s'\n",
+
Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(),
+ certFingerprint)
+ + ")");
+
+ // The query side is a single row (id = 1, vector = [11.11, 1]); the top 3 neighbors are
+ // requested, but only one document was indexed above.
+ List<String> rows =
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql(
+ "WITH t(id, vector) AS (SELECT
* FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY<FLOAT>))))\n"
+ + "SELECT * FROM t,
LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n")
+ .collect())
+ .stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ // Expected row layout: query columns (id, vector), then the table columns, then the score.
+ assertThat(rows)
+ .isEqualTo(
+ Collections.singletonList(
+ "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257,
65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22,
12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807],
0.8836806]"));
+ }
+
+ /**
+  * Runs VECTOR_SEARCH against an Elasticsearch 8 index whose {@code vector} field is created
+  * with the given similarity metric.
+  *
+  * <p>Three documents are indexed through the ES client, the index is exposed to Flink as
+  * {@code es_table}, and the query vectors are read from the {@code src} table, which is
+  * registered from {@code inputData} (declared outside this method). The test asserts the
+  * {@code (content, label)} pairs produced when 2 matches are requested per query row.
+  *
+  * @param metric similarity name passed to the dense_vector mapping of the test index
+  */
+ @ParameterizedTest
+ @ValueSource(strings = {"cosine", "l2_norm", "dot_product"})
+ void testSearchUsingFloatArray(String metric) throws Exception {
+ String index = "table_with_multiple_data_with_" +
metric.toLowerCase().replace("_", "");
+ createSimpleIndex(index, metric);
+
+ // Insert data using ES client since elasticsearch-8 connector doesn't
support sink
+ // For dot_product, vectors must be normalized (unit vectors)
+ indexSimpleDocument(index, "1", 1L, "Batch", new float[] {0.2718f,
0.6527f, 0.7076f});
+ indexSimpleDocument(
+ index, "2", 2L, "Streaming", new float[] {-0.2718f, -0.6527f,
-0.7076f});
+ indexSimpleDocument(index, "3", 3L, "Big Data", new float[] {0.7071f,
0.7071f, 0f});
+
+ // Refresh index to make documents searchable
+ getClient().indices().refresh(r -> r.index(index));
+
+ String certFingerprint = getEsCertFingerprint();
+
+ // Expose the index to Flink as es_table through the elasticsearch-8 connector.
+ tEnv.executeSql(
+ "CREATE TABLE es_table("
+ + " id BIGINT,"
+ + " label STRING,"
+ + " vector ARRAY<FLOAT>"
+ + ")\n WITH (\n"
+ + String.format("'%s'='%s',\n", "connector",
"elasticsearch-8")
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.HOSTS_OPTION.key(),
+ "https://" + ES_CONTAINER.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.USERNAME_OPTION.key(),
+ ES_CLUSTER_USERNAME)
+ + String.format(
+ "'%s'='%s',\n",
+
ElasticsearchConnectorOptions.PASSWORD_OPTION.key(),
+ ES_CLUSTER_PASSWORD)
+ + String.format(
+ "'%s'='%s'\n",
+
Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(),
+ certFingerprint)
+ + ")");
+
+ // Register the query-side table on the 'values' test connector; its rows come from
+ // inputData, which is not declared in this method.
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE src(\n"
+ + " id BIGINT PRIMARY KEY NOT ENFORCED,\n"
+ + " content STRING,\n"
+ + " index ARRAY<FLOAT>\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '%s'\n"
+ + ");\n",
+ TestValuesTableFactory.registerData(inputData)));
+ // Request 2 matches from es_table for every src row and compare the (content, label)
+ // pairs, including their order, with the expected list.
+ assertThat(
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql(
+ "SELECT content, label
FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector),
src.index, 2))")
+ .collect())
+ .stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .isEqualTo(
+ Arrays.asList(
+ "+I[Spark, Batch]",
+ "+I[Spark, Big Data]",
+ "+I[Flink, Streaming]",
+ "+I[Flink, Big Data]"));
+ }
+
+ /**
+  * Creates the index used by the full-types test.
+  *
+  * <p>The mapping declares {@code id} as long, {@code f1} and {@code f7} as text, {@code f2}
+  * as boolean, {@code f3} to {@code f5} as byte/short/integer, {@code f6} as date, {@code f8}
+  * and {@code f9} as float/double, and {@code f10} to {@code f13} as 2-dimensional
+  * dense_vector fields using cosine similarity.
+  *
+  * @param index name of the index to create
+  * @throws IOException if the create-index request fails
+  */
+ private void createFullTypesIndex(String index) throws IOException {
+     // Scalar fields.
+     Property idField = Property.of(p -> p.long_(LongNumberProperty.of(l -> l)));
+     Property f1Field = Property.of(p -> p.text(TextProperty.of(t -> t)));
+     Property f2Field = Property.of(p -> p.boolean_(BooleanProperty.of(b -> b)));
+     Property f3Field = Property.of(p -> p.byte_(ByteNumberProperty.of(b -> b)));
+     Property f4Field = Property.of(p -> p.short_(ShortNumberProperty.of(s -> s)));
+     Property f5Field = Property.of(p -> p.integer(IntegerNumberProperty.of(i -> i)));
+     Property f6Field = Property.of(p -> p.date(DateProperty.of(d -> d)));
+     Property f7Field = Property.of(p -> p.text(TextProperty.of(t -> t)));
+     Property f8Field = Property.of(p -> p.float_(FloatNumberProperty.of(f -> f)));
+     Property f9Field = Property.of(p -> p.double_(DoubleNumberProperty.of(d -> d)));
+
+     // In ES 8.x, dense_vector requires index: true and similarity for kNN search;
+     // createDenseVectorProperty sets both.
+     Property f10Field =
+             Property.of(p -> p.denseVector(createDenseVectorProperty(2, "cosine")));
+     Property f11Field =
+             Property.of(p -> p.denseVector(createDenseVectorProperty(2, "cosine")));
+     Property f12Field =
+             Property.of(p -> p.denseVector(createDenseVectorProperty(2, "cosine")));
+     Property f13Field =
+             Property.of(p -> p.denseVector(createDenseVectorProperty(2, "cosine")));
+
+     TypeMapping mapping =
+             TypeMapping.of(
+                     m ->
+                             m.properties("id", idField)
+                                     .properties("f1", f1Field)
+                                     .properties("f2", f2Field)
+                                     .properties("f3", f3Field)
+                                     .properties("f4", f4Field)
+                                     .properties("f5", f5Field)
+                                     .properties("f6", f6Field)
+                                     .properties("f7", f7Field)
+                                     .properties("f8", f8Field)
+                                     .properties("f9", f9Field)
+                                     .properties("f10", f10Field)
+                                     .properties("f11", f11Field)
+                                     .properties("f12", f12Field)
+                                     .properties("f13", f13Field));
+
+     this.getClient().indices().create(c -> c.index(index).mappings(mapping));
+ }
+
+ /**
+  * Creates an index with a long {@code id}, a text {@code label} and a 3-dimensional
+  * dense_vector {@code vector} field.
+  *
+  * @param index name of the index to create
+  * @param similarity similarity name applied to the {@code vector} field
+  * @throws IOException if the create-index request fails
+  */
+ private void createSimpleIndex(String index, String similarity) throws IOException {
+     Property idField = Property.of(p -> p.long_(LongNumberProperty.of(l -> l)));
+     Property labelField = Property.of(p -> p.text(TextProperty.of(t -> t)));
+     // In ES 8.x, dense_vector requires index: true and similarity for kNN search;
+     // createDenseVectorProperty sets both.
+     Property vectorField =
+             Property.of(p -> p.denseVector(createDenseVectorProperty(3, similarity)));
+
+     TypeMapping mapping =
+             TypeMapping.of(
+                     m ->
+                             m.properties("id", idField)
+                                     .properties("label", labelField)
+                                     .properties("vector", vectorField));
+
+     this.getClient().indices().create(c -> c.index(index).mappings(mapping));
+ }
+
+ /**
+  * Builds an indexed dense_vector property with HNSW index options ({@code m = 16},
+  * {@code ef_construction = 100}).
+  *
+  * @param dims number of vector dimensions
+  * @param similarity similarity name set on the property
+  * @return the configured dense_vector property
+  */
+ private DenseVectorProperty createDenseVectorProperty(int dims, String similarity) {
+     DenseVectorIndexOptions hnswOptions =
+             DenseVectorIndexOptions.of(o -> o.type("hnsw").m(16).efConstruction(100));
+     return DenseVectorProperty.of(
+             d ->
+                     d.dims(dims)
+                             .index(true)
+                             .similarity(similarity)
+                             .indexOptions(hnswOptions));
+ }
+
+ /**
+  * Indexes one document with {@code id}, {@code label} and {@code vector} fields and logs
+  * the result reported by Elasticsearch.
+  *
+  * @param index target index
+  * @param docId Elasticsearch document id
+  * @param id value stored in the {@code id} field
+  * @param label value stored in the {@code label} field
+  * @param vector value stored in the {@code vector} field
+  * @throws IOException if the index request fails
+  */
+ private void indexSimpleDocument(
+         String index, String docId, Long id, String label, float[] vector) throws IOException {
+     Map<String, Object> source = new HashMap<>();
+     source.put("id", id);
+     source.put("label", label);
+     source.put("vector", vector);
+
+     IndexResponse indexed =
+             getClient().index(req -> req.index(index).id(docId).document(source));
+     LOG.info("Indexed document {} with result: {}", docId, indexed.result());
+ }
+}
diff --git
a/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties
b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties
new file mode 100644
index 0000000..07514cc
--- /dev/null
+++
b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties
@@ -0,0 +1,17 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+ryuk.container.image = testcontainers/ryuk:0.6.0
diff --git a/pom.xml b/pom.xml
index 98775f6..98dbf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@ under the License.
<properties>
<flink.version>2.2.1</flink.version>
+ <scala.binary.version>2.12</scala.binary.version>
<jackson-bom.version>2.15.3</jackson-bom.version>
<junit4.version>4.13.2</junit4.version>