asfgit closed pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x URL: https://github.com/apache/flink/pull/6391
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md index 52d1b58bf51..bafe391850b 100644 --- a/docs/dev/connectors/elasticsearch.md +++ b/docs/dev/connectors/elasticsearch.md @@ -55,6 +55,11 @@ of the Elasticsearch installation: <td>1.3.0</td> <td>5.x</td> </tr> + <tr> + <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td> + <td>1.6.0</td> + <td>6 and later versions</td> + </tr> </tbody> </table> @@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document actions against your clu ## Elasticsearch Sink -The `ElasticsearchSink` uses a `TransportClient` to communicate with an +The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster. 
The example below shows how to configure and create a sink: @@ -79,6 +84,23 @@ The example below shows how to configure and create a sink: <div class="codetabs" markdown="1"> <div data-lang="java, Elasticsearch 1.x" markdown="1"> {% highlight java %} +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + DataStream<String> input = ...; Map<String, String> config = new HashMap<>(); @@ -110,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea </div> <div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1"> {% highlight java %} +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + DataStream<String> input = ...; Map<String, String> config = new HashMap<>(); @@ -138,8 +176,85 @@ 
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea } }));{% endhighlight %} </div> +<div data-lang="java, Elasticsearch 6.x" markdown="1"> +{% highlight java %} +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +DataStream<String> input = ...; + +List<HttpHost> httpHosts = new ArrayList<>(); +httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); +httpHosts.add(new HttpHost("10.2.3.1", 9200, "http")); + +// use an ElasticsearchSink.Builder to create an ElasticsearchSink +ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>( + httpHosts, + new ElasticsearchSinkFunction<String>() { + public IndexRequest createIndexRequest(String element) { + Map<String, String> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .source(json); + } + + @Override + public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { + indexer.add(createIndexRequest(element)); + } + } +); + +// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered +esSinkBuilder.setBulkFlushMaxActions(1); + +// provide a RestClientFactory for custom configuration on the internally created REST client +esSinkBuilder.setRestClientBuilder( + restClientBuilder -> { + restClientBuilder.setDefaultHeaders(...) + restClientBuilder.setMaxRetryTimeoutMillis(...) + restClientBuilder.setPathPrefix(...) 
+ restClientBuilder.setHttpClientConfigCallback(...) + } +); + +// finally, build and add the sink to the job's pipeline +input.addSink(esSinkBuilder.build()); +{% endhighlight %} +</div> <div data-lang="scala, Elasticsearch 1.x" markdown="1"> {% highlight scala %} +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer + +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.Requests +import org.elasticsearch.common.transport.InetSocketTransportAddress +import org.elasticsearch.common.transport.TransportAddress + +import java.net.InetAddress +import java.util.ArrayList +import java.util.HashMap +import java.util.List +import java.util.Map + val input: DataStream[String] = ... val config = new java.util.HashMap[String, String] @@ -166,6 +281,22 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc </div> <div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1"> {% highlight scala %} +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer +import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink + +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.Requests + +import java.net.InetAddress +import java.net.InetSocketAddress +import java.util.ArrayList +import java.util.HashMap +import java.util.List +import java.util.Map + val input: DataStream[String] = ... 
val config = new java.util.HashMap[String, String] @@ -190,14 +321,74 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc })) {% endhighlight %} </div> +<div data-lang="scala, Elasticsearch 6.x" markdown="1"> +{% highlight scala %} +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink + +import org.apache.http.HttpHost +import org.elasticsearch.action.index.IndexRequest +import org.elasticsearch.client.Requests + +import java.util.ArrayList +import java.util.List + +val input: DataStream[String] = ... + +val httpHosts = new java.util.ArrayList[HttpHost] +httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) +httpHosts.add(new HttpHost("10.2.3.1", 9200, "http")) + +val esSinkBuilder = new ElasticsearchSink.Builder[String]( + httpHosts, + new ElasticsearchSinkFunction[String] { + def createIndexRequest(element: String): IndexRequest = { + val json = new java.util.HashMap[String, String] + json.put("data", element) + + return Requests.indexRequest() + .index("my-index") + .type("my-type") + .source(json) + } + } +) + +// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered +esSinkBuilder.setBulkFlushMaxActions(1) + +// provide a RestClientFactory for custom configuration on the internally created REST client +esSinkBuilder.setRestClientBuilder( + restClientBuilder -> { + restClientBuilder.setDefaultHeaders(...) + restClientBuilder.setMaxRetryTimeoutMillis(...) + restClientBuilder.setPathPrefix(...) + restClientBuilder.setHttpClientConfigCallback(...) + } +) + +// finally, build and add the sink to the job's pipeline +input.addSink(esSinkBuilder.build) +{% endhighlight %} +</div> </div> -Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. 
+For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate +with the Elasticsearch cluster (i.e., versions equal to or below 5.x), note how a `Map` of `String`s +is used to configure the `ElasticsearchSink`. This config map will be directly +forwarded when creating the internally used `TransportClient`. The configuration keys are documented in the Elasticsearch documentation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). Especially important is the `cluster.name` parameter that must correspond to the name of your cluster. +For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication. +By default, the connector uses the default configurations for the REST client. To have custom +configuration for the REST client, users can provide a `RestClientFactory` implementation when +setting up the `ElasticsearchSink.Builder` that builds the sink. + Also note that the example only demonstrates performing a single index request for each incoming element. 
Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (ex., diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java index 2ebb97c82e2..33b42cb47f1 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java @@ -22,6 +22,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import java.util.concurrent.atomic.AtomicLong; @@ -45,12 +48,32 @@ } @Override - public void add(ActionRequest... actionRequests) { - for (ActionRequest actionRequest : actionRequests) { + public void add(DeleteRequest... deleteRequests) { + for (DeleteRequest deleteRequest : deleteRequests) { if (flushOnCheckpoint) { numPendingRequestsRef.getAndIncrement(); } - this.bulkProcessor.add(actionRequest); + this.bulkProcessor.add(deleteRequest); + } + } + + @Override + public void add(IndexRequest... indexRequests) { + for (IndexRequest indexRequest : indexRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(indexRequest); + } + } + + @Override + public void add(UpdateRequest... 
updateRequests) { + for (UpdateRequest updateRequest : updateRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(updateRequest); } } } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java index 2a7a21659e4..f1dcc83f652 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java @@ -22,10 +22,10 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.client.Client; import javax.annotation.Nullable; +import java.io.IOException; import java.io.Serializable; import java.util.Map; @@ -37,17 +37,28 @@ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since connecting via an embedded node * is allowed, the call bridge will hold reference to the created embedded node. Each instance of the sink will hold * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed. + * + * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}. */ @Internal -public interface ElasticsearchApiCallBridge extends Serializable { +public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable { /** - * Creates an Elasticsearch {@link Client}. + * Creates an Elasticsearch client implementing {@link AutoCloseable}. * * @param clientConfig The configuration to use when constructing the client. * @return The created client. 
*/ - Client createClient(Map<String, String> clientConfig); + C createClient(Map<String, String> clientConfig) throws IOException; + + /** + * Creates a {@link BulkProcessor.Builder} for creating the bulk processor. + * + * @param client the Elasticsearch client. + * @param listener the bulk processor listener. + * @return the bulk processor builder. + */ + BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener); /** * Extracts the cause of failure of a bulk item action. @@ -71,6 +82,8 @@ void configureBulkProcessorBackoff( /** * Perform any necessary state cleanup. */ - void cleanup(); + default void cleanup() { + // nothing to cleanup by default + } } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java index 9105d9947f2..7dac06ceb8a 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.elasticsearch; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; @@ -62,9 +63,10 @@ * for example, to create a Elasticsearch {@link Client}, handle failed item responses, etc. 
* * @param <T> Type of the elements handled by this sink + * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable} */ @Internal -public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction { +public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction { private static final long serialVersionUID = -1007596293618451942L; @@ -85,6 +87,7 @@ /** * Used to control whether the retry delay should increase exponentially or remain constant. */ + @PublicEvolving public enum FlushBackoffType { CONSTANT, EXPONENTIAL @@ -135,14 +138,20 @@ public void setDelayMillis(long delayMillis) { private final Integer bulkProcessorFlushMaxActions; private final Integer bulkProcessorFlushMaxSizeMb; - private final Integer bulkProcessorFlushIntervalMillis; + private final Long bulkProcessorFlushIntervalMillis; private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy; // ------------------------------------------------------------------------ // User-facing API and configuration // ------------------------------------------------------------------------ - /** The user specified config map that we forward to Elasticsearch when we create the {@link Client}. */ + /** + * The config map that contains configuration for the bulk flushing behaviours. + * + * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this config + * map would also contain Elasticsearch-shipped configuration, and therefore this config map + * would also be forwarded when creating the Elasticsearch client. + */ private final Map<String, String> userConfig; /** The function that is used to construct multiple {@link ActionRequest ActionRequests} from each incoming element. 
*/ @@ -162,7 +171,7 @@ public void setDelayMillis(long delayMillis) { // ------------------------------------------------------------------------ /** Call bridge for different version-specific. */ - private final ElasticsearchApiCallBridge callBridge; + private final ElasticsearchApiCallBridge<C> callBridge; /** * Number of pending action requests not yet acknowledged by Elasticsearch. @@ -176,7 +185,7 @@ public void setDelayMillis(long delayMillis) { private AtomicLong numPendingRequests = new AtomicLong(0); /** Elasticsearch client created using the call bridge. */ - private transient Client client; + private transient C client; /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */ private transient BulkProcessor bulkProcessor; @@ -237,7 +246,7 @@ public ElasticsearchSinkBase( } if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorFlushIntervalMillis = params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS); + bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS); userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS); } else { bulkProcessorFlushIntervalMillis = null; @@ -341,7 +350,7 @@ public void close() throws Exception { protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { checkNotNull(listener); - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, listener); + BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener); // This makes flush() blocking bulkProcessorBuilder.setConcurrentRequests(0); diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java index 2a1b29736b6..3dc8f879641 100644 --- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java @@ -21,9 +21,12 @@ import org.apache.flink.annotation.PublicEvolving; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; /** - * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare + * Users add multiple delete, index or update requests to a {@link RequestIndexer} to prepare * them for sending to an Elasticsearch cluster. */ @PublicEvolving @@ -33,6 +36,41 @@ * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch. * * @param actionRequests The multiple {@link ActionRequest} to add. + * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or {@link UpdateRequest} */ - void add(ActionRequest... actionRequests); + @Deprecated + default void add(ActionRequest... actionRequests) { + for (ActionRequest actionRequest : actionRequests) { + if (actionRequest instanceof IndexRequest) { + add((IndexRequest) actionRequest); + } else if (actionRequest instanceof DeleteRequest) { + add((DeleteRequest) actionRequest); + } else if (actionRequest instanceof UpdateRequest) { + add((UpdateRequest) actionRequest); + } else { + throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests"); + } + } + } + + /** + * Add multiple {@link DeleteRequest} to the indexer to prepare for sending requests to Elasticsearch. + * + * @param deleteRequests The multiple {@link DeleteRequest} to add. + */ + void add(DeleteRequest... 
deleteRequests); + + /** + * Add multiple {@link IndexRequest} to the indexer to prepare for sending requests to Elasticsearch. + * + * @param indexRequests The multiple {@link IndexRequest} to add. + */ + void add(IndexRequest... indexRequests); + + /** + * Add multiple {@link UpdateRequest} to the indexer to prepare for sending requests to Elasticsearch. + * + * @param updateRequests The multiple {@link UpdateRequest} to add. + */ + void add(UpdateRequest... updateRequests); } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java index 09d8806b963..369d26a735a 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.junit.Assert; @@ -92,7 +93,7 @@ public void testItemFailureRethrownOnInvoke() throws Throwable { // setup the next bulk request, and its mock item failures sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); testHarness.processElement(new StreamRecord<>("msg")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk 
request sink.manualBulkRequestWithAllPendingRequests(); @@ -124,7 +125,7 @@ public void testItemFailureRethrownOnCheckpoint() throws Throwable { // setup the next bulk request, and its mock item failures sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); testHarness.processElement(new StreamRecord<>("msg")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); @@ -164,7 +165,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable { sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList); testHarness.processElement(new StreamRecord<>("msg-1")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk request (1 request only, thus should succeed) sink.manualBulkRequestWithAllPendingRequests(); @@ -172,7 +173,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable { // setup the requests to be flushed in the snapshot testHarness.processElement(new StreamRecord<>("msg-2")); testHarness.processElement(new StreamRecord<>("msg-3")); - verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class)); CheckedThread snapshotThread = new CheckedThread() { @Override @@ -217,7 +218,7 @@ public void testBulkFailureRethrownOnInvoke() throws Throwable { // setup the next bulk request, and let the whole bulk request fail sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request")); testHarness.processElement(new StreamRecord<>("msg")); - verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); @@ -249,7 +250,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws Throwable { // setup the next bulk request, and let the whole bulk request fail sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request")); testHarness.processElement(new StreamRecord<>("msg")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); @@ -284,7 +285,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable { // setup the next bulk request, and let bulk request succeed sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception) null)); testHarness.processElement(new StreamRecord<>("msg-1")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // manually execute the next bulk request sink.manualBulkRequestWithAllPendingRequests(); @@ -292,7 +293,7 @@ public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable { // setup the requests to be flushed in the snapshot testHarness.processElement(new StreamRecord<>("msg-2")); testHarness.processElement(new StreamRecord<>("msg-3")); - verify(sink.getMockBulkProcessor(), times(3)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(3)).add(any(IndexRequest.class)); CheckedThread snapshotThread = new CheckedThread() { @Override @@ -346,7 +347,7 @@ public void testAtLeastOnceSink() throws Throwable { // it contains 1 request, which will fail and re-added to the next bulk request 
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); testHarness.processElement(new StreamRecord<>("msg")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); CheckedThread snapshotThread = new CheckedThread() { @Override @@ -402,7 +403,7 @@ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti // setup the next bulk request, and let bulk request succeed sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record"))); testHarness.processElement(new StreamRecord<>("msg-1")); - verify(sink.getMockBulkProcessor(), times(1)).add(any(ActionRequest.class)); + verify(sink.getMockBulkProcessor(), times(1)).add(any(IndexRequest.class)); // the snapshot should not block even though we haven't flushed the bulk request testHarness.snapshot(1L, 1000L); @@ -410,7 +411,7 @@ public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti testHarness.close(); } - private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T> { + private static class DummyElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> { private static final long serialVersionUID = 5051907841570096991L; @@ -478,11 +479,11 @@ public BulkProcessor getMockBulkProcessor() { protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) { this.mockBulkProcessor = mock(BulkProcessor.class); - when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new Answer<Object>() { + when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { // intercept the request and add it to our mock bulk request - nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class)); + 
nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class)); return null; } @@ -530,7 +531,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } } - private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge { + private static class DummyElasticsearchApiCallBridge implements ElasticsearchApiCallBridge<Client> { private static final long serialVersionUID = -4272760730959041699L; @@ -539,6 +540,11 @@ public Client createClient(Map<String, String> clientConfig) { return mock(Client.class); } + @Override + public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) { + return null; + } + @Nullable @Override public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { @@ -553,11 +559,6 @@ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkIt public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { // no need for this in the test cases here } - - @Override - public void cleanup() { - // nothing to cleanup - } } private static class SimpleSinkFunction<String> implements ElasticsearchSinkFunction<String> { diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java index df3779b1fd4..819ffba5d2a 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java @@ -26,7 +26,6 @@ import 
org.apache.flink.util.InstantiationUtil; import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -34,19 +33,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** * Environment preparation and suite of tests for version-specific {@link ElasticsearchSinkBase} implementations. + * + * @param <C> Elasticsearch client type + * @param <A> The address type to use */ -public abstract class ElasticsearchSinkTestBase extends AbstractTestBase { +public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class); @@ -85,24 +85,21 @@ public static void shutdown() throws Exception { } /** - * Tests that the Elasticsearch sink works properly using a {@link TransportClient}. + * Tests that the Elasticsearch sink works properly. 
*/ - public void runTransportClientTest() throws Exception { - final String index = "transport-client-test-index"; + public void runElasticsearchSinkTest() throws Exception { + final String index = "elasticsearch-sink-test-index"; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()); - Map<String, String> userConfig = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - userConfig.put("cluster.name", CLUSTER_NAME); - source.addSink(createElasticsearchSinkForEmbeddedNode( - userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))); + 1, + CLUSTER_NAME, + new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))); - env.execute("Elasticsearch TransportClient Test"); + env.execute("Elasticsearch Sink Test"); // verify the results Client client = embeddedNodeEnv.getClient(); @@ -112,16 +109,20 @@ public void runTransportClientTest() throws Exception { } /** - * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}. + * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is {@code null}. 
*/ - public void runNullTransportClientTest() throws Exception { + public void runNullAddressesTest() throws Exception { Map<String, String> userConfig = new HashMap<>(); userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - userConfig.put("cluster.name", "my-transport-client-cluster"); + userConfig.put("cluster.name", CLUSTER_NAME); try { - createElasticsearchSink(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")); - } catch (IllegalArgumentException expectedException) { + createElasticsearchSink( + 1, + CLUSTER_NAME, + null, + new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")); + } catch (IllegalArgumentException | NullPointerException expectedException) { // test passes return; } @@ -130,18 +131,19 @@ public void runNullTransportClientTest() throws Exception { } /** - * Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty. + * Tests that the Elasticsearch sink fails eagerly if the provided list of addresses is empty. */ - public void runEmptyTransportClientTest() throws Exception { + public void runEmptyAddressesTest() throws Exception { Map<String, String> userConfig = new HashMap<>(); userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - userConfig.put("cluster.name", "my-transport-client-cluster"); + userConfig.put("cluster.name", CLUSTER_NAME); try { createElasticsearchSink( - userConfig, - Collections.<InetSocketAddress>emptyList(), - new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")); + 1, + CLUSTER_NAME, + Collections.emptyList(), + new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")); } catch (IllegalArgumentException expectedException) { // test passes return; @@ -153,39 +155,66 @@ public void runEmptyTransportClientTest() throws Exception { /** * Tests whether the Elasticsearch sink fails when there is no cluster to connect to. 
*/ - public void runTransportClientFailsTest() throws Exception { + public void runInvalidElasticsearchClusterTest() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()); Map<String, String> userConfig = new HashMap<>(); userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - userConfig.put("cluster.name", "my-transport-client-cluster"); + userConfig.put("cluster.name", "invalid-cluster-name"); - source.addSink(createElasticsearchSinkForEmbeddedNode( - Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"))); + source.addSink(createElasticsearchSinkForNode( + 1, + "invalid-cluster-name", + new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"), + "123.123.123.123")); // incorrect ip address try { - env.execute("Elasticsearch Transport Client Test"); + env.execute("Elasticsearch Sink Test"); } catch (JobExecutionException expectedException) { - assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes")); + // test passes return; } fail(); } + /** + * Utility method to create a user config map. + */ + protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) { + Map<String, String> userConfig = new HashMap<>(); + userConfig.put("cluster.name", clusterName); + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions)); + + return userConfig; + } + /** Creates a version-specific Elasticsearch sink, using arbitrary transport addresses. 
*/ - protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction); + protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink( + int bulkFlushMaxActions, + String clusterName, + List<A> addresses, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction); /** * Creates a version-specific Elasticsearch sink to connect to a local embedded Elasticsearch node. * - * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(Map, List, ElasticsearchSinkFunction)} + * <p>This case is singled out from {@link ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List, ElasticsearchSinkFunction)} * because the Elasticsearch Java API to do so is incompatible across different versions. */ - protected abstract <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( - Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception; + protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception; + + /** + * Creates a version-specific Elasticsearch sink to connect to a specific Elasticsearch node. 
+ */ + protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, + String ipAddress) throws Exception; } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java index ea6e7a3ac70..fd14ba36370 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java @@ -29,7 +29,7 @@ * also be located under the same package. The intentional package-private accessibility of this interface * enforces that. */ -interface EmbeddedElasticsearchNodeEnvironment { +public interface EmbeddedElasticsearchNodeEnvironment { /** * Start an embedded Elasticsearch node instance. 
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java index 2a3c2a06460..4f1cd086d8f 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java @@ -42,7 +42,7 @@ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x. */ @Internal -public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge<Client> { private static final long serialVersionUID = -2632363720584123682L; @@ -115,6 +115,11 @@ public Client createClient(Map<String, String> clientConfig) { } } + @Override + public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) { + return BulkProcessor.builder(client, listener); + } + @Override public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { if (!bulkItemResponse.isFailed()) { diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java index e8eccd978f4..d5e1d1fdc12 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java +++ 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.node.Node; @@ -64,7 +65,7 @@ * @param <T> Type of the elements handled by this sink */ @PublicEvolving -public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> { private static final long serialVersionUID = 1L; diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java index 54892909abc..2f1a65c58ee 100644 --- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java @@ -28,10 +28,12 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.junit.Test; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -42,26 +44,26 @@ /** * IT Cases for the {@link ElasticsearchSink}. 
*/ -public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client, InetSocketAddress> { @Test - public void testTransportClient() throws Exception { - runTransportClientTest(); + public void testElasticsearchSink() throws Exception { + runElasticsearchSinkTest(); } @Test - public void testNullTransportClient() throws Exception { - runNullTransportClientTest(); + public void testNullAddresses() throws Exception { + runNullAddressesTest(); } @Test - public void testEmptyTransportClient() throws Exception { - runEmptyTransportClientTest(); + public void testEmptyAddresses() throws Exception { + runEmptyAddressesTest(); } @Test - public void testTransportClientFails() throws Exception{ - runTransportClientFailsTest(); + public void testInvalidElasticsearchCluster() throws Exception{ + runInvalidElasticsearchClusterTest(); } // -- Tests specific to Elasticsearch 1.x -- @@ -102,19 +104,28 @@ public void testDeprecatedIndexRequestBuilderVariant() throws Exception { } @Override - protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction); + protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSink( + int bulkFlushMaxActions, + String clusterName, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) { + + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), + elasticsearchSinkFunction); } @Override - protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( - 
Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { + protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception { + + Map<String, String> userConfig = createUserConfig(bulkFlushMaxActions, clusterName); // Elasticsearch 1.x requires this setting when using // LocalTransportAddress to connect to a local embedded node - userConfig = new HashMap<>(userConfig); userConfig.put("node.local", "true"); List<TransportAddress> transports = new ArrayList<>(); @@ -126,6 +137,22 @@ public void testDeprecatedIndexRequestBuilderVariant() throws Exception { elasticsearchSinkFunction); } + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client> createElasticsearchSinkForNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, + String ipAddress) throws Exception { + + List<TransportAddress> transports = new ArrayList<>(); + transports.add(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300)); + + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + transports, + elasticsearchSinkFunction); + } + /** * A {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}. 
*/ diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java index 390a4078e2b..73a69ebde34 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -44,7 +43,7 @@ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x. 
*/ @Internal -public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> { private static final long serialVersionUID = 2638252694744361079L; @@ -63,7 +62,7 @@ } @Override - public Client createClient(Map<String, String> clientConfig) { + public TransportClient createClient(Map<String, String> clientConfig) { Settings settings = Settings.settingsBuilder().put(clientConfig).build(); TransportClient transportClient = TransportClient.builder().settings(settings).build(); @@ -83,6 +82,11 @@ public Client createClient(Map<String, String> clientConfig) { return transportClient; } + @Override + public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) { + return BulkProcessor.builder(client, listener); + } + @Override public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { if (!bulkItemResponse.isFailed()) { @@ -117,10 +121,4 @@ public void configureBulkProcessorBackoff( builder.setBackoffPolicy(backoffPolicy); } - - @Override - public void cleanup() { - // nothing to cleanup - } - } diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java index ffccacf40ac..a911905ac0a 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java @@ -58,7 +58,7 @@ * @param <T> Type of the elements handled by this sink */ @PublicEvolving -public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { +public class 
ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> { private static final long serialVersionUID = 1L; diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java index 7ded893be3a..7887e72fa10 100644 --- a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java @@ -17,57 +17,81 @@ package org.apache.flink.streaming.connectors.elasticsearch2; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.elasticsearch.client.transport.TransportClient; import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Map; /** * IT cases for the {@link ElasticsearchSink}. 
*/ -public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> { @Test - public void testTransportClient() throws Exception { - runTransportClientTest(); + public void testElasticsearchSink() throws Exception { + runElasticsearchSinkTest(); } @Test - public void testNullTransportClient() throws Exception { - runNullTransportClientTest(); + public void testNullAddresses() throws Exception { + runNullAddressesTest(); } @Test - public void testEmptyTransportClient() throws Exception { - runEmptyTransportClientTest(); + public void testEmptyAddresses() throws Exception { + runEmptyAddressesTest(); } @Test - public void testTransportClientFails() throws Exception{ - runTransportClientFailsTest(); + public void testInvalidElasticsearchCluster() throws Exception{ + runInvalidElasticsearchClusterTest(); } @Override - protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction); + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink( + int bulkFlushMaxActions, + String clusterName, + List<InetSocketAddress> transportAddresses, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) { + + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + transportAddresses, + elasticsearchSinkFunction); + } + + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception { + + return 
createElasticsearchSinkForNode( + bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1"); } @Override - protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode( - Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, + String ipAddress) throws Exception { List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300)); - return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction); + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + transports, + elasticsearchSinkFunction); } } diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java index 7c4ba7a97f1..a3453ec4445 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import 
org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -47,7 +46,7 @@ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x. */ @Internal -public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge { +public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> { private static final long serialVersionUID = -5222683870097809633L; @@ -66,7 +65,7 @@ } @Override - public Client createClient(Map<String, String> clientConfig) { + public TransportClient createClient(Map<String, String> clientConfig) { Settings settings = Settings.builder().put(clientConfig) .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME) .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME) @@ -89,6 +88,11 @@ public Client createClient(Map<String, String> clientConfig) { return transportClient; } + @Override + public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) { + return BulkProcessor.builder(client, listener); + } + @Override public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { if (!bulkItemResponse.isFailed()) { @@ -123,10 +127,4 @@ public void configureBulkProcessorBackoff( builder.setBackoffPolicy(backoffPolicy); } - - @Override - public void cleanup() { - // nothing to cleanup - } - } diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java index 6c09337227a..b99b3539255 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java +++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java @@ -59,7 +59,7 @@ * @param <T> Type of the elements handled by this sink */ @PublicEvolving -public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> { +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> { private static final long serialVersionUID = 1L; diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java index ad7c664cac7..67daa409b7b 100644 --- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java +++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java @@ -18,58 +18,85 @@ package org.apache.flink.streaming.connectors.elasticsearch5; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.elasticsearch.client.transport.TransportClient; import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Map; /** * IT cases for the {@link ElasticsearchSink}. 
+ * + * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE directly, since it is required that the + * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch embedded + * node used in the tests to work properly. */ -public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> { @Test - public void testTransportClient() throws Exception { - runTransportClientTest(); + public void testElasticsearchSink() throws Exception { + runElasticsearchSinkTest(); } @Test - public void testNullTransportClient() throws Exception { - runNullTransportClientTest(); + public void testNullAddresses() throws Exception { + runNullAddressesTest(); } @Test - public void testEmptyTransportClient() throws Exception { - runEmptyTransportClientTest(); + public void testEmptyAddresses() throws Exception { + runEmptyAddressesTest(); } @Test - public void testTransportClientFails() throws Exception { - runTransportClientFailsTest(); + public void testInvalidElasticsearchCluster() throws Exception{ + runInvalidElasticsearchClusterTest(); } @Override - protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig, - List<InetSocketAddress> transportAddresses, - ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { - return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction); + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink( + int bulkFlushMaxActions, + String clusterName, + List<InetSocketAddress> addresses, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) { + + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + addresses, + elasticsearchSinkFunction); } @Override - protected <T> ElasticsearchSinkBase<T> 
createElasticsearchSinkForEmbeddedNode( - Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception { + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception { + + return createElasticsearchSinkForNode( + bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1"); + } + + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, + String ipAddress) throws Exception { List<InetSocketAddress> transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); + transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300)); - return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction); + return new ElasticsearchSink<>( + Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)), + transports, + elasticsearchSinkFunction); } - } diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml new file mode 100644 index 00000000000..ef06d80512b --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml @@ -0,0 +1,180 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.7-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> + <name>flink-connector-elasticsearch6</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <elasticsearch.version>6.3.1</elasticsearch.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- Elasticsearch Java Client has been moved to a different module in 5.x --> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Dependency for Elasticsearch 6.x REST Client --> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + 
<version>${elasticsearch.version}</version> + </dependency> + + <!-- + Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making + Log4j2 a strict dependency. The following is added so that the Log4j2 API in + Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible + in the logging implementation preferred. + --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-to-slf4j</artifactId> + <version>2.9.1</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- + Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 6.x + --> + + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>transport-netty4-client</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + + <!-- + Including Log4j2 dependencies for tests is required for the + embedded Elasticsearch nodes used in tests to run correctly. 
+ --> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.9.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.9.1</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- + For the tests, we need to exclude the Log4j2 to slf4j adapter dependency + and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node + used in tests will fail to work. + + In other words, the connector jar is routing Elasticsearch 5.x's Log4j2 API's to SLF4J, + but for the test builds, we still stick to directly using Log4j2. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.2</version> + <configuration> + <classpathDependencyExcludes> + <classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude> + </classpathDependencyExcludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java new file mode 100644 index 00000000000..03bf9c07109 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.util.Preconditions; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions. + */ +@Internal +public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> { + + private static final long serialVersionUID = -5222683870097809633L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class); + + /** + * User-provided HTTP Host. 
+ */ + private final List<HttpHost> httpHosts; + + /** + * The factory to configure the rest client. + */ + private final RestClientFactory restClientFactory; + + Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) { + Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty()); + this.httpHosts = httpHosts; + this.restClientFactory = Preconditions.checkNotNull(restClientFactory); + } + + @Override + public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException { + RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])); + restClientFactory.configureRestClientBuilder(builder); + + RestHighLevelClient rhlClient = new RestHighLevelClient(builder); + + if (LOG.isInfoEnabled()) { + LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts); + } + + if (!rhlClient.ping()) { + throw new RuntimeException("There are no reachable Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString()); + } + + return rhlClient; + } + + @Override + public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) { + return BulkProcessor.builder(client::bulkAsync, listener); + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return bulkItemResponse.getFailure().getCause(); + } + } + + @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + + BackoffPolicy backoffPolicy; + if (flushBackoffPolicy != null) { + switch (flushBackoffPolicy.getBackoffType()) { + case CONSTANT: + backoffPolicy = BackoffPolicy.constantBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + 
flushBackoffPolicy.getMaxRetryCount()); + break; + case EXPONENTIAL: + default: + backoffPolicy = BackoffPolicy.exponentialBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + } + } else { + backoffPolicy = BackoffPolicy.noBackoff(); + } + + builder.setBackoffPolicy(backoffPolicy); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java new file mode 100644 index 00000000000..4e7a2635738 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; +import org.apache.flink.util.Preconditions; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. + * + * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the HTTP hosts passed to the builder. + * + * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * <ul> + * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * </ul> + * + * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element.
See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. + * + * @param <T> Type of the elements handled by this sink + */ +@PublicEvolving +public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> { + + private static final long serialVersionUID = 1L; + + private ElasticsearchSink( + Map<String, String> bulkRequestsConfig, + List<HttpHost> httpHosts, + ElasticsearchSinkFunction<T> elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler, + RestClientFactory restClientFactory) { + + super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler); + } + + /** + * A builder for creating an {@link ElasticsearchSink}. + * + * @param <T> Type of the elements handled by the sink this builder creates. + */ + @PublicEvolving + public static class Builder<T> { + + private final List<HttpHost> httpHosts; + private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction; + + private Map<String, String> bulkRequestsConfig = new HashMap<>(); + private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler(); + private RestClientFactory restClientFactory = restClientBuilder -> {}; + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}. + * + * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects. + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element. + */ + public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) { + this.httpHosts = Preconditions.checkNotNull(httpHosts); + this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction); + } + + /** + * Sets the maximum number of actions to buffer for each bulk request.
+ * + * @param numMaxActions the maximum number of actions to buffer per bulk request. + */ + public void setBulkFlushMaxActions(int numMaxActions) { + Preconditions.checkArgument( + numMaxActions > 0, + "Max number of buffered actions must be larger than 0."); + + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions)); + } + + /** + * Sets the maximum size of buffered actions, in mb, per bulk request. + * + * @param maxSizeMb the maximum size of buffered actions, in mb. + */ + public void setBulkFlushMaxSizeMb(int maxSizeMb) { + Preconditions.checkArgument( + maxSizeMb > 0, + "Max size of buffered actions must be larger than 0."); + + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb)); + } + + /** + * Sets the bulk flush interval, in milliseconds. + * + * @param intervalMillis the bulk flush interval, in milliseconds. + */ + public void setBulkFlushInterval(long intervalMillis) { + Preconditions.checkArgument( + intervalMillis >= 0, + "Interval (in milliseconds) between each flush must be larger than or equal to 0."); + + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis)); + } + + /** + * Sets whether or not to enable bulk flush backoff behaviour. + * + * @param enabled whether or not to enable backoffs. + */ + public void setBulkFlushBackoff(boolean enabled) { + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled)); + } + + /** + * Sets the type of backoff to use when flushing bulk requests. + * + * @param flushBackoffType the backoff type to use. + */ + public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) { + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, + Preconditions.checkNotNull(flushBackoffType).toString()); + } + + /** + * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+ * + * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests + */ + public void setBulkFlushBackoffRetries(int maxRetries) { + Preconditions.checkArgument( + maxRetries > 0, + "Max number of backoff attempts must be larger than 0."); + + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries)); + } + + /** + * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds. + * + * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds. + */ + public void setBulkFlushBackoffDelay(long delayMillis) { + Preconditions.checkArgument( + delayMillis >= 0, + "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0."); + this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis)); + } + + /** + * Sets a failure handler for action requests. + * + * @param failureHandler This is used to handle failed {@link ActionRequest}. + */ + public void setFailureHandler(ActionRequestFailureHandler failureHandler) { + this.failureHandler = Preconditions.checkNotNull(failureHandler); + } + + /** + * Sets a REST client factory for custom client configuration. + * + * @param restClientFactory the factory that configures the rest client. + */ + public void setRestClientFactory(RestClientFactory restClientFactory) { + this.restClientFactory = Preconditions.checkNotNull(restClientFactory); + } + + /** + * Creates the Elasticsearch sink. + * + * @return the created Elasticsearch sink. 
+ */ + public ElasticsearchSink<T> build() { + return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory); + } + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java new file mode 100644 index 00000000000..4b74649ca87 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.annotation.PublicEvolving; + +import org.elasticsearch.client.RestClientBuilder; + +import java.io.Serializable; + +/** + * A factory that is used to configure the {@link org.elasticsearch.client.RestHighLevelClient} internally + * used in the {@link ElasticsearchSink}. 
+ */ +@PublicEvolving +public interface RestClientFactory extends Serializable { + + /** + * Configures the rest client builder. + * + * @param restClientBuilder the configured rest client builder. + */ + void configureRestClientBuilder(RestClientBuilder restClientBuilder); + +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java new file mode 100644 index 00000000000..8dc62168049 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. + */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", true) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .build(); + + node = new PluginNode(settings); + node.start(); + } + } + + @Override + public void close() throws Exception { + if (node != null && !node.isClosed()) { + node.close(); + node = null; + } + } + + @Override + public Client getClient() { + if (node != null && !node.isClosed()) { + return node.client(); + } else { + return null; + } + } + + private static class PluginNode extends Node { + public PluginNode(Settings settings) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? 
extends Plugin>>singletonList(Netty4Plugin.class)); + } + } + +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java new file mode 100644 index 00000000000..a6f01258940 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * IT cases for the {@link ElasticsearchSink}. + * + * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE directly, since it is required that the + * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the Elasticsearch embedded + * node used in the tests to work properly. + */ +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> { + + @Test + public void testElasticsearchSink() throws Exception { + runElasticsearchSinkTest(); + } + + @Test + public void testNullAddresses() throws Exception { + runNullAddressesTest(); + } + + @Test + public void testEmptyAddresses() throws Exception { + runEmptyAddressesTest(); + } + + @Test + public void testInvalidElasticsearchCluster() throws Exception{ + runInvalidElasticsearchClusterTest(); + } + + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink( + int bulkFlushMaxActions, + String clusterName, + List<HttpHost> httpHosts, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) { + + ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction); + builder.setBulkFlushMaxActions(bulkFlushMaxActions); + + return builder.build(); + } + + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> 
createElasticsearchSinkForEmbeddedNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception { + + return createElasticsearchSinkForNode( + bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1"); + } + + @Override + protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode( + int bulkFlushMaxActions, + String clusterName, + ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, + String ipAddress) throws Exception { + + ArrayList<HttpHost> httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost(ipAddress, 9200, "http")); + + ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction); + builder.setBulkFlushMaxActions(bulkFlushMaxActions); + + return builder.build(); + } +} diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..fcd86546668 --- /dev/null +++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 3afb779a80e..cacea91578e 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -50,6 +50,7 @@ under the License. <module>flink-connector-elasticsearch</module> <module>flink-connector-elasticsearch2</module> <module>flink-connector-elasticsearch5</module> + <module>flink-connector-elasticsearch6</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> <module>flink-connector-nifi</module> diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml new file mode 100644 index 00000000000..d170235c702 --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml @@ -0,0 +1,92 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> + <version>1.7-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-elasticsearch6-test</artifactId> + <name>flink-elasticsearch6-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>Elasticsearch6SinkExample</finalName> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + 
</excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.Elasticsearch6SinkExample</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java new file mode 100644 index 00000000000..dedcbb28f08 --- /dev/null +++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * End to end test for Elasticsearch6Sink. + */ +public class Elasticsearch6SinkExample { + + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 3) { + System.out.println("Missing parameters!\n" + + "Usage: --numRecords <numRecords> --index <index> --type <type>"); + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.enableCheckpointing(5000); + + DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction<Long, String>() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); + + List<HttpHost> httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); + + ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>( + httpHosts, + (String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool))); + + // this instructs the sink to emit after every element, otherwise they would 
be buffered + esSinkBuilder.setBulkFlushMaxActions(1); + + source.addSink(esSinkBuilder.build()); + + env.execute("Elasticsearch 6.x end to end sink test example"); + } + + private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { + Map<String, Object> json = new HashMap<>(); + json.put("data", element); + + return Requests.indexRequest() + .index(parameterTool.getRequired("index")) + .type(parameterTool.getRequired("type")) + .id(element) + .source(json); + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 4abf59509df..5752d9afced 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -48,6 +48,7 @@ under the License. <module>flink-elasticsearch1-test</module> <module>flink-elasticsearch2-test</module> <module>flink-elasticsearch5-test</module> + <module>flink-elasticsearch6-test</module> <module>flink-quickstart-test</module> <module>flink-confluent-schema-registry</module> <module>flink-stream-state-ttl-test</module> diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index dc8424f25ec..431c21ee775 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz" +run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz" run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh index 7b627fe9de4..fa6c33124be 100644 --- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh +++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh @@ -42,15 +42,22 @@ function setup_elasticsearch { } function verify_elasticsearch_process_exist { - local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}') - - # make sure the elasticsearch node is actually running - if [ "$elasticsearchProcess" != "Elasticsearch" ]; then - echo "Elasticsearch node is not running." - exit 1 - else - echo "Elasticsearch node is running." - fi + for ((i=1;i<=10;i++)); do + local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}') + + echo "Waiting for Elasticsearch node to start ..." + + # make sure the elasticsearch node is actually running + if [ "$elasticsearchProcess" != "Elasticsearch" ]; then + sleep 1 + else + echo "Elasticsearch node is running." 
+ return + fi + done + + echo "Elasticsearch node did not start properly" + exit 1 } function verify_result { diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh index 7464409f41e..c8cd2db17c9 100755 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh +++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh @@ -32,9 +32,6 @@ start_cluster function test_cleanup { shutdown_elasticsearch_cluster index - - # make sure to run regular cleanup as well - cleanup } trap test_cleanup INT diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index c4620c837af..ae5e58c2f76 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\ flink-connectors/flink-connector-elasticsearch,\ flink-connectors/flink-connector-elasticsearch2,\ flink-connectors/flink-connector-elasticsearch5,\ +flink-connectors/flink-connector-elasticsearch6,\ flink-connectors/flink-connector-elasticsearch-base,\ flink-connectors/flink-connector-filesystem,\ flink-connectors/flink-connector-kafka-0.8,\ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services