[
https://issues.apache.org/jira/browse/FLINK-10269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613333#comment-16613333
]
ASF GitHub Bot commented on FLINK-10269:
----------------------------------------
asfgit closed pull request #6682: [FLINK-10269] [connectors] Fix Elasticsearch 6 UpdateRequest binary incompatibility
URL: https://github.com/apache/flink/pull/6682
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index f1dcc83f652..d3b774c8428 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
 * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls across different versions.
@@ -79,6 +80,19 @@ void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+ /**
+ * Creates a {@link RequestIndexer} that works with {@link BulkProcessor} in a binary-compatible way.
+ */
+ default RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new PreElasticsearch6BulkProcessorIndexer(
+ bulkProcessor,
+ flushOnCheckpoint,
+ numPendingRequestsRef);
+ }
+
/**
* Perform any necessary state cleanup.
*/
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 7dac06ceb8a..4d0c00252d2 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -164,7 +164,7 @@ public void setDelayMillis(long delayMillis) {
private boolean flushOnCheckpoint = true;
/** Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest ActionRequests}. */
- private transient BulkProcessorIndexer requestIndexer;
+ private transient RequestIndexer requestIndexer;
// ------------------------------------------------------------------------
// Internals for the Flink Elasticsearch Sink
@@ -295,7 +295,7 @@ public void disableFlushOnCheckpoint() {
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
- requestIndexer = new BulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
+ requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
}
@Override
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 00000000000..85f4b9a3ea1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions
+ *             (i.e. the {@link #add(UpdateRequest...)} method). However, this module is currently
+ *             compiled against a very old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ PreElasticsearch6BulkProcessorIndexer(BulkProcessor bulkProcessor, boolean flushOnCheckpoint, AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
index 03bf9c07109..782cbbcf467 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.Preconditions;
import org.apache.http.HttpHost;
@@ -38,6 +39,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions.
@@ -126,4 +128,15 @@ public void configureBulkProcessorBackoff(
builder.setBackoffPolicy(backoffPolicy);
}
+
+ @Override
+ public RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new Elasticsearch6BulkProcessorIndexer(
+ bulkProcessor,
+ flushOnCheckpoint,
+ numPendingRequestsRef);
+ }
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 00000000000..af3c5b13a9a
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible with Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ Elasticsearch6BulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
index 18fa05a8976..21c53edcf4f 100644
--- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java
@@ -17,16 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@@ -56,11 +58,14 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message # " + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -72,12 +77,13 @@ public String map(Long value) throws Exception {
List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch1.x end to end sink test example");
}
@@ -92,4 +98,16 @@ private static IndexRequest createIndexRequest(String element, ParameterTool par
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
index f7532b1a8d6..f8f390e9747 100644
--- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java
@@ -17,15 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.net.InetAddress;
@@ -54,11 +57,14 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -70,12 +76,13 @@ public String map(Long value) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>(){
- @Override
- public void process(String element, RuntimeContext ctx, org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch2.x end to end sink test example");
}
@@ -90,4 +97,16 @@ private static IndexRequest createIndexRequest(String element, ParameterTool par
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
index 39808f6fd4d..893d3662936 100644
--- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java
@@ -17,16 +17,18 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.net.InetAddress;
@@ -55,11 +57,14 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
@@ -71,12 +76,13 @@ public String map(Long value) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
- source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
- @Override
- public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
- indexer.add(createIndexRequest(element, parameterTool));
- }
- }));
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ }));
env.execute("Elasticsearch5.x end to end sink test example");
}
@@ -91,4 +97,16 @@ private static IndexRequest createIndexRequest(String element, ParameterTool par
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
index dedcbb28f08..e813c2995f5 100644
--- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
+++ b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java
@@ -17,16 +17,19 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
@@ -53,20 +56,26 @@ public static void main(String[] args) throws Exception {
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);
- DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
- .map(new MapFunction<Long, String>() {
+ DataStream<Tuple2<String, String>> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+ .flatMap(new FlatMapFunction<Long, Tuple2<String, String>>() {
@Override
- public String map(Long value) throws Exception {
- return "message #" + value;
+ public void flatMap(Long value, Collector<Tuple2<String, String>> out) {
+ final String key = String.valueOf(value);
+ final String message = "message #" + value;
+ out.collect(Tuple2.of(key, message + "update #1"));
+ out.collect(Tuple2.of(key, message + "update #2"));
}
});
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
- ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
+ ElasticsearchSink.Builder<Tuple2<String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
- (String element, RuntimeContext ctx, RequestIndexer indexer) -> indexer.add(createIndexRequest(element, parameterTool)));
+ (Tuple2<String, String> element, RuntimeContext ctx, RequestIndexer indexer) -> {
+ indexer.add(createIndexRequest(element.f1, parameterTool));
+ indexer.add(createUpdateRequest(element, parameterTool));
+ });
// this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);
@@ -86,4 +95,16 @@ private static IndexRequest createIndexRequest(String element, ParameterTool par
.id(element)
.source(json);
}
+
+ private static UpdateRequest createUpdateRequest(Tuple2<String, String> element, ParameterTool parameterTool) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element.f1);
+
+ return new UpdateRequest(
+ parameterTool.getRequired("index"),
+ parameterTool.getRequired("type"),
+ element.f0)
+ .doc(json)
+ .upsert(json);
+ }
}
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index c8cd2db17c9..800c4e20ae0 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -45,4 +45,5 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
--index index \
--type type
-verify_result 20 index
+# 40 index requests and 20 final update requests
+verify_result 60 index
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Elasticsearch 6 UpdateRequest fail because of binary incompatibility
> --------------------------------------------------------------------
>
> Key: FLINK-10269
> URL: https://issues.apache.org/jira/browse/FLINK-10269
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector
> Affects Versions: 1.6.0
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.1
>
>
> When trying to send UpdateRequest(s) to Elasticsearch 6, one gets the following
> error:
> {code}
> Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
> 	at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)
> ElasticsearchSinkFunction:
> {code}
> import org.elasticsearch.action.update.UpdateRequest
>
> def upsertRequest(element: T): UpdateRequest = {
>   new UpdateRequest(
>     "myIndex",
>     "record",
>     s"${element.id}")
>     .doc(element.toMap())
> }
>
> override def process(element: T, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
>   requestIndexer.add(upsertRequest(element))
> }
> {code}
> This is due to a binary compatibility issue between the base module (which is
> compiled against a very old ES version) and the current Elasticsearch version.
> As a workaround you can simply copy
> org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer to
> your project; this ensures that the class is compiled against the Elasticsearch
> version on your classpath.
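The patch above resolves this by letting each call bridge supply its own indexer, both of which perform the same bookkeeping: increment a shared pending-request counter before handing each request to the bulk processor, so flush-on-checkpoint can wait for the counter to drain. That pattern can be sketched without any Elasticsearch dependency; `BulkSink` and `CountingIndexer` below are hypothetical stand-ins for `BulkProcessor` and the indexer classes, not Flink or Elasticsearch API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Elasticsearch's BulkProcessor. */
interface BulkSink {
    void add(String request);
}

/** Simplified sketch of the pending-request accounting done by the indexers in this patch. */
class CountingIndexer {
    private final BulkSink bulkSink;
    private final boolean flushOnCheckpoint;
    private final AtomicLong numPendingRequests;

    CountingIndexer(BulkSink bulkSink, boolean flushOnCheckpoint, AtomicLong numPendingRequests) {
        this.bulkSink = bulkSink;
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.numPendingRequests = numPendingRequests;
    }

    void add(String... requests) {
        for (String request : requests) {
            if (flushOnCheckpoint) {
                // Count the request before handing it off, so a checkpoint
                // can wait until the counter drops back to zero.
                numPendingRequests.getAndIncrement();
            }
            bulkSink.add(request);
        }
    }
}

public class CountingIndexerDemo {
    public static void main(String[] args) {
        List<String> buffered = new ArrayList<>();
        AtomicLong pending = new AtomicLong();
        CountingIndexer indexer = new CountingIndexer(buffered::add, true, pending);
        indexer.add("index-1", "update-1", "delete-1");
        System.out.println(buffered.size() + " buffered, " + pending.get() + " pending");
        // prints "3 buffered, 3 pending"
    }
}
```

Because the counter type (`AtomicLong`) and the hand-off call are the only contract between the sink base and the indexer, the real fix can swap in a version-specific indexer without touching the checkpointing logic.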
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)