This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new e3beef3 [FLINK-20018] Allow escaping in 'pipeline.cached-files' and
'pipeline.default-kryo-serializers'
e3beef3 is described below
commit e3beef341bbe72d040a2fdabfc30350da56017af
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Nov 6 11:07:31 2020 +0100
[FLINK-20018] Allow escaping in 'pipeline.cached-files' and
'pipeline.default-kryo-serializers'
This commit enables escaping in options that expect a map of
string-string entries. It lets users pass options such as
pipeline.cached-files=name:file1,path:'oss://bucket/file1'
---
.../apache/flink/api/common/ExecutionConfig.java | 12 +---
.../flink/api/common/cache/DistributedCache.java | 11 +---
.../flink/configuration/ConfigurationUtils.java | 24 ++++++++
.../configuration/StructuredOptionsSplitter.java | 2 -
.../StructuredOptionsSplitterTest.java | 5 +-
...ecutionEnvironmentComplexConfigurationTest.java | 66 +++++++++++++++++++++-
6 files changed, 93 insertions(+), 27 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 73ff232..8526997 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MetricOptions;
@@ -33,7 +34,6 @@ import org.apache.flink.util.Preconditions;
import com.esotericsoftware.kryo.Serializer;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -1234,15 +1234,7 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
ClassLoader classLoader,
List<String> kryoSerializers) {
return kryoSerializers.stream()
- .map(v -> Arrays.stream(v.split(","))
- .map(p -> p.split(":"))
- .collect(
- Collectors.toMap(
- arr -> arr[0], // entry key
- arr -> arr[1] // entry value
- )
- )
- )
+ .map(ConfigurationUtils::parseMap)
.collect(Collectors.toMap(
m -> loadClass(m.get("class"), classLoader,
"Could not load class for kryo serialization"),
m -> loadClass(m.get("serializer"),
classLoader, "Could not load serializer's class"),
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 079cfab..8b58b97 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.cache;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.fs.Path;
import java.io.File;
@@ -197,15 +198,7 @@ public class DistributedCache {
*/
public static List<Tuple2<String, DistributedCacheEntry>>
parseCachedFilesFromString(List<String> files) {
return files.stream()
- .map(v -> Arrays.stream(v.split(","))
- .map(p -> p.split(":"))
- .collect(
- Collectors.toMap(
- arr -> arr[0], // key name
- arr -> arr[1] // value
- )
- )
- )
+ .map(ConfigurationUtils::parseMap)
.map(m -> Tuple2.of(
m.get("name"),
new DistributedCacheEntry(
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 745d474..f7987b6 100755
---
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
import static
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
@@ -98,6 +99,29 @@ public class ConfigurationUtils {
return splitPaths(configValue);
}
+ /**
+ * Parses a string as a map of strings. The expected format of the map
is:
+ * <pre>
+ * key1:value1,key2:value2
+ * </pre>
+ *
+ * <p>Parts of the string can be escaped by wrapping with single or
double quotes.
+ *
+ * @param stringSerializedMap a string to parse
+ *
+ * @return parsed map
+ */
+ public static Map<String, String> parseMap(String stringSerializedMap) {
+ return
StructuredOptionsSplitter.splitEscaped(stringSerializedMap, ',').stream()
+ .map(p -> StructuredOptionsSplitter.splitEscaped(p,
':'))
+ .collect(
+ Collectors.toMap(
+ arr -> arr.get(0), // key name
+ arr -> arr.get(1) // value
+ )
+ );
+ }
+
public static Time getStandaloneClusterStartupPeriodTime(Configuration
configuration) {
final Time timeout;
long standaloneClusterStartupPeriodTime =
configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
index 8be69c6..a237969 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/StructuredOptionsSplitter.java
@@ -161,8 +161,6 @@ class StructuredOptionsSplitter {
char c = string.charAt(i);
if (c == delimiter) {
return i;
- } else if (c == '\'' || c == '"') {
- throw new IllegalArgumentException("Could not
split string. Illegal quoting at position: " + i);
}
builder.append(c);
diff --git
a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
index 323e55d..ae73d6a 100644
---
a/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
+++
b/flink-core/src/test/java/org/apache/flink/configuration/StructuredOptionsSplitterTest.java
@@ -53,7 +53,7 @@ public class StructuredOptionsSplitterTest {
TestSpec.split("'A;B';'C'", ';').expect("A;B", "C"),
TestSpec.split("A;B;C", ';').expect("A", "B", "C"),
TestSpec.split("'AB''D;B';C", ';').expect("AB'D;B",
"C"),
- TestSpec.split("A'BD;B';C", ';').expectException("Could
not split string. Illegal quoting at position: 1"),
+ TestSpec.split("A'BD;B';C", ';').expect("A'BD", "B'",
"C"),
TestSpec.split("'AB'D;B;C", ';').expectException("Could
not split string. Illegal quoting at position: 3"),
TestSpec.split("'A", ';').expectException("Could not
split string. Quoting was not closed properly."),
TestSpec.split("C;'", ';').expectException("Could not
split string. Quoting was not closed properly."),
@@ -63,8 +63,7 @@ public class StructuredOptionsSplitterTest {
TestSpec.split("\"A;B\";\"C\"", ';').expect("A;B", "C"),
TestSpec.split("A;B;C", ';').expect("A", "B", "C"),
TestSpec.split("\"AB\"\"D;B\";C",
';').expect("AB\"D;B", "C"),
- TestSpec.split("A\"BD;B\";C", ';')
- .expectException("Could not split string.
Illegal quoting at position: 1"),
+ TestSpec.split("A\"BD;B\";C", ';').expect("A\"BD",
"B\"", "C"),
TestSpec.split("\"AB\"D;B;C", ';')
.expectException("Could not split string.
Illegal quoting at position: 3"),
TestSpec.split("\"A", ';').expectException("Could not
split string. Quoting was not closed properly."),
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
index 67f89e9..b4352e6 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
@@ -25,9 +25,14 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.junit.Test;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -57,21 +62,50 @@ public class
StreamExecutionEnvironmentComplexConfigurationTest {
@Test
public void testLoadingCachedFilesFromConfiguration() {
StreamExecutionEnvironment envFromConfiguration =
StreamExecutionEnvironment.getExecutionEnvironment();
- envFromConfiguration.registerCachedFile("/tmp3", "file3", true);
+ envFromConfiguration.registerCachedFile("/tmp4", "file4", true);
Configuration configuration = new Configuration();
- configuration.setString("pipeline.cached-files",
"name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2");
+ configuration.setString(
+ "pipeline.cached-files",
+ "name:file1,path:/tmp1,executable:true;"
+ + "name:file2,path:/tmp2;"
+ + "name:file3,path:'oss://bucket/file1'");
// mutate config according to configuration
envFromConfiguration.configure(configuration,
Thread.currentThread().getContextClassLoader());
assertThat(envFromConfiguration.getCachedFiles(),
equalTo(Arrays.asList(
Tuple2.of("file1", new
DistributedCache.DistributedCacheEntry("/tmp1", true)),
- Tuple2.of("file2", new
DistributedCache.DistributedCacheEntry("/tmp2", false))
+ Tuple2.of("file2", new
DistributedCache.DistributedCacheEntry("/tmp2", false)),
+ Tuple2.of(
+ "file3",
+ new
DistributedCache.DistributedCacheEntry("oss://bucket/file1", false))
)));
}
@Test
+ public void testLoadingKryoSerializersFromConfiguration() {
+ StreamExecutionEnvironment envFromConfiguration =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ "pipeline.default-kryo-serializers",
+
"class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'"
+ +
",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");
+
+ // mutate config according to configuration
+ envFromConfiguration.configure(
+ configuration,
+ Thread.currentThread().getContextClassLoader());
+
+ LinkedHashMap<Object, Object> serializers = new
LinkedHashMap<>();
+ serializers.put(CustomPojo.class, CustomPojoSerializer.class);
+ assertThat(
+
envFromConfiguration.getConfig().getDefaultKryoSerializerClasses(),
+ equalTo(serializers));
+ }
+
+ @Test
public void
testNotOverridingStateBackendWithDefaultsFromConfiguration() {
StreamExecutionEnvironment envFromConfiguration =
StreamExecutionEnvironment.getExecutionEnvironment();
envFromConfiguration.setStateBackend(new MemoryStateBackend());
@@ -97,4 +131,30 @@ public class
StreamExecutionEnvironmentComplexConfigurationTest {
Tuple2.of("file3", new
DistributedCache.DistributedCacheEntry("/tmp3", true))
)));
}
+
+ /**
+ * A dummy class to specify a Kryo serializer for.
+ */
+ public static class CustomPojo {
+ }
+
+ /**
+ * A dummy Kryo serializer which can be registered.
+ */
+ public static class CustomPojoSerializer extends Serializer<CustomPojo>
{
+ @Override
+ public void write(
+ Kryo kryo,
+ Output output,
+ CustomPojo object) {
+ }
+
+ @Override
+ public CustomPojo read(
+ Kryo kryo,
+ Input input,
+ Class<CustomPojo> type) {
+ return null;
+ }
+ }
}