aljoscha commented on a change in pull request #13965:
URL: https://github.com/apache/flink/pull/13965#discussion_r520549990
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
##########
@@ -156,4 +190,30 @@ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Null
}
}
+
+ /**
+ * A dummy class to specify a Kryo serializer for.
+ */
+ public static class CustomPojo {
+ }
+
+ /**
+ * A dummy Kryo serializer which can be registered.
+ */
+ public static class CustomPojoSerializer extends Serializer<CustomPojo> {
+ @Override
+ public void write(
+ Kryo kryo,
Review comment:
I'm only nitpicking here, because I think style doesn't matter unless it's
enforced:
I think our style guide says to use two indentation levels for parameters.
The `.editorconfig` file was recently changed in a way that breaks this, so I
suggest using a good `.editorconfig` file and ignoring the one in git with
`git update-index --skip-worktree .editorconfig`.
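
To illustrate the indentation rule, here is a minimal sketch. It assumes tab indentation, and the class and method names (`IndentationExample`, `write`) are hypothetical and not part of this PR. Wrapped parameters sit two levels deeper than the declaration, which keeps them visually distinct from the body at one level:

```java
/** Hypothetical example, not taken from the PR; it only illustrates the indentation. */
public class IndentationExample {

	// Wrapped parameters are indented by two levels (two tabs here),
	// while the method body is indented by one level.
	public static int write(
			StringBuilder output,
			String value) {
		output.append(value);
		return value.length();
	}

	public static void main(String[] args) {
		StringBuilder output = new StringBuilder();
		// Wrapped call arguments follow the same two-level continuation indent.
		int written = write(
				output,
				"pojo");
		System.out.println(written + " characters written: " + output);
	}
}
```

The same layout would apply to the `write(Kryo kryo, ...)` override above.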
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
##########
@@ -66,20 +71,49 @@ public void testLoadingStateBackendFromConfiguration() {
@Test
public void testLoadingCachedFilesFromConfiguration() {
StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
- envFromConfiguration.registerCachedFile("/tmp3", "file3", true);
+ envFromConfiguration.registerCachedFile("/tmp4", "file4", true);
Configuration configuration = new Configuration();
- configuration.setString("pipeline.cached-files", "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2");
+ configuration.setString(
+ "pipeline.cached-files",
+ "name:file1,path:/tmp1,executable:true;"
+ + "name:file2,path:/tmp2;"
+ + "name:file3,path:'oss://bucket/file1'");
// mutate config according to configuration
envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());
assertThat(envFromConfiguration.getCachedFiles(), equalTo(Arrays.asList(
Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)),
- Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false))
+ Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)),
+ Tuple2.of(
+ "file3",
+ new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false))
)));
}
+ @Test
+ public void testLoadingKryoSerializersFromConfiguration() {
+ StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(
+ "pipeline.default-kryo-serializers",
+ "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'"
+ + ",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");
+
+ // mutate config according to configuration
+ envFromConfiguration.configure(
+ configuration,
+ Thread.currentThread().getContextClassLoader());
Review comment:
This could be simplified by using `getExecutionEnvironment(Configuration)`.