Repository: flink
Updated Branches:
  refs/heads/release-1.4 4219572a8 -> dfa050c01


[FLINK-8489][ES] Prevent side-effects when modifying user-config


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfa050c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfa050c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfa050c0

Branch: refs/heads/release-1.4
Commit: dfa050c01adc559a3ed4df4c2c3273903a37ff79
Parents: 4219572
Author: zentol <ches...@apache.org>
Authored: Mon Jan 29 11:46:08 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Fri Feb 2 15:59:13 2018 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchSinkBase.java    |  4 ++++
 .../ElasticsearchSinkBaseTest.java              | 20 ++++++++++++++++++++
 .../ElasticsearchSinkTestBase.java              |  2 +-
 .../elasticsearch/ElasticsearchSinkITCase.java  |  4 +++-
 4 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfa050c0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index c49d726..08fd125 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -214,6 +215,9 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 
                checkNotNull(userConfig);
 
+               // copy config so we can remove entries without side-effects
+               userConfig = new HashMap<>(userConfig);
+
                ParameterTool params = ParameterTool.fromMap(userConfig);
 
                if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dfa050c0/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 37e7779..09d8806 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -58,6 +58,26 @@ import static org.mockito.Mockito.when;
  */
 public class ElasticsearchSinkBaseTest {
 
+       /**
+        * Verifies that the collection given to the sink is not modified.
+        */
+       @Test
+       public void testCollectionArgumentNotModified() {
+               Map<String, String> userConfig = new HashMap<>();
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, "1");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, "true");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, "1");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, "CONSTANT");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+               userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, "1");
+
+               new DummyElasticsearchSink<>(
+                       Collections.unmodifiableMap(userConfig),
+                       new SimpleSinkFunction<String>(),
+                       new NoOpFailureHandler());
+       }
+
        /** Tests that any item failure in the listener callbacks is rethrown on an immediately following invoke call. */
        @Test
        public void testItemFailureRethrownOnInvoke() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/dfa050c0/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 297bc5d..f725f88 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -159,7 +159,7 @@ public abstract class ElasticsearchSinkTestBase extends StreamingMultipleProgram
                userConfig.put("cluster.name", "my-transport-client-cluster");
 
                source.addSink(createElasticsearchSinkForEmbeddedNode(
-                       userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
+                       Collections.unmodifiableMap(userConfig), new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));
 
                try {
                        env.execute("Elasticsearch Transport Client Test");

http://git-wip-us.apache.org/repos/asf/flink/blob/dfa050c0/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 3772f02..5489290 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -113,13 +114,14 @@ public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
 
                // Elasticsearch 1.x requires this setting when using
                // LocalTransportAddress to connect to a local embedded node
+               userConfig = new HashMap<>(userConfig);
                userConfig.put("node.local", "true");
 
                List<TransportAddress> transports = new ArrayList<>();
                transports.add(new LocalTransportAddress("1"));
 
                return new ElasticsearchSink<>(
-                       userConfig,
+                       Collections.unmodifiableMap(userConfig),
                        transports,
                        elasticsearchSinkFunction);
        }

Reply via email to