fsk119 commented on code in PR #137:
URL:
https://github.com/apache/flink-connector-elasticsearch/pull/137#discussion_r2661399398
##########
flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java:
##########
@@ -24,14 +24,14 @@
* from a logical description.
*/
public class ElasticsearchDynamicSource implements LookupTableSource,
SupportsProjectionPushDown {
- private final DecodingFormat<DeserializationSchema<RowData>> format;
- private final ElasticsearchConfiguration config;
+ protected final DecodingFormat<DeserializationSchema<RowData>> format;
+ protected final ElasticsearchConfiguration config;
private final int lookupMaxRetryTimes;
Review Comment:
Can we use the same max-retry count for both lookup and vector search, instead of configuring them separately?
##########
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java:
##########
@@ -0,0 +1,185 @@
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The {@link VectorSearchFunction} implementation for Elasticsearch. */
+public class ElasticsearchRowDataVectorSearchFunction extends
VectorSearchFunction {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(ElasticsearchRowDataVectorSearchFunction.class);
+ private static final long serialVersionUID = 1L;
+ private static final String QUERY_VECTOR = "query_vector";
+
+ private final DeserializationSchema<RowData> deserializationSchema;
+
+ private final String index;
+
+ private final String[] producedNames;
+ private final int maxRetryTimes;
+ private SearchRequest searchRequest;
+ private SearchSourceBuilder searchSourceBuilder;
+
+ private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
+ private final NetworkClientConfig networkClientConfig;
+ private final List<HttpHost> hosts;
+ private final String cosineSimilarity;
+
+ private transient RestHighLevelClient client;
+
+ public ElasticsearchRowDataVectorSearchFunction(
+ DeserializationSchema<RowData> deserializationSchema,
+ int maxRetryTimes,
+ String index,
+ String searchColumn,
+ String[] producedNames,
+ List<HttpHost> hosts,
+ NetworkClientConfig networkClientConfig,
+ ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) {
+
+ checkNotNull(deserializationSchema, "No DeserializationSchema
supplied.");
+ checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
+ checkNotNull(producedNames, "No fieldNames supplied.");
+ checkNotNull(hosts, "No hosts supplied.");
+ checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
+ checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
+
+ this.deserializationSchema = deserializationSchema;
+ this.maxRetryTimes = maxRetryTimes;
+ this.index = index;
+ this.producedNames = producedNames;
+
+ this.networkClientConfig = networkClientConfig;
+ this.hosts = hosts;
+ this.callBridge = callBridge;
+ this.cosineSimilarity =
Review Comment:
Does ES7 support only cosine similarity as the vector-search metric, or could other metrics be exposed as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]