[
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565300#comment-16565300
]
ASF GitHub Bot commented on FLINK-9885:
---------------------------------------
asfgit closed pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x
URL: https://github.com/apache/flink/pull/6391
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/docs/dev/connectors/elasticsearch.md
b/docs/dev/connectors/elasticsearch.md
index 52d1b58bf51..bafe391850b 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -55,6 +55,11 @@ of the Elasticsearch installation:
<td>1.3.0</td>
<td>5.x</td>
</tr>
+ <tr>
+ <td>flink-connector-elasticsearch6{{ site.scala_version_suffix }}</td>
+ <td>1.6.0</td>
+ <td>6 and later versions</td>
+ </tr>
</tbody>
</table>
@@ -71,7 +76,7 @@ creating an `ElasticsearchSink` for requesting document
actions against your clu
## Elasticsearch Sink
-The `ElasticsearchSink` uses a `TransportClient` to communicate with an
+The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an
Elasticsearch cluster.
The example below shows how to configure and create a sink:
@@ -79,6 +84,23 @@ The example below shows how to configure and create a sink:
<div class="codetabs" markdown="1">
<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
@@ -110,6 +132,22 @@ input.addSink(new ElasticsearchSink<>(config,
transportAddresses, new Elasticsea
</div>
<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
@@ -138,8 +176,85 @@ input.addSink(new ElasticsearchSink<>(config,
transportAddresses, new Elasticsea
}
}));{% endhighlight %}
</div>
+<div data-lang="java, Elasticsearch 6.x" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+List<HttpHost> httpHosts = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+// use an ElasticsearchSink.Builder to create an ElasticsearchSink
+ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+ httpHosts,
+ new ElasticsearchSinkFunction<String>() {
+ public IndexRequest createIndexRequest(String element) {
+ Map<String, String> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
+ }
+);
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1);
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder -> {
+    restClientBuilder.setDefaultHeaders(...);
+    restClientBuilder.setMaxRetryTimeoutMillis(...);
+    restClientBuilder.setPathPrefix(...);
+    restClientBuilder.setHttpClientConfigCallback(...);
+  }
+);
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build());
+{% endhighlight %}
+</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.common.transport.TransportAddress
+
+import java.net.InetAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
@@ -166,6 +281,22 @@ input.addSink(new ElasticsearchSink(config,
transportAddresses, new Elasticsearc
</div>
<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.List
+import java.util.Map
+
val input: DataStream[String] = ...
val config = new java.util.HashMap[String, String]
@@ -190,14 +321,74 @@ input.addSink(new ElasticsearchSink(config,
transportAddresses, new Elasticsearc
}))
{% endhighlight %}
</div>
+<div data-lang="scala, Elasticsearch 6.x" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
+
+import org.apache.http.HttpHost
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.util.ArrayList
+import java.util.List
+
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+val esSinkBuilder = new ElasticsearchSink.Builder[String](
+ httpHosts,
+  new ElasticsearchSinkFunction[String] {
+    def createIndexRequest(element: String): IndexRequest = {
+      val json = new java.util.HashMap[String, String]
+      json.put("data", element)
+
+      Requests.indexRequest()
+        .index("my-index")
+        .`type`("my-type")
+        .source(json)
+    }
+
+    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+      indexer.add(createIndexRequest(element))
+    }
+  }
+)
+
+// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
+esSinkBuilder.setBulkFlushMaxActions(1)
+
+// provide a RestClientFactory for custom configuration on the internally created REST client
+esSinkBuilder.setRestClientFactory(
+  restClientBuilder => {
+    restClientBuilder.setDefaultHeaders(...)
+    restClientBuilder.setMaxRetryTimeoutMillis(...)
+    restClientBuilder.setPathPrefix(...)
+    restClientBuilder.setHttpClientConfigCallback(...)
+  }
+)
+
+// finally, build and add the sink to the job's pipeline
+input.addSink(esSinkBuilder.build)
+{% endhighlight %}
+</div>
</div>
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate
+with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s
+is used to configure the `ElasticsearchSink`. This config map will be directly
+forwarded when creating the internally used `TransportClient`.
The configuration keys are documented in the Elasticsearch documentation
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
Especially important is the `cluster.name` parameter that must correspond to
the name of your cluster.
+For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication.
+By default, the connector uses the default configurations for the REST client. To have custom
+configuration for the REST client, users can provide a `RestClientFactory` implementation when
+setting up the `ElasticsearchSink.Builder` that builds the sink.
+
Also note that the example only demonstrates performing a single index
request for each incoming element. Generally, the `ElasticsearchSinkFunction`
can be used to perform multiple requests of different types (ex.,
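For illustration, a minimal sketch of supplying a custom `RestClientFactory` to the 6.x sink builder (this sketch is not part of the diff; the header, path prefix, and timeout values are hypothetical placeholders):
{% highlight java %}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;

import java.util.ArrayList;
import java.util.List;

// a serializable factory that customizes the low-level REST client builder
// before the RestHighLevelClient is created on the task managers
RestClientFactory restClientFactory = restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(
        new Header[]{new BasicHeader("Content-Type", "application/json")}); // hypothetical header
    restClientBuilder.setPathPrefix("/es");            // hypothetical path prefix
    restClientBuilder.setMaxRetryTimeoutMillis(60000); // hypothetical timeout
};

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder<String> esSinkBuilder =
    new ElasticsearchSink.Builder<String>(httpHosts, (element, ctx, indexer) -> {
        // build and add requests here, as in the examples above
    });
esSinkBuilder.setRestClientFactory(restClientFactory);
{% endhighlight %}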
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 2ebb97c82e2..33b42cb47f1 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -22,6 +22,9 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,12 +48,32 @@
}
@Override
- public void add(ActionRequest... actionRequests) {
- for (ActionRequest actionRequest : actionRequests) {
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
- this.bulkProcessor.add(actionRequest);
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
}
}
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 2a7a21659e4..f1dcc83f652 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -22,10 +22,10 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
@@ -37,17 +37,28 @@
* <p>Implementations are allowed to be stateful. For example, for
Elasticsearch 1.x, since connecting via an embedded node
* is allowed, the call bridge will hold reference to the created embedded
node. Each instance of the sink will hold
* exactly one instance of the call bridge, and state cleanup is performed
when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
*/
@Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
/**
- * Creates an Elasticsearch {@link Client}.
+ * Creates an Elasticsearch client implementing {@link AutoCloseable}.
*
* @param clientConfig The configuration to use when constructing the
client.
* @return The created client.
*/
- Client createClient(Map<String, String> clientConfig);
+ C createClient(Map<String, String> clientConfig) throws IOException;
+
+ /**
+ * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+ *
+ * @param client the Elasticsearch client.
+ * @param listener the bulk processor listener.
+ * @return the bulk processor builder.
+ */
+ BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
/**
* Extracts the cause of failure of a bulk item action.
@@ -71,6 +82,8 @@ void configureBulkProcessorBackoff(
/**
* Perform any necessary state cleanup.
*/
- void cleanup();
+ default void cleanup() {
+ // nothing to cleanup by default
+ }
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9105d9947f2..7dac06ceb8a 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.elasticsearch;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -62,9 +63,10 @@
* for example, to create a Elasticsearch {@link Client}, handle failed item
responses, etc.
*
* @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link
AutoCloseable}
*/
@Internal
-public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T>
implements CheckpointedFunction {
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable>
extends RichSinkFunction<T> implements CheckpointedFunction {
private static final long serialVersionUID = -1007596293618451942L;
@@ -85,6 +87,7 @@
/**
* Used to control whether the retry delay should increase
exponentially or remain constant.
*/
+ @PublicEvolving
public enum FlushBackoffType {
CONSTANT,
EXPONENTIAL
@@ -135,14 +138,20 @@ public void setDelayMillis(long delayMillis) {
private final Integer bulkProcessorFlushMaxActions;
private final Integer bulkProcessorFlushMaxSizeMb;
- private final Integer bulkProcessorFlushIntervalMillis;
+ private final Long bulkProcessorFlushIntervalMillis;
private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
//
------------------------------------------------------------------------
// User-facing API and configuration
//
------------------------------------------------------------------------
- /** The user specified config map that we forward to Elasticsearch when
we create the {@link Client}. */
+ /**
+ * The config map that contains configuration for the bulk flushing
behaviours.
+ *
+ * <p>For {@link org.elasticsearch.client.transport.TransportClient}
based implementations, this config
+ * map would also contain Elasticsearch-shipped configuration, and
therefore this config map
+ * would also be forwarded when creating the Elasticsearch client.
+ */
private final Map<String, String> userConfig;
/** The function that is used to construct multiple {@link
ActionRequest ActionRequests} from each incoming element. */
@@ -162,7 +171,7 @@ public void setDelayMillis(long delayMillis) {
//
------------------------------------------------------------------------
/** Call bridge for different version-specific. */
- private final ElasticsearchApiCallBridge callBridge;
+ private final ElasticsearchApiCallBridge<C> callBridge;
/**
* Number of pending action requests not yet acknowledged by
Elasticsearch.
@@ -176,7 +185,7 @@ public void setDelayMillis(long delayMillis) {
private AtomicLong numPendingRequests = new AtomicLong(0);
/** Elasticsearch client created using the call bridge. */
- private transient Client client;
+ private transient C client;
/** Bulk processor to buffer and send requests to Elasticsearch,
created using the client. */
private transient BulkProcessor bulkProcessor;
@@ -237,7 +246,7 @@ public ElasticsearchSinkBase(
}
if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorFlushIntervalMillis =
params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ bulkProcessorFlushIntervalMillis =
params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
} else {
bulkProcessorFlushIntervalMillis = null;
@@ -341,7 +350,7 @@ public void close() throws Exception {
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener
listener) {
checkNotNull(listener);
- BulkProcessor.Builder bulkProcessorBuilder =
BulkProcessor.builder(client, listener);
+ BulkProcessor.Builder bulkProcessorBuilder =
callBridge.createBulkProcessorBuilder(client, listener);
// This makes flush() blocking
bulkProcessorBuilder.setConcurrentRequests(0);
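For illustration, the bulk flush settings touched above are still passed through the user config map as strings; a minimal sketch with arbitrary values (not part of the diff):
{% highlight java %}
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import java.util.HashMap;
import java.util.Map;

Map<String, String> userConfig = new HashMap<>();
// forwarded to the TransportClient for pre-6.x implementations only
userConfig.put("cluster.name", "my-cluster");
// flush once this many actions have been buffered
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");
// flush interval in milliseconds; after this change the value is parsed with getLong rather than getInt
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "60000");
{% endhighlight %}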
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 2a1b29736b6..3dc8f879641 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -21,9 +21,12 @@
import org.apache.flink.annotation.PublicEvolving;
import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
/**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link
RequestIndexer} to prepare
* them for sending to an Elasticsearch cluster.
*/
@PublicEvolving
@@ -33,6 +36,41 @@
* Add multiple {@link ActionRequest} to the indexer to prepare for
sending requests to Elasticsearch.
*
* @param actionRequests The multiple {@link ActionRequest} to add.
+ * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or
{@link UpdateRequest}
*/
- void add(ActionRequest... actionRequests);
+ @Deprecated
+ default void add(ActionRequest... actionRequests) {
+ for (ActionRequest actionRequest : actionRequests) {
+ if (actionRequest instanceof IndexRequest) {
+ add((IndexRequest) actionRequest);
+ } else if (actionRequest instanceof DeleteRequest) {
+ add((DeleteRequest) actionRequest);
+ } else if (actionRequest instanceof UpdateRequest) {
+ add((UpdateRequest) actionRequest);
+ } else {
+ throw new
IllegalArgumentException("RequestIndexer only supports Index, Delete and Update
requests");
+ }
+ }
+ }
+
+ /**
+ * Add multiple {@link DeleteRequest} to the indexer to prepare for
sending requests to Elasticsearch.
+ *
+ * @param deleteRequests The multiple {@link DeleteRequest} to add.
+ */
+ void add(DeleteRequest... deleteRequests);
+
+ /**
+ * Add multiple {@link IndexRequest} to the indexer to prepare for
sending requests to Elasticsearch.
+ *
+ * @param indexRequests The multiple {@link IndexRequest} to add.
+ */
+ void add(IndexRequest... indexRequests);
+
+ /**
+ * Add multiple {@link UpdateRequest} to the indexer to prepare for
sending requests to Elasticsearch.
+ *
+ * @param updateRequests The multiple {@link UpdateRequest} to add.
+ */
+ void add(UpdateRequest... updateRequests);
}
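For illustration, a hypothetical sink function that exercises the new typed `add(...)` overloads via an upsert (not part of the diff; the index, type, and id scheme are invented for the example):
{% highlight java %}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

public class UpsertingSinkFunction implements ElasticsearchSinkFunction<String> {

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        // index request used as the upsert document if the id does not exist yet
        IndexRequest indexRequest = Requests.indexRequest()
            .index("my-index")
            .type("my-type")
            .id(element)
            .source(json);

        UpdateRequest updateRequest = new UpdateRequest("my-index", "my-type", element)
            .doc(json)
            .upsert(indexRequest);

        // resolved statically by the new add(UpdateRequest...) overload,
        // instead of the deprecated add(ActionRequest...)
        indexer.add(updateRequest);
    }
}
{% endhighlight %}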
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 09d8806b963..369d26a735a 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -31,6 +31,7 @@
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.junit.Assert;
@@ -92,7 +93,7 @@ public void testItemFailureRethrownOnInvoke() throws
Throwable {
// setup the next bulk request, and its mock item failures
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
Exception("artificial failure for record")));
testHarness.processElement(new StreamRecord<>("msg"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request
sink.manualBulkRequestWithAllPendingRequests();
@@ -124,7 +125,7 @@ public void testItemFailureRethrownOnCheckpoint() throws
Throwable {
// setup the next bulk request, and its mock item failures
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
Exception("artificial failure for record")));
testHarness.processElement(new StreamRecord<>("msg"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request
sink.manualBulkRequestWithAllPendingRequests();
@@ -164,7 +165,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush()
throws Throwable {
sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
testHarness.processElement(new StreamRecord<>("msg-1"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request (1 request only, thus
should succeed)
sink.manualBulkRequestWithAllPendingRequests();
@@ -172,7 +173,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush()
throws Throwable {
// setup the requests to be flushed in the snapshot
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
- verify(sink.getMockBulkProcessor(),
times(3)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(3)).add(any(IndexRequest.class));
CheckedThread snapshotThread = new CheckedThread() {
@Override
@@ -217,7 +218,7 @@ public void testBulkFailureRethrownOnInvoke() throws
Throwable {
// setup the next bulk request, and let the whole bulk request
fail
sink.setFailNextBulkRequestCompletely(new Exception("artificial
failure for bulk request"));
testHarness.processElement(new StreamRecord<>("msg"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request
sink.manualBulkRequestWithAllPendingRequests();
@@ -249,7 +250,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws
Throwable {
// setup the next bulk request, and let the whole bulk request
fail
sink.setFailNextBulkRequestCompletely(new Exception("artificial
failure for bulk request"));
testHarness.processElement(new StreamRecord<>("msg"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request
sink.manualBulkRequestWithAllPendingRequests();
@@ -284,7 +285,7 @@ public void
testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
// setup the next bulk request, and let bulk request succeed
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception)
null));
testHarness.processElement(new StreamRecord<>("msg-1"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// manually execute the next bulk request
sink.manualBulkRequestWithAllPendingRequests();
@@ -292,7 +293,7 @@ public void
testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
// setup the requests to be flushed in the snapshot
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
- verify(sink.getMockBulkProcessor(),
times(3)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(3)).add(any(IndexRequest.class));
CheckedThread snapshotThread = new CheckedThread() {
@Override
@@ -346,7 +347,7 @@ public void testAtLeastOnceSink() throws Throwable {
// it contains 1 request, which will fail and re-added to the
next bulk request
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
Exception("artificial failure for record")));
testHarness.processElement(new StreamRecord<>("msg"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
CheckedThread snapshotThread = new CheckedThread() {
@Override
@@ -402,7 +403,7 @@ public void
testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
// setup the next bulk request, and let bulk request succeed
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
Exception("artificial failure for record")));
testHarness.processElement(new StreamRecord<>("msg-1"));
- verify(sink.getMockBulkProcessor(),
times(1)).add(any(ActionRequest.class));
+ verify(sink.getMockBulkProcessor(),
times(1)).add(any(IndexRequest.class));
// the snapshot should not block even though we haven't flushed
the bulk request
testHarness.snapshot(1L, 1000L);
@@ -410,7 +411,7 @@ public void
testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
testHarness.close();
}
- private static class DummyElasticsearchSink<T> extends
ElasticsearchSinkBase<T> {
+ private static class DummyElasticsearchSink<T> extends
ElasticsearchSinkBase<T, Client> {
private static final long serialVersionUID =
5051907841570096991L;
@@ -478,11 +479,11 @@ public BulkProcessor getMockBulkProcessor() {
protected BulkProcessor buildBulkProcessor(final
BulkProcessor.Listener listener) {
this.mockBulkProcessor = mock(BulkProcessor.class);
-
when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new
Answer<Object>() {
+
when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new
Answer<Object>() {
@Override
public Object answer(InvocationOnMock
invocationOnMock) throws Throwable {
// intercept the request and add it to
our mock bulk request
-
nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+
nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class));
return null;
}
@@ -530,7 +531,7 @@ public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
}
}
- private static class DummyElasticsearchApiCallBridge implements
ElasticsearchApiCallBridge {
+ private static class DummyElasticsearchApiCallBridge implements
ElasticsearchApiCallBridge<Client> {
private static final long serialVersionUID =
-4272760730959041699L;
@@ -539,6 +540,11 @@ public Client createClient(Map<String, String>
clientConfig) {
return mock(Client.class);
}
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(Client
client, BulkProcessor.Listener listener) {
+ return null;
+ }
+
@Nullable
@Override
public Throwable
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
@@ -553,11 +559,6 @@ public Throwable
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkIt
public void configureBulkProcessorBackoff(BulkProcessor.Builder
builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy
flushBackoffPolicy) {
// no need for this in the test cases here
}
-
- @Override
- public void cleanup() {
- // nothing to cleanup
- }
}
private static class SimpleSinkFunction<String> implements
ElasticsearchSinkFunction<String> {
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index df3779b1fd4..819ffba5d2a 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -26,7 +26,6 @@
import org.apache.flink.util.InstantiationUtil;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -34,19 +33,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Environment preparation and suite of tests for version-specific {@link
ElasticsearchSinkBase} implementations.
+ *
+ * @param <C> Elasticsearch client type
+ * @param <A> The address type to use
*/
-public abstract class ElasticsearchSinkTestBase extends AbstractTestBase {
+public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A>
extends AbstractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
@@ -85,24 +85,21 @@ public static void shutdown() throws Exception {
}
/**
- * Tests that the Elasticsearch sink works properly using a {@link
TransportClient}.
+ * Tests that the Elasticsearch sink works properly.
*/
- public void runTransportClientTest() throws Exception {
- final String index = "transport-client-test-index";
+ public void runElasticsearchSinkTest() throws Exception {
+ final String index = "elasticsearch-sink-test-index";
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
- Map<String, String> userConfig = new HashMap<>();
- // This instructs the sink to emit after every element,
otherwise they would be buffered
-
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", CLUSTER_NAME);
-
source.addSink(createElasticsearchSinkForEmbeddedNode(
- userConfig, new
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+ 1,
+ CLUSTER_NAME,
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
- env.execute("Elasticsearch TransportClient Test");
+ env.execute("Elasticsearch Sink Test");
// verify the results
Client client = embeddedNodeEnv.getClient();
@@ -112,16 +109,20 @@ public void runTransportClientTest() throws Exception {
}
/**
- * Tests that the Elasticsearch sink fails eagerly if the provided list
of transport addresses is {@code null}.
+ * Tests that the Elasticsearch sink fails eagerly if the provided list
of addresses is {@code null}.
*/
- public void runNullTransportClientTest() throws Exception {
+ public void runNullAddressesTest() throws Exception {
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", CLUSTER_NAME);
try {
- createElasticsearchSink(userConfig, null, new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
- } catch (IllegalArgumentException expectedException) {
+ createElasticsearchSink(
+ 1,
+ CLUSTER_NAME,
+ null,
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ } catch (IllegalArgumentException | NullPointerException
expectedException) {
// test passes
return;
}
@@ -130,18 +131,19 @@ public void runNullTransportClientTest() throws Exception
{
}
/**
- * Tests that the Elasticsearch sink fails eagerly if the provided list
of transport addresses is empty.
+ * Tests that the Elasticsearch sink fails eagerly if the provided list
of addresses is empty.
*/
- public void runEmptyTransportClientTest() throws Exception {
+ public void runEmptyAddressesTest() throws Exception {
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", CLUSTER_NAME);
try {
createElasticsearchSink(
- userConfig,
- Collections.<InetSocketAddress>emptyList(),
- new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+ 1,
+ CLUSTER_NAME,
+ Collections.emptyList(),
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
} catch (IllegalArgumentException expectedException) {
// test passes
return;
@@ -153,39 +155,66 @@ public void runEmptyTransportClientTest() throws
Exception {
/**
* Tests whether the Elasticsearch sink fails when there is no cluster
to connect to.
*/
- public void runTransportClientFailsTest() throws Exception {
+ public void runInvalidElasticsearchClusterTest() throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> source =
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
Map<String, String> userConfig = new HashMap<>();
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- userConfig.put("cluster.name", "my-transport-client-cluster");
+ userConfig.put("cluster.name", "invalid-cluster-name");
- source.addSink(createElasticsearchSinkForEmbeddedNode(
- Collections.unmodifiableMap(userConfig), new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+ source.addSink(createElasticsearchSinkForNode(
+ 1,
+ "invalid-cluster-name",
+ new
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"),
+ "123.123.123.123")); // incorrect ip address
try {
- env.execute("Elasticsearch Transport Client Test");
+ env.execute("Elasticsearch Sink Test");
} catch (JobExecutionException expectedException) {
-
assertTrue(expectedException.getCause().getMessage().contains("not connected to
any Elasticsearch nodes"));
+ // test passes
return;
}
fail();
}
+ /**
+ * Utility method to create a user config map.
+ */
+ protected Map<String, String> createUserConfig(int bulkFlushMaxActions,
String clusterName) {
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", clusterName);
+
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
String.valueOf(bulkFlushMaxActions));
+
+ return userConfig;
+ }
+
/** Creates a version-specific Elasticsearch sink, using arbitrary
transport addresses. */
- protected abstract <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig,
-
List<InetSocketAddress> transportAddresses,
-
ElasticsearchSinkFunction<T> elasticsearchSinkFunction);
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C>
createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<A> addresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction);
/**
* Creates a version-specific Elasticsearch sink to connect to a local
embedded Elasticsearch node.
*
- * <p>This case is singled out from {@link
ElasticsearchSinkTestBase#createElasticsearchSink(Map, List,
ElasticsearchSinkFunction)}
+ * <p>This case is singled out from {@link
ElasticsearchSinkTestBase#createElasticsearchSink(int, String, List,
ElasticsearchSinkFunction)}
* because the Elasticsearch Java API to do so is incompatible across
different versions.
*/
- protected abstract <T> ElasticsearchSinkBase<T>
createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) throws Exception;
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C>
createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction) throws Exception;
+
+ /**
+ * Creates a version-specific Elasticsearch sink to connect to a
specific Elasticsearch node.
+ */
+ protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C>
createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction,
+ String ipAddress) throws Exception;
}
diff --git
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
index ea6e7a3ac70..fd14ba36370 100644
---
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
+++
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironment.java
@@ -29,7 +29,7 @@
* also be located under the same package. The intentional
package-private accessibility of this interface
* enforces that.
*/
-interface EmbeddedElasticsearchNodeEnvironment {
+public interface EmbeddedElasticsearchNodeEnvironment {
/**
* Start an embedded Elasticsearch node instance.
diff --git
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 2a3c2a06460..4f1cd086d8f 100644
---
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
*/
@Internal
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge
{
+public class Elasticsearch1ApiCallBridge implements
ElasticsearchApiCallBridge<Client> {
private static final long serialVersionUID = -2632363720584123682L;
@@ -115,6 +115,11 @@ public Client createClient(Map<String, String>
clientConfig) {
}
}
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(Client client,
BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
+ }
+
@Override
public Throwable
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
if (!bulkItemResponse.isFailed()) {
diff --git
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index e8eccd978f4..d5e1d1fdc12 100644
---
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
@@ -64,7 +65,7 @@
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, Client> {
private static final long serialVersionUID = 1L;
diff --git
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 54892909abc..2f1a65c58ee 100644
---
a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++
b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -28,10 +28,12 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.junit.Test;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,26 +44,26 @@
/**
* IT Cases for the {@link ElasticsearchSink}.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<Client,
InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception{
- runTransportClientFailsTest();
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
}
// -- Tests specific to Elasticsearch 1.x --
@@ -102,19 +104,28 @@ public void testDeprecatedIndexRequestBuilderVariant()
throws Exception {
}
@Override
- protected <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig,
-
List<InetSocketAddress>
transportAddresses,
-
ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig,
ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
elasticsearchSinkFunction);
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client>
createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction) {
+
+ return new ElasticsearchSink<>(
+
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+
ElasticsearchUtils.convertInetSocketAddresses(transportAddresses),
+ elasticsearchSinkFunction);
}
@Override
- protected <T> ElasticsearchSinkBase<T>
createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client>
createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction) throws Exception {
+
+ Map<String, String> userConfig =
createUserConfig(bulkFlushMaxActions, clusterName);
// Elasticsearch 1.x requires this setting when using
// LocalTransportAddress to connect to a local embedded node
- userConfig = new HashMap<>(userConfig);
userConfig.put("node.local", "true");
List<TransportAddress> transports = new ArrayList<>();
@@ -126,6 +137,22 @@ public void testDeprecatedIndexRequestBuilderVariant()
throws Exception {
elasticsearchSinkFunction);
}
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>, Client>
createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
+
+ List<TransportAddress> transports = new ArrayList<>();
+ transports.add(new
InetSocketTransportAddress(InetAddress.getByName(ipAddress), 9300));
+
+ return new ElasticsearchSink<>(
+
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transports,
+ elasticsearchSinkFunction);
+ }
+
/**
* A {@link IndexRequestBuilder} with equivalent functionality to
{@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
*/
diff --git
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 390a4078e2b..73a69ebde34 100644
---
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -26,7 +26,6 @@
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@@ -44,7 +43,7 @@
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
*/
@Internal
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge
{
+public class Elasticsearch2ApiCallBridge implements
ElasticsearchApiCallBridge<TransportClient> {
private static final long serialVersionUID = 2638252694744361079L;
@@ -63,7 +62,7 @@
}
@Override
- public Client createClient(Map<String, String> clientConfig) {
+ public TransportClient createClient(Map<String, String> clientConfig) {
Settings settings =
Settings.settingsBuilder().put(clientConfig).build();
TransportClient transportClient =
TransportClient.builder().settings(settings).build();
@@ -83,6 +82,11 @@ public Client createClient(Map<String, String> clientConfig)
{
return transportClient;
}
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient
client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
+ }
+
@Override
public Throwable
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
if (!bulkItemResponse.isFailed()) {
@@ -117,10 +121,4 @@ public void configureBulkProcessorBackoff(
builder.setBackoffPolicy(backoffPolicy);
}
-
- @Override
- public void cleanup() {
- // nothing to cleanup
- }
-
}
diff --git
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index ffccacf40ac..a911905ac0a 100644
---
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -58,7 +58,7 @@
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T,
TransportClient> {
private static final long serialVersionUID = 1L;
diff --git
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index 7ded893be3a..7887e72fa10 100644
---
a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++
b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -17,57 +17,81 @@
package org.apache.flink.streaming.connectors.elasticsearch2;
+import org.apache.flink.api.java.tuple.Tuple2;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.elasticsearch.client.transport.TransportClient;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* IT cases for the {@link ElasticsearchSink}.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends
ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception{
- runTransportClientFailsTest();
+ public void testInvalidElasticsearchCluster() throws Exception{
+ runInvalidElasticsearchClusterTest();
}
@Override
- protected <T> ElasticsearchSinkBase<T>
createElasticsearchSink(Map<String, String> userConfig,
-
List<InetSocketAddress>
transportAddresses,
-
ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) {
- return new ElasticsearchSink<>(userConfig, transportAddresses,
elasticsearchSinkFunction);
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>,
TransportClient> createElasticsearchSink(
+ int bulkFlushMaxActions,
+ String clusterName,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction) {
+
+ return new ElasticsearchSink<>(
+
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transportAddresses,
+ elasticsearchSinkFunction);
+ }
+
+ @Override
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>,
TransportClient> createElasticsearchSinkForEmbeddedNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction) throws Exception {
+
+ return createElasticsearchSinkForNode(
+ bulkFlushMaxActions, clusterName,
elasticsearchSinkFunction, "127.0.0.1");
}
@Override
- protected <T> ElasticsearchSinkBase<T>
createElasticsearchSinkForEmbeddedNode(
- Map<String, String> userConfig, ElasticsearchSinkFunction<T>
elasticsearchSinkFunction) throws Exception {
+ protected ElasticsearchSinkBase<Tuple2<Integer, String>,
TransportClient> createElasticsearchSinkForNode(
+ int bulkFlushMaxActions,
+ String clusterName,
+ ElasticsearchSinkFunction<Tuple2<Integer, String>>
elasticsearchSinkFunction,
+ String ipAddress) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
- transports.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+ transports.add(new
InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
- return new ElasticsearchSink<>(userConfig, transports,
elasticsearchSinkFunction);
+ return new ElasticsearchSink<>(
+
Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+ transports,
+ elasticsearchSinkFunction);
}
}
diff --git
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 7c4ba7a97f1..a3453ec4445 100644
---
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -26,7 +26,6 @@
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@@ -47,7 +46,7 @@
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
*/
@Internal
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge {
+public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge<TransportClient> {
private static final long serialVersionUID = -5222683870097809633L;
@@ -66,7 +65,7 @@
}
@Override
- public Client createClient(Map<String, String> clientConfig) {
+ public TransportClient createClient(Map<String, String> clientConfig) {
		Settings settings = Settings.builder().put(clientConfig)
			.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
			.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -89,6 +88,11 @@ public Client createClient(Map<String, String> clientConfig) {
return transportClient;
}
+ @Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(TransportClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(client, listener);
+ }
+
@Override
	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
if (!bulkItemResponse.isFailed()) {
@@ -123,10 +127,4 @@ public void configureBulkProcessorBackoff(
builder.setBackoffPolicy(backoffPolicy);
}
-
- @Override
- public void cleanup() {
- // nothing to cleanup
- }
-
}
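
For orientation between the 5.x and 6.x changes in this PR: both call bridges now implement a client-parameterized interface from flink-connector-elasticsearch-base. The following is a minimal sketch inferred from the methods visible in this diff; the exact definition in the base module (names, generic bound, nullability annotations) may differ slightly.

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;

public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {

	// Creates the version-specific client: a TransportClient up to 5.x, a RestHighLevelClient from 6.x on.
	C createClient(Map<String, String> clientConfig) throws IOException;

	// Wires the version-specific client into a BulkProcessor.
	BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);

	// Returns the failure cause of a single bulk item, or null if the item succeeded.
	@Nullable
	Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);

	// Applies the configured flush backoff policy to the BulkProcessor builder.
	void configureBulkProcessorBackoff(
		BulkProcessor.Builder builder,
		@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
}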
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
index 6c09337227a..b99b3539255 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSink.java
@@ -59,7 +59,7 @@
* @param <T> Type of the elements handled by this sink
*/
@PublicEvolving
-public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, TransportClient> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
index ad7c664cac7..67daa409b7b 100644
--- a/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch5/src/test/java/org/apache/flink/streaming/connectors/elasticsearch5/ElasticsearchSinkITCase.java
@@ -18,58 +18,85 @@
package org.apache.flink.streaming.connectors.elasticsearch5;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import org.elasticsearch.client.transport.TransportClient;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 5.x CANNOT be executed in the IDE directly, since the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the
+ * embedded Elasticsearch node used in the tests to work properly.
*/
-public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<TransportClient, InetSocketAddress> {
@Test
- public void testTransportClient() throws Exception {
- runTransportClientTest();
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
}
@Test
- public void testNullTransportClient() throws Exception {
- runNullTransportClientTest();
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
}
@Test
- public void testEmptyTransportClient() throws Exception {
- runEmptyTransportClientTest();
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
}
@Test
- public void testTransportClientFails() throws Exception {
- runTransportClientFailsTest();
+	public void testInvalidElasticsearchCluster() throws Exception {
+ runInvalidElasticsearchClusterTest();
}
@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
-							List<InetSocketAddress> transportAddresses,
-							ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
-		return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<InetSocketAddress> addresses,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		return new ElasticsearchSink<>(
+			Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+			addresses,
+			elasticsearchSinkFunction);
}
@Override
-	protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
-			Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+			bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
+ }
+
+ @Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, TransportClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
		List<InetSocketAddress> transports = new ArrayList<>();
-		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+		transports.add(new InetSocketAddress(InetAddress.getByName(ipAddress), 9300));
-		return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
+		return new ElasticsearchSink<>(
+			Collections.unmodifiableMap(createUserConfig(bulkFlushMaxActions, clusterName)),
+			transports,
+			elasticsearchSinkFunction);
}
-
}
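
Both overrides above delegate their settings to a createUserConfig helper on the refactored ElasticsearchSinkTestBase, which is not part of this diff. A hypothetical reconstruction, assuming it only sets the cluster name and the bulk flush size (the real helper may set additional keys):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;

// Hypothetical sketch; the actual method lives in ElasticsearchSinkTestBase.
protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
	Map<String, String> userConfig = new HashMap<>();
	userConfig.put("cluster.name", clusterName);
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(bulkFlushMaxActions));
	return userConfig;
}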
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
new file mode 100644
index 00000000000..ef06d80512b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+	<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+ <name>flink-connector-elasticsearch6</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <elasticsearch.version>6.3.1</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+				<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Dependency for Elasticsearch 6.x REST Client -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+		<!--
+			Elasticsearch 5.x uses Log4j2 and no longer detects logging implementations, making
+			Log4j2 a strict dependency. The following is added so that the Log4j2 API in
+			Elasticsearch 5.x is routed to SLF4J. This way, user projects can remain flexible
+			in the logging implementation preferred.
+		-->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+		<!--
+			Including the Elasticsearch transport dependency for tests. Netty3 is no longer available in 6.x.
+		-->
+
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+		<!--
+			Including Log4j2 dependencies for tests is required for the
+			embedded Elasticsearch nodes used in tests to run correctly.
+		-->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>2.9.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.9.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+			<!--
+				For the tests, we need to exclude the Log4j2-to-SLF4J adapter dependency
+				and let Elasticsearch directly use Log4j2, otherwise the embedded Elasticsearch node
+				used in tests will fail to work.
+
+				In other words, the connector jar routes Elasticsearch's Log4j2 APIs to SLF4J,
+				but for the test builds, we still stick to directly using Log4j2.
+			-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12.2</version>
+ <configuration>
+					<classpathDependencyExcludes>
+						<classpathDependencyExclude>org.apache.logging.log4j:log4j-to-slf4j</classpathDependencyExclude>
+					</classpathDependencyExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 00000000000..03bf9c07109
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
+ */
+@Internal
+public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+ private static final long serialVersionUID = -5222683870097809633L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+ /**
+	 * User-provided HTTP hosts.
+ */
+ private final List<HttpHost> httpHosts;
+
+ /**
+ * The factory to configure the rest client.
+ */
+ private final RestClientFactory restClientFactory;
+
+	Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
+		Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+		this.httpHosts = httpHosts;
+		this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ @Override
+	public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
+		RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+		restClientFactory.configureRestClientBuilder(builder);
+
+		RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+		}
+
+		if (!rhlClient.ping()) {
+			throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
+		}
+
+		return rhlClient;
+ }
+
+ @Override
+	public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
+		return BulkProcessor.builder(client::bulkAsync, listener);
+ }
+
+ @Override
+	public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (!bulkItemResponse.isFailed()) {
+ return null;
+ } else {
+ return bulkItemResponse.getFailure().getCause();
+ }
+ }
+
+ @Override
+	public void configureBulkProcessorBackoff(
+			BulkProcessor.Builder builder,
+			@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+		BackoffPolicy backoffPolicy;
+		if (flushBackoffPolicy != null) {
+			switch (flushBackoffPolicy.getBackoffType()) {
+				case CONSTANT:
+					backoffPolicy = BackoffPolicy.constantBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+					break;
+				case EXPONENTIAL:
+				default:
+					backoffPolicy = BackoffPolicy.exponentialBackoff(
+						new TimeValue(flushBackoffPolicy.getDelayMillis()),
+						flushBackoffPolicy.getMaxRetryCount());
+			}
+		} else {
+			backoffPolicy = BackoffPolicy.noBackoff();
+		}
+
+ builder.setBackoffPolicy(backoffPolicy);
+ }
+}
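
The one-line createBulkProcessorBuilder above hides the main API difference between the versions: the 5.x builder takes the Client itself, while in 6.x BulkProcessor.builder takes a BiConsumer that forwards each bulk request. A minimal standalone illustration of what the client::bulkAsync method reference resolves to, assuming the 6.3 REST high-level client:

import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;

class BulkProcessorWiringSketch {

	static BulkProcessor wire(RestHighLevelClient client, BulkProcessor.Listener listener) {
		// client::bulkAsync fits this functional shape because the trailing
		// Header... vararg of bulkAsync can be left empty.
		BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = client::bulkAsync;
		return BulkProcessor.builder(bulkConsumer, listener).build();
	}
}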
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 00000000000..4e7a2635738
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the list of HTTP hosts passed to the constructor.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li>{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ *       settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
+
+ private static final long serialVersionUID = 1L;
+
+	private ElasticsearchSink(
+			Map<String, String> bulkRequestsConfig,
+			List<HttpHost> httpHosts,
+			ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+			ActionRequestFailureHandler failureHandler,
+			RestClientFactory restClientFactory) {
+
+		super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
+ }
+
+ /**
+ * A builder for creating an {@link ElasticsearchSink}.
+ *
+	 * @param <T> Type of the elements handled by the sink this builder creates.
+ */
+ @PublicEvolving
+ public static class Builder<T> {
+
+		private final List<HttpHost> httpHosts;
+		private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+		private Map<String, String> bulkRequestsConfig = new HashMap<>();
+		private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+		private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+		/**
+		 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}.
+		 *
+		 * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} connects.
+		 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element.
+		 */
+		public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+			this.httpHosts = Preconditions.checkNotNull(httpHosts);
+			this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+ }
+
+		/**
+		 * Sets the maximum number of actions to buffer for each bulk request.
+		 *
+		 * @param numMaxActions the maximum number of actions to buffer per bulk request.
+		 */
+		public void setBulkFlushMaxActions(int numMaxActions) {
+			Preconditions.checkArgument(
+				numMaxActions > 0,
+				"Max number of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+ }
+
+		/**
+		 * Sets the maximum size of buffered actions, in MB, per bulk request.
+		 *
+		 * @param maxSizeMb the maximum size of buffered actions, in MB.
+		 */
+		public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+			Preconditions.checkArgument(
+				maxSizeMb > 0,
+				"Max size of buffered actions must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+ }
+
+ /**
+ * Sets the bulk flush interval, in milliseconds.
+ *
+		 * @param intervalMillis the bulk flush interval, in milliseconds.
+		 */
+		public void setBulkFlushInterval(long intervalMillis) {
+			Preconditions.checkArgument(
+				intervalMillis >= 0,
+				"Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+ }
+
+ /**
+ * Sets whether or not to enable bulk flush backoff behaviour.
+ *
+ * @param enabled whether or not to enable backoffs.
+ */
+		public void setBulkFlushBackoff(boolean enabled) {
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+ }
+
+		/**
+		 * Sets the type of backoff to use when flushing bulk requests.
+		 *
+		 * @param flushBackoffType the backoff type to use.
+		 */
+		public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+			this.bulkRequestsConfig.put(
+				CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+				Preconditions.checkNotNull(flushBackoffType).toString());
+ }
+
+ /**
+		 * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+		 *
+		 * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk requests.
+		 */
+		public void setBulkFlushBackoffRetries(int maxRetries) {
+			Preconditions.checkArgument(
+				maxRetries > 0,
+				"Max number of backoff attempts must be larger than 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+ }
+
+ /**
+		 * Sets the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 *
+		 * @param delayMillis the amount of delay between each backoff attempt when flushing bulk requests, in milliseconds.
+		 */
+		public void setBulkFlushBackoffDelay(long delayMillis) {
+			Preconditions.checkArgument(
+				delayMillis >= 0,
+				"Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+
+			this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+ }
+
+ /**
+ * Sets a failure handler for action requests.
+ *
+		 * @param failureHandler This is used to handle failed {@link ActionRequest}.
+		 */
+		public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+			this.failureHandler = Preconditions.checkNotNull(failureHandler);
+ }
+
+ /**
+ * Sets a REST client factory for custom client configuration.
+ *
+		 * @param restClientFactory the factory that configures the rest client.
+		 */
+		public void setRestClientFactory(RestClientFactory restClientFactory) {
+			this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ /**
+ * Creates the Elasticsearch sink.
+ *
+ * @return the created Elasticsearch sink.
+ */
+		public ElasticsearchSink<T> build() {
+			return new ElasticsearchSink<>(bulkRequestsConfig, httpHosts, elasticsearchSinkFunction, failureHandler, restClientFactory);
+ }
+ }
+}
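
For users coming from the 5.x constructor-based API, a short usage sketch of the new builder (index and type names below are placeholders, not part of this PR):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

// assuming a DataStream<String> named input is already defined
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
	httpHosts,
	(String element, RuntimeContext ctx, RequestIndexer indexer) ->
		indexer.add(Requests.indexRequest()
			.index("my-index")   // placeholder index name
			.type("my-type")     // placeholder type name
			.source(Collections.singletonMap("data", element))));

// flush after every element for the example; larger values batch requests
esSinkBuilder.setBulkFlushMaxActions(1);

input.addSink(esSinkBuilder.build());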
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
new file mode 100644
index 00000000000..4b74649ca87
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/RestClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.elasticsearch.client.RestClientBuilder;
+
+import java.io.Serializable;
+
+/**
+ * A factory that is used to configure the {@link org.elasticsearch.client.RestHighLevelClient} internally
+ * used in the {@link ElasticsearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+	/**
+	 * Configures the rest client builder.
+	 *
+	 * @param restClientBuilder the rest client builder to configure.
+	 */
+	void configureRestClientBuilder(RestClientBuilder restClientBuilder);
+
+}
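
Because the factory is serializable and has a single method, a custom implementation stays small. A sketch that sets a default header and a path prefix via setters that exist on the low-level RestClientBuilder (header and prefix values are placeholders):

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;

import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

public class HeaderSettingRestClientFactory implements RestClientFactory {

	private static final long serialVersionUID = 1L;

	@Override
	public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
		// placeholder header; authentication or content-type headers would go here
		restClientBuilder.setDefaultHeaders(new Header[] {new BasicHeader("X-My-Header", "value")});
		// placeholder path prefix, e.g. when Elasticsearch sits behind a reverse proxy
		restClientBuilder.setPathPrefix("/es");
	}
}

The factory is then registered on the sink builder via setRestClientFactory(new HeaderSettingRestClientFactory()).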
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 00000000000..8dc62168049
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+ private Node node;
+
+ @Override
+	public void start(File tmpDataFolder, String clusterName) throws Exception {
+		if (node == null) {
+			Settings settings = Settings.builder()
+				.put("cluster.name", clusterName)
+				.put("http.enabled", true)
+				.put("path.home", tmpDataFolder.getParent())
+				.put("path.data", tmpDataFolder.getAbsolutePath())
+ .build();
+
+ node = new PluginNode(settings);
+ node.start();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (node != null && !node.isClosed()) {
+ node.close();
+ node = null;
+ }
+ }
+
+ @Override
+ public Client getClient() {
+ if (node != null && !node.isClosed()) {
+ return node.client();
+ } else {
+ return null;
+ }
+ }
+
+ private static class PluginNode extends Node {
+ public PluginNode(Settings settings) {
+			super(InternalSettingsPreparer.prepareEnvironment(settings, null), Collections.<Class<? extends Plugin>>singletonList(Netty4Plugin.class));
+ }
+ }
+
+}
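
The "dynamically loaded" note above implies the test base instantiates this class reflectively by its fixed name, so each connector module can ship its own version-specific implementation on the test classpath. A hedged sketch of that mechanism (the actual loading code lives in ElasticsearchSinkTestBase):

import java.io.File;

import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;

class EmbeddedNodeLoadingSketch {

	static EmbeddedElasticsearchNodeEnvironment loadAndStart(File tmpDataFolder, String clusterName) throws Exception {
		// the class name is fixed; whichever module is on the test classpath provides the impl
		EmbeddedElasticsearchNodeEnvironment env = (EmbeddedElasticsearchNodeEnvironment) Class
			.forName("org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl")
			.newInstance();
		env.start(tmpDataFolder, clusterName);
		return env;
	}
}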
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
new file mode 100644
index 00000000000..a6f01258940
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ *
+ * <p>The Elasticsearch ITCases for 6.x CANNOT be executed in the IDE directly, since the
+ * Log4J-to-SLF4J adapter dependency must be excluded from the test classpath for the
+ * embedded Elasticsearch node used in the tests to work properly.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase<RestHighLevelClient, HttpHost> {
+
+ @Test
+ public void testElasticsearchSink() throws Exception {
+ runElasticsearchSinkTest();
+ }
+
+ @Test
+ public void testNullAddresses() throws Exception {
+ runNullAddressesTest();
+ }
+
+ @Test
+ public void testEmptyAddresses() throws Exception {
+ runEmptyAddressesTest();
+ }
+
+ @Test
+	public void testInvalidElasticsearchCluster() throws Exception {
+ runInvalidElasticsearchClusterTest();
+ }
+
+ @Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSink(
+			int bulkFlushMaxActions,
+			String clusterName,
+			List<HttpHost> httpHosts,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) {
+
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+ builder.setBulkFlushMaxActions(bulkFlushMaxActions);
+
+ return builder.build();
+ }
+
+ @Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForEmbeddedNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception {
+
+		return createElasticsearchSinkForNode(
+			bulkFlushMaxActions, clusterName, elasticsearchSinkFunction, "127.0.0.1");
+ }
+
+ @Override
+	protected ElasticsearchSinkBase<Tuple2<Integer, String>, RestHighLevelClient> createElasticsearchSinkForNode(
+			int bulkFlushMaxActions,
+			String clusterName,
+			ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction,
+			String ipAddress) throws Exception {
+
+		ArrayList<HttpHost> httpHosts = new ArrayList<>();
+		httpHosts.add(new HttpHost(ipAddress, 9200, "http"));
+
+		ElasticsearchSink.Builder<Tuple2<Integer, String>> builder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
+		builder.setBulkFlushMaxActions(bulkFlushMaxActions);
+
+ return builder.build();
+ }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..fcd86546668
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3afb779a80e..cacea91578e 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -50,6 +50,7 @@ under the License.
<module>flink-connector-elasticsearch</module>
<module>flink-connector-elasticsearch2</module>
<module>flink-connector-elasticsearch5</module>
+ <module>flink-connector-elasticsearch6</module>
<module>flink-connector-rabbitmq</module>
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
new file mode 100644
index 00000000000..d170235c702
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <version>1.7-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-elasticsearch6-test</artifactId>
+ <name>flink-elasticsearch6-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+							<finalName>Elasticsearch6SinkExample</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.Elasticsearch6SinkExample</mainClass>
+								</transformer>
+							</transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
new file mode 100644
index 00000000000..dedcbb28f08
--- /dev/null
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End-to-end test for the Elasticsearch 6 sink.
+ */
+public class Elasticsearch6SinkExample {
+
+ public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 3) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
+ return;
+ }
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
+
+ List<HttpHost> httpHosts = new ArrayList<>();
+ httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+
+		ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+			httpHosts,
+			(String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+
+		// this instructs the sink to emit after every element, otherwise they would be buffered
+		esSinkBuilder.setBulkFlushMaxActions(1);
+
+ source.addSink(esSinkBuilder.build());
+
+ env.execute("Elasticsearch 6.x end to end sink test example");
+ }
+
+	private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index(parameterTool.getRequired("index"))
+ .type(parameterTool.getRequired("type"))
+ .id(element)
+ .source(json);
+ }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 4abf59509df..5752d9afced 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -48,6 +48,7 @@ under the License.
<module>flink-elasticsearch1-test</module>
<module>flink-elasticsearch2-test</module>
<module>flink-elasticsearch5-test</module>
+ <module>flink-elasticsearch6-test</module>
<module>flink-quickstart-test</module>
<module>flink-confluent-schema-registry</module>
<module>flink-stream-state-ttl-test</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index dc8424f25ec..431c21ee775 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR
run_test "Elasticsearch (v1.7.1) sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
run_test "Elasticsearch (v2.3.5) sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
run_test "Elasticsearch (v5.1.2) sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
run_test "Quickstarts Java nightly end-to-end test"
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
run_test "Quickstarts Scala nightly end-to-end test"
"$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 7b627fe9de4..fa6c33124be 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -42,15 +42,22 @@ function setup_elasticsearch {
}
function verify_elasticsearch_process_exist {
- local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
-
- # make sure the elasticsearch node is actually running
- if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
- echo "Elasticsearch node is not running."
- exit 1
- else
- echo "Elasticsearch node is running."
- fi
+ for ((i=1;i<=10;i++)); do
+    local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}')
+
+ echo "Waiting for Elasticsearch node to start ..."
+
+ # make sure the elasticsearch node is actually running
+ if [ "$elasticsearchProcess" != "Elasticsearch" ]; then
+ sleep 1
+ else
+ echo "Elasticsearch node is running."
+ return
+ fi
+ done
+
+ echo "Elasticsearch node did not start properly"
+ exit 1
}
function verify_result {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 7464409f41e..c8cd2db17c9 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -32,9 +32,6 @@ start_cluster
function test_cleanup {
shutdown_elasticsearch_cluster index
-
- # make sure to run regular cleanup as well
- cleanup
}
trap test_cleanup INT
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index c4620c837af..ae5e58c2f76 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -88,6 +88,7 @@ flink-connectors/flink-connector-cassandra,\
flink-connectors/flink-connector-elasticsearch,\
flink-connectors/flink-connector-elasticsearch2,\
flink-connectors/flink-connector-elasticsearch5,\
+flink-connectors/flink-connector-elasticsearch6,\
flink-connectors/flink-connector-elasticsearch-base,\
flink-connectors/flink-connector-filesystem,\
flink-connectors/flink-connector-kafka-0.8,\
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> End-to-end test: Elasticsearch 6.x connector
> --------------------------------------------
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
> Issue Type: Sub-task
> Components: ElasticSearch Connector, Tests
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This
> should also come with an end-to-end test that covers this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)