Repository: flink
Updated Branches:
  refs/heads/release-1.4 257032807 -> f00126459


[FLINK-7943] Make ParameterTool thread safe

This commit changes the serialization of the ParameterTool such that only the
data map is contained. The defaultData and the unrequestedParameters maps are
not serialized because they are only used on the client side. Additionally, the
defaultData and unrequestedParameters map are being made thread safe by using
ConcurrentHashMaps.

This closes #4921.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b4f1b11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b4f1b11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b4f1b11

Branch: refs/heads/release-1.4
Commit: 2b4f1b1168e074be7ec6f57800877cccd1aef28e
Parents: 2570328
Author: Till Rohrmann <[email protected]>
Authored: Mon Oct 30 14:15:20 2017 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 20 15:51:03 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfigTest.java   |  3 +-
 .../flink/api/java/utils/ParameterTool.java     | 64 +++++++++++++--
 .../api/java/utils/RequiredParameters.java      | 22 +++--
 .../flink/api/java/utils/ParameterToolTest.java | 85 ++++++++++++++++++++
 .../api/java/utils/RequiredParametersTest.java  | 18 +++--
 5 files changed, 167 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b4f1b11/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 7e98604..f9beb40 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -37,7 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class ExecutionConfigTest {
+public class ExecutionConfigTest extends TestLogger {
 
        @Test
        public void testDoubleTypeRegistration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b4f1b11/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 894b66d..e42a4b7 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -32,6 +32,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -39,8 +40,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class provides simple utility methods for reading and parsing program 
arguments from different sources.
@@ -212,13 +215,38 @@ public class ParameterTool extends 
ExecutionConfig.GlobalJobParameters implement
 
        // ------------------ ParameterUtil  ------------------------
        protected final Map<String, String> data;
-       protected final Map<String, String> defaultData;
-       protected final Set<String> unrequestedParameters;
+
+       // data which is only used on the client and does not need to be 
transmitted
+       protected transient Map<String, String> defaultData;
+       protected transient Set<String> unrequestedParameters;
 
        private ParameterTool(Map<String, String> data) {
-               this.data = new HashMap<>(data);
-               this.defaultData = new HashMap<>();
-               this.unrequestedParameters = new HashSet<>(data.keySet());
+               this.data = Collections.unmodifiableMap(new HashMap<>(data));
+
+               this.defaultData = new ConcurrentHashMap<>(data.size());
+
+               this.unrequestedParameters = Collections.newSetFromMap(new 
ConcurrentHashMap<>(data.size()));
+
+               unrequestedParameters.addAll(data.keySet());
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               ParameterTool that = (ParameterTool) o;
+               return Objects.equals(data, that.data) &&
+                       Objects.equals(defaultData, that.defaultData) &&
+                       Objects.equals(unrequestedParameters, 
that.unrequestedParameters);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(data, defaultData, unrequestedParameters);
        }
 
        /**
@@ -560,9 +588,21 @@ public class ParameterTool extends 
ExecutionConfig.GlobalJobParameters implement
         * @return The Merged {@link ParameterTool}
         */
        public ParameterTool mergeWith(ParameterTool other) {
-               ParameterTool ret = new ParameterTool(this.data);
-               ret.data.putAll(other.data);
-               ret.unrequestedParameters.addAll(other.unrequestedParameters);
+               Map<String, String> resultData = new HashMap<>(data.size() + 
other.data.size());
+               resultData.putAll(data);
+               resultData.putAll(other.data);
+
+               ParameterTool ret = new ParameterTool(resultData);
+
+               final HashSet<String> requestedParametersLeft = new 
HashSet<>(data.keySet());
+               requestedParametersLeft.removeAll(unrequestedParameters);
+
+               final HashSet<String> requestedParametersRight = new 
HashSet<>(other.data.keySet());
+               requestedParametersRight.removeAll(other.unrequestedParameters);
+
+               ret.unrequestedParameters.removeAll(requestedParametersLeft);
+               ret.unrequestedParameters.removeAll(requestedParametersRight);
+
                return ret;
        }
 
@@ -573,4 +613,12 @@ public class ParameterTool extends 
ExecutionConfig.GlobalJobParameters implement
                return data;
        }
 
+       // ------------------------- Serialization 
---------------------------------------------
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+
+               defaultData = Collections.emptyMap();
+               unrequestedParameters = Collections.emptySet();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b4f1b11/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
index 1d2e73a..676a472 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
@@ -83,24 +83,28 @@ public class RequiredParameters {
         * <p>If any check fails, a RequiredParametersException is thrown
         *
         * @param parameterTool - parameters supplied by the user.
+        * @return the updated ParameterTool containing all the required 
parameters
         * @throws RequiredParametersException if any of the specified checks 
fail
         */
-       public void applyTo(ParameterTool parameterTool) throws 
RequiredParametersException {
+       public ParameterTool applyTo(ParameterTool parameterTool) throws 
RequiredParametersException {
                List<String> missingArguments = new LinkedList<>();
+
+               HashMap<String, String> newParameters = new 
HashMap<>(parameterTool.toMap());
+
                for (Option o : data.values()) {
-                       if (parameterTool.data.containsKey(o.getName())) {
-                               if 
(Objects.equals(parameterTool.data.get(o.getName()), 
ParameterTool.NO_VALUE_KEY)) {
+                       if (newParameters.containsKey(o.getName())) {
+                               if 
(Objects.equals(newParameters.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
                                        // the parameter has been passed, but 
no value, check if there is a default value
-                                       checkAndApplyDefaultValue(o, 
parameterTool.data);
+                                       checkAndApplyDefaultValue(o, 
newParameters);
                                } else {
                                        // a value has been passed in the 
parameterTool, now check if it adheres to all constraints
-                                       checkAmbiguousValues(o, 
parameterTool.data);
-                                       checkIsCastableToDefinedType(o, 
parameterTool.data);
-                                       checkChoices(o, parameterTool.data);
+                                       checkAmbiguousValues(o, newParameters);
+                                       checkIsCastableToDefinedType(o, 
newParameters);
+                                       checkChoices(o, newParameters);
                                }
                        } else {
                                // check if there is a default name or a value 
passed for a possibly defined alternative name.
-                               if 
(hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+                               if 
(hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, newParameters)) {
                                        missingArguments.add(o.getName());
                                }
                        }
@@ -108,6 +112,8 @@ public class RequiredParameters {
                if (!missingArguments.isEmpty()) {
                        throw new 
RequiredParametersException(this.missingArgumentsText(missingArguments), 
missingArguments);
                }
+
+               return ParameterTool.fromMap(newParameters);
        }
 
        // check if the given parameter has a default value and add it to the 
passed map if that is the case

http://git-wip-us.apache.org/repos/asf/flink/blob/2b4f1b11/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 6ad2022..ccd472b 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -23,16 +23,29 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Tests for {@link ParameterTool}.
@@ -574,6 +587,78 @@ public class ParameterToolTest extends 
AbstractParameterToolTest {
                Assert.assertEquals(Collections.emptySet(), 
parameter.getUnrequestedParameters());
        }
 
+       /**
+        * Tests that we can concurrently serialize and access the 
ParameterTool. See FLINK-7943
+        */
+       @Test
+       public void testConcurrentExecutionConfigSerialization() throws 
ExecutionException, InterruptedException {
+
+               final int numInputs = 10;
+               Collection<String> input = new ArrayList<>(numInputs);
+
+               for (int i = 0; i < numInputs; i++) {
+                       input.add("--" + UUID.randomUUID());
+                       input.add(UUID.randomUUID().toString());
+               }
+
+               final String[] args = input.toArray(new String[0]);
+
+               final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+               final int numThreads = 5;
+               final int numSerializations = 100;
+
+               final Collection<CompletableFuture<Void>> futures = new 
ArrayList<>(numSerializations);
+
+               final ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
+
+               try {
+                       for (int i = 0; i < numSerializations; i++) {
+                               futures.add(
+                                       CompletableFuture.runAsync(
+                                               () -> {
+                                                       try {
+                                                               
serializeDeserialize(parameterTool);
+                                                       } catch (Exception e) {
+                                                               throw new 
CompletionException(e);
+                                                       }
+                                               },
+                                               executorService));
+                       }
+
+                       for (CompletableFuture<Void> future : futures) {
+                               future.get();
+                       }
+               } finally {
+                       executorService.shutdownNow();
+                       executorService.awaitTermination(1000L, 
TimeUnit.MILLISECONDS);
+               }
+       }
+
+       /**
+        * Accesses parameter tool parameters and then serializes the given 
parameter tool and deserializes again.
+        * @param parameterTool to serialize/deserialize
+        */
+       private void serializeDeserialize(ParameterTool parameterTool) throws 
IOException, ClassNotFoundException {
+               // weirdly enough, this call has side effects making the 
ParameterTool serialization fail if not
+               // using a concurrent data structure.
+               parameterTool.get(UUID.randomUUID().toString());
+
+               try (
+                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                       ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                       oos.writeObject(parameterTool);
+                       oos.close();
+                       baos.close();
+
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+                       ObjectInputStream ois = new ObjectInputStream(bais);
+
+                       // this should work :-)
+                       ParameterTool deserializedParameterTool = 
((ParameterTool) ois.readObject());
+               }
+       }
+
        private static <T> Set<T> createHashSet(T... elements) {
                Set<T> set = new HashSet<>();
                for (T element : elements) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b4f1b11/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
index 11d1267..e8273ef 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.utils;
 
+import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -33,7 +35,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for RequiredParameter class and its interactions with ParameterTool.
  */
-public class RequiredParametersTest {
+public class RequiredParametersTest extends TestLogger {
 
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
@@ -122,7 +124,7 @@ public class RequiredParametersTest {
 
                try {
                        required.add(new Option("berlin").alt("b"));
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                        Assert.assertEquals(parameter.data.get("b"), "value");
                } catch (RequiredParametersException e) {
@@ -137,7 +139,7 @@ public class RequiredParametersTest {
 
                try {
                        required.add(new 
Option("berlin").alt("b").defaultValue("something"));
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                        Assert.assertEquals(parameter.data.get("b"), "value");
                } catch (RequiredParametersException e) {
@@ -164,7 +166,7 @@ public class RequiredParametersTest {
                RequiredParameters required = new RequiredParameters();
                try {
                        required.add(new Option("berlin"));
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                } catch (RequiredParametersException e) {
                        fail("Exception thrown " + e.getMessage());
@@ -177,7 +179,7 @@ public class RequiredParametersTest {
                RequiredParameters required = new RequiredParameters();
                try {
                        required.add(new 
Option("berlin").defaultValue("value"));
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                } catch (RequiredParametersException e) {
                        fail("Exception thrown " + e.getMessage());
@@ -190,7 +192,7 @@ public class RequiredParametersTest {
                RequiredParameters required = new RequiredParameters();
                try {
                        required.add(new 
Option("berlin").alt("b").defaultValue("value"));
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                        Assert.assertEquals(parameter.data.get("b"), "value");
                } catch (RequiredParametersException e) {
@@ -205,7 +207,7 @@ public class RequiredParametersTest {
                try {
                        rq.add("input");
                        rq.add(new 
Option("parallelism").alt("p").defaultValue("1").type(OptionType.INTEGER));
-                       rq.applyTo(parameter);
+                       parameter = rq.applyTo(parameter);
                        Assert.assertEquals(parameter.data.get("parallelism"), 
"1");
                        Assert.assertEquals(parameter.data.get("p"), "1");
                        Assert.assertEquals(parameter.data.get("input"), "abc");
@@ -223,7 +225,7 @@ public class RequiredParametersTest {
                        required.add(new Option("count").defaultValue("15"));
                        required.add(new 
Option("someFlag").alt("sf").defaultValue("true"));
 
-                       required.applyTo(parameter);
+                       parameter = required.applyTo(parameter);
 
                        Assert.assertEquals(parameter.data.get("berlin"), 
"value");
                        Assert.assertEquals(parameter.data.get("count"), "15");

Reply via email to