fapaul commented on code in PR #19405:
URL: https://github.com/apache/flink/pull/19405#discussion_r859457944


##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/pom.xml:
##########
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <version>1.16-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-end-to-end-tests-common-elasticsearch</artifactId>
+       <name>Flink : E2E Tests : Elasticsearch Common</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-elasticsearch-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>elasticsearch</artifactId>
+                       <version>${testcontainers.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>compile</scope>
+               </dependency>
+       </dependencies>
+
+       <dependencyManagement>
+          <dependencies>
+                 <dependency>
+                        <groupId>org.apache.httpcomponents</groupId>

Review Comment:
   Can you add a short comment explaining why this is necessary?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchTestEmitter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
+import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test emitter for performing ElasticSearch indexing requests. */
+public interface ElasticsearchTestEmitter extends ElasticsearchEmitter<KeyValue<Integer, String>> {
+
+    @Override
+    default void emit(
+            KeyValue<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
+        addUpsertRequest(indexer, element);

Review Comment:
   I think it would be good to revisit the current `TestEmitter` structure. You
can replace the inheritance with composition by implementing a single class
whose only constructor parameter is used to build the `UpdateRequest`.
Currently, `emit` is marked as `default` although all implementors override it,
and sharing `createDoc` via a default method also feels strange because it
could be a static method.
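
   A rough sketch of the composition idea, not the PR's actual code: the
`KeyValue`, `UpdateRequest`, and `RequestIndexer` types below are simplified
stand-ins so the example compiles without Flink or Elasticsearch on the
classpath, and the `"key"`/`"value"` field names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the real Flink/Elasticsearch types.
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

final class UpdateRequest {
    final String index;
    final String id;
    final Map<String, Object> doc;

    UpdateRequest(String index, String id, Map<String, Object> doc) {
        this.index = index;
        this.id = id;
        this.doc = doc;
    }
}

interface RequestIndexer {
    void add(UpdateRequest request);
}

/** One concrete emitter; only the request construction varies, and it is injected. */
final class TestEmitter {
    private final Function<KeyValue<Integer, String>, UpdateRequest> requestFactory;

    TestEmitter(Function<KeyValue<Integer, String>, UpdateRequest> requestFactory) {
        this.requestFactory = requestFactory;
    }

    void emit(KeyValue<Integer, String> element, RequestIndexer indexer) {
        indexer.add(requestFactory.apply(element));
    }

    // Static helper: building the document needs no emitter state.
    static Map<String, Object> createDoc(KeyValue<Integer, String> element) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("key", element.key);
        doc.put("value", element.value);
        return doc;
    }
}
```

   Each Elasticsearch version module would then pass its own request factory
to the constructor instead of subclassing. In the real connector the emitter
is probably shipped to the task managers, so the injected function would
likely need to be serializable as well.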



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch7/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7Client.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 7 client. */
+public class Elasticsearch7Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 7 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch7Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName);

Review Comment:
   Same as for the other client, please include the exception in the log 
message.



##########
flink-end-to-end-tests/flink-end-to-end-tests-common-elasticsearch/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkExternalContextBase.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The base class for Elasticsearch sink context. */
+abstract class ElasticsearchSinkExternalContextBase
+        implements DataStreamSinkV2ExternalContext<KeyValue<Integer, String>> {
+    /** The constant INDEX_NAME_PREFIX. */
+    protected static final String INDEX_NAME_PREFIX = "es-index";
+
+    private static final int RANDOM_STRING_MAX_LENGTH = 50;
+    private static final int NUM_RECORDS_UPPER_BOUND = 500;
+    private static final int NUM_RECORDS_LOWER_BOUND = 100;
+    protected static final int BULK_BUFFER = 100;
+    protected static final int PAGE_LENGTH = NUM_RECORDS_UPPER_BOUND + 1;
+    /** The index name. */
+    protected final String indexName;
+
+    /** The address reachable from Flink (internal to the testing environment). */
+    protected final String addressInternal;
+
+    /** The connector jar paths. */
+    protected final List<URL> connectorJarPaths;
+
+    /** The client. */
+    protected final ElasticsearchClient client;
+
+    /**
+     * Instantiates a new Elasticsearch sink context base.
+     *
+     * @param addressInternal The address to access Elasticsearch from within Flink. When running in
+     *     a containerized environment, should correspond to the network alias that resolves within
+     *     the environment's network together with the exposed port.
+     * @param connectorJarPaths The connector jar paths.
+     * @param client The Elasticsearch client.
+     */
+    ElasticsearchSinkExternalContextBase(
+            String addressInternal, List<URL> connectorJarPaths, ElasticsearchClient client) {
+        this.addressInternal = checkNotNull(addressInternal);
+        this.connectorJarPaths = checkNotNull(connectorJarPaths);
+        this.client = checkNotNull(client);
+        this.indexName =
+                INDEX_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    }
+
+    @Override
+    public List<KeyValue<Integer, String>> generateTestData(
+            TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        List<KeyValue<Integer, String>> randomStringRecords = new ArrayList<>();

Review Comment:
   Unused?



##########
flink-end-to-end-tests/flink-end-to-end-tests-elasticsearch6/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6Client.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The type Elasticsearch 6 client. */
+public class Elasticsearch6Client implements ElasticsearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6Client.class);
+
+    private final RestHighLevelClient restClient;
+
+    /**
+     * Instantiates a new Elasticsearch 6 client.
+     *
+     * @param addressExternal The address to access Elasticsearch from the host machine (outside of
+     *     the containerized environment).
+     */
+    public Elasticsearch6Client(String addressExternal) {
+        checkNotNull(addressExternal);
+        HttpHost httpHost = HttpHost.create(addressExternal);
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
+        this.restClient = new RestHighLevelClient(restClientBuilder);
+        checkNotNull(restClient);
+    }
+
+    @Override
+    public void deleteIndex(String indexName) {
+        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+        try {
+            restClient.indices().delete(request, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot delete index {}", indexName, e);
+        }
+        // This is needed to avoid race conditions between tests that reuse the same index
+        refreshIndex(indexName);
+    }
+
+    @Override
+    public void refreshIndex(String indexName) {
+        RefreshRequest refresh = new RefreshRequest(indexName);
+        refresh.indicesOptions(IndicesOptions.strictSingleIndexNoExpandForbidClosed());
+        try {
+            restClient.indices().refresh(refresh, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            LOG.error("Cannot refresh index {}", indexName);
+        } catch (ElasticsearchException e) {
+            if (e.status() == RestStatus.NOT_FOUND) {
+                LOG.info("Index {} not found", indexName);
+            }
+        }
+    }
+
+    @Override
+    public void createIndexIfDoesNotExist(String indexName, int shards, int replicas) {
+        GetIndexRequest request = new GetIndexRequest(indexName);
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.settings(
+                Settings.builder()
+                        .put("index.number_of_shards", shards)
+                        .put("index.number_of_replicas", replicas));
+        try {
+            boolean exists = restClient.indices().exists(request, RequestOptions.DEFAULT);
+            if (!exists) {
+                restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+            } else {
+                LOG.info("Index already exists {}", indexName);
+            }
+        } catch (IOException e) {
+            LOG.error("Cannot create index {}", indexName);

Review Comment:
   Please always also include the exception when logging. This affects some 
places in this class.
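
   To illustrate the point, a minimal self-contained sketch: it uses
`java.util.logging` as a stand-in so it runs without SLF4J on the classpath,
and `LoggingSketch` with its always-failing `refreshIndex` is hypothetical.
With SLF4J the same effect comes from passing the exception as the last
argument, as `deleteIndex` in this class already does.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LoggingSketch {
    static final Logger LOG = Logger.getLogger(LoggingSketch.class.getName());

    // Hypothetical stand-in for an index operation that fails with an IOException.
    static void refreshIndex(String indexName) {
        try {
            throw new IOException("connection refused");
        } catch (IOException e) {
            // Pass the exception itself, not only the index name, so the cause
            // and stack trace end up in the log record.
            // SLF4J equivalent: LOG.error("Cannot refresh index {}", indexName, e);
            LOG.log(Level.SEVERE, "Cannot refresh index " + indexName, e);
        }
    }
}
```

   Without the exception argument, the log only says that the operation
failed; with it, the cause is available when a test run needs debugging.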



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
