This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 640a56f [FLINK-18006] Always overwrite RestClientFactory in
ElasticsearchXDynamicSink
640a56f is described below
commit 640a56fee9d777ff2acb69ab6d77275e7373415d
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Jun 2 17:09:26 2020 +0200
[FLINK-18006] Always overwrite RestClientFactory in
ElasticsearchXDynamicSink
We always overwrite the RestClientFactory in order to workaround an
issue with shading classes in lambdas deserialization method. That way
we never use the default lambda from ElasticsearchSink$Builder which
cannot be deserialized when used from a
flink-sql-connector-elasticsearch module due to shading.
This closes #12455
---
.../connectors/elasticsearch/table/Elasticsearch6DynamicSink.java | 5 +++--
.../connectors/elasticsearch/table/Elasticsearch7DynamicSink.java | 5 +++--
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index bedfbef..680cb2c 100644
---
a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++
b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -136,8 +136,9 @@ final class Elasticsearch6DynamicSink implements
DynamicTableSink {
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
- config.getPathPrefix()
- .ifPresent(pathPrefix ->
builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+ // we must overwrite the default factory which is
defined with a lambda because of a bug
+ // in shading lambda serialization shading see
FLINK-18006
+ builder.setRestClientFactory(new
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
final ElasticsearchSink<RowData> sink = builder.build();
diff --git
a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 408673e..7aa52ea 100644
---
a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++
b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -136,8 +136,9 @@ final class Elasticsearch7DynamicSink implements
DynamicTableSink {
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
- config.getPathPrefix()
- .ifPresent(pathPrefix ->
builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+ // we must overwrite the default factory which is
defined with a lambda because of a bug
+ // in shading lambda serialization shading see
FLINK-18006
+ builder.setRestClientFactory(new
DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
final ElasticsearchSink<RowData> sink = builder.build();