[ https://issues.apache.org/jira/browse/FLINK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823913#comment-15823913 ]
ASF GitHub Bot commented on FLINK-5480:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3117#discussion_r96222455
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java ---
@@ -422,6 +425,43 @@ public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
         env.getStreamGraph().getJobGraph();
     }
+    @Test
+    public void testUserProvidedHashing() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+        List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+
+        env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash(userHashes.get(0))
+                .map(new NoOpMapFunction())
+                .filter(new NoOpFilterFunction())
+                .keyBy(new NoOpKeySelector())
+                .reduce(new NoOpReduceFunction()).name("reduce").provideAdditionalNodeHash(userHashes.get(1));
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        int idx = 1;
+        for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
+            Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx));
+            --idx;
+        }
+    }
+
+    @Test
+    public void testUserProvidedHashingOnChainNotSupported() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+        env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+                .map(new NoOpMapFunction()).provideAdditionalNodeHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
--- End diff --
The fact that providing a hash for a chained operator fails the job
should probably be documented in the Javadoc of ```provideAdditionalNodeHash```.
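To illustrate the kind of contract such Javadoc could spell out, here is a minimal, self-contained sketch. It is not Flink code: `UserHashSketch`, `isValidHash`, and `validateChain` are hypothetical names, and the two rules it encodes are assumptions read off the tests in the diff (hashes look like 32 hex characters, and only the first operator of a chain may carry one).

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: none of these names are Flink classes or methods. */
final class UserHashSketch {

    /** Assumption: a user hash has the shape used in the tests, i.e. 32 hex characters. */
    static boolean isValidHash(String hash) {
        return hash != null && hash.matches("[0-9a-fA-F]{32}");
    }

    /**
     * Checks the user-provided hashes of one operator chain, head operator first.
     * A null entry means no hash was provided for that operator.
     *
     * <p>Assumed rule: only the head of a chain may carry a user-provided hash.
     *
     * @throws IllegalArgumentException if a provided hash is malformed
     * @throws UnsupportedOperationException if a non-head operator carries a hash
     */
    static void validateChain(List<String> hashesHeadFirst) {
        for (int i = 0; i < hashesHeadFirst.size(); i++) {
            String hash = hashesHeadFirst.get(i);
            if (hash == null) {
                continue; // nothing to check for this operator
            }
            if (!isValidHash(hash)) {
                throw new IllegalArgumentException("Malformed hash at chain position " + i);
            }
            if (i > 0) {
                throw new UnsupportedOperationException(
                        "User-provided hash on chained, non-head operator at position " + i);
            }
        }
    }

    public static void main(String[] args) {
        String a = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
        String b = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";
        // Mirrors the first test: only the chain head (the source) has a hash.
        validateChain(Arrays.asList(a, null, null));
        // Mirrors the second test: the chained map also has a hash, so this throws.
        try {
            validateChain(Arrays.asList(a, b));
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whatever the real check looks like, documenting the thrown exception type and the condition that triggers it in the method's Javadoc would let users see the restriction before they submit a job.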
> User-provided hashes for operators
> ----------------------------------
>
> Key: FLINK-5480
> URL: https://issues.apache.org/jira/browse/FLINK-5480
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.2.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> We could allow users to provide (alternative) hashes for operators in a
> StreamGraph. This can make migration between Flink versions easier, in case
> the automatically produced hashes between versions are incompatible. For
> example, users could just copy the old hashes from the web UI to their job.