wenjin272 commented on code in PR #137:
URL:
https://github.com/apache/flink-connector-elasticsearch/pull/137#discussion_r3263450095
##########
flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java:
##########
@@ -0,0 +1,189 @@
+package org.apache.flink.connector.elasticsearch.table.search;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.VectorSearchFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The {@link VectorSearchFunction} implementation for Elasticsearch. */
+public class ElasticsearchRowDataVectorSearchFunction extends VectorSearchFunction {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ElasticsearchRowDataVectorSearchFunction.class);
+ private static final long serialVersionUID = 1L;
+ private static final String QUERY_VECTOR = "query_vector";
+
+ private final DeserializationSchema<RowData> deserializationSchema;
+
+ private final String index;
+
+ private final String[] producedNames;
+ private final int maxRetryTimes;
+ private final SearchMetric searchMetric;
+ private SearchRequest searchRequest;
+ private SearchSourceBuilder searchSourceBuilder;
+
+ private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
+ private final NetworkClientConfig networkClientConfig;
Review Comment:
ES7 and ES8 use **different client stacks**, so the two configs live on
opposite sides of the `ApiCallBridge` boundary:

| | ES7 | ES8 |
|---|---|---|
| Target client | `RestHighLevelClient` | `co.elastic.clients.elasticsearch.ElasticsearchClient` |
| Client creation path | `ElasticsearchApiCallBridge.createClient(NetworkClientConfig, hosts)` | `NetworkConfig.createEsSyncClient()` / `createEsClient()` |
| Config role | Pure data holder | Data holder **+** client factory |
| Already used by | ES7 lookup function | ES8 sink and lookup |

The two configs carry roughly the same fields, but:
- `NetworkClientConfig` is consumed by `ApiCallBridge`, which only ES7
implements.
- `NetworkConfig` self-builds the ES8 Java API Client and is what the
existing ES8 sink/lookup already depend on.
So this PR follows each module's existing convention rather than
introducing a new split.
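
To make the difference in the table concrete, the Java sketch below is a minimal, dependency-free illustration of the two patterns. Every type and method in it (`FakeClient`, `DataOnlyConfig`, `CallBridge`, `Es7StyleBridge`, `SelfBuildingConfig`, `es7Path`, `es8Path`) is hypothetical. These types only mirror the roles of `NetworkClientConfig` + `ElasticsearchApiCallBridge` on ES7 and of `NetworkConfig` on ES8; they are not the connectors' actual APIs or signatures.

```java
import java.util.List;

/**
 * Hypothetical sketch: all types are illustrative stand-ins that only mirror the roles of
 * NetworkClientConfig + ElasticsearchApiCallBridge (ES7) and NetworkConfig (ES8).
 */
public class ClientCreationSketch {

    /** Stand-in for a real client such as RestHighLevelClient or ElasticsearchClient. */
    static final class FakeClient implements AutoCloseable {
        final String description;

        FakeClient(String description) {
            this.description = description;
        }

        @Override
        public void close() {
            // A real client would release its HTTP connections here.
        }
    }

    /** ES7 pattern: the config is a pure data holder with no factory logic. */
    static final class DataOnlyConfig {
        final String username;

        DataOnlyConfig(String username) {
            this.username = username;
        }
    }

    /** ES7 pattern: a version-specific bridge owns client creation and reads the config. */
    interface CallBridge<C extends AutoCloseable> {
        C createClient(DataOnlyConfig config, List<String> hosts);
    }

    static final class Es7StyleBridge implements CallBridge<FakeClient> {
        @Override
        public FakeClient createClient(DataOnlyConfig config, List<String> hosts) {
            return new FakeClient("bridge-built client for " + hosts + " as " + config.username);
        }
    }

    /** ES8 pattern: the config carries the same data and also builds its own client. */
    static final class SelfBuildingConfig {
        final List<String> hosts;
        final String username;

        SelfBuildingConfig(List<String> hosts, String username) {
            this.hosts = hosts;
            this.username = username;
        }

        FakeClient createSyncClient() {
            return new FakeClient("config-built client for " + hosts + " as " + username);
        }
    }

    /** ES7 path: passive config handed to a bridge, which creates the client. */
    static String es7Path(List<String> hosts, String user) {
        CallBridge<FakeClient> bridge = new Es7StyleBridge();
        try (FakeClient client = bridge.createClient(new DataOnlyConfig(user), hosts)) {
            return client.description;
        }
    }

    /** ES8 path: the config is its own factory, so no bridge is involved. */
    static String es8Path(List<String> hosts, String user) {
        try (FakeClient client = new SelfBuildingConfig(hosts, user).createSyncClient()) {
            return client.description;
        }
    }

    public static void main(String[] args) {
        List<String> hosts = List.of("http://localhost:9200");
        System.out.println(es7Path(hosts, "elastic"));
        System.out.println(es8Path(hosts, "elastic"));
    }
}
```

The sketch only shows where the factory logic lives. On the ES7 side a bridge turns a passive config into a client, so reusing `NetworkClientConfig` in ES8 would also require an ES8 bridge, which that module does not implement.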
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.