rkhachatryan commented on a change in pull request #15013:
URL: https://github.com/apache/flink/pull/15013#discussion_r582670718
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -359,6 +366,16 @@ private void setChaining(Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>>
}
}
+ private static int compareHashes(byte[] hash1, byte[] hash2) {
+ for (int index = 0; index < hash1.length; index++) {
+ int diff = hash2[index] - hash1[index];
Review comment:
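nit: handle arrays of different lengths. As written, the loop throws `ArrayIndexOutOfBoundsException` when `hash2` is shorter than `hash1`, and silently ignores the extra bytes when it is longer.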
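For illustration, a minimal sketch of a length-aware comparison. The class name `HashOrdering` is hypothetical, and the ordering direction (ascending, signed bytes, shorter prefix first) is an assumption rather than what the PR does; any consistent total order would serve for deterministic sorting.

```java
// Hypothetical helper, not part of the PR: compares two hashes of possibly different lengths.
final class HashOrdering {
    private HashOrdering() {}

    /**
     * Lexicographic comparison over the common prefix (signed bytes).
     * If one array is a prefix of the other, the shorter one sorts first.
     */
    static int compareHashes(byte[] hash1, byte[] hash2) {
        int common = Math.min(hash1.length, hash2.length);
        for (int index = 0; index < common; index++) {
            int diff = Byte.compare(hash1[index], hash2[index]);
            if (diff != 0) {
                return diff;
            }
        }
        // Common prefix is equal: fall back to comparing lengths.
        return Integer.compare(hash1.length, hash2.length);
    }
}
```

On Java 9 and later, `java.util.Arrays.compare(byte[], byte[])` provides the same kind of length-aware lexicographic comparison, if the module's minimum Java version allows it.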
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -954,6 +954,41 @@ public void
testYieldingOperatorProperlyChainedOnNewSources() {
assertEquals(4, vertices.get(0).getOperatorIDs().size());
}
+ @Test
+ public void testDeterministicUnionOrder() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ JobGraph jobGraph = getUnionJobGraph(env);
+ JobVertex jobSink =
Iterables.getLast(jobGraph.getVerticesSortedTopologicallyFromSources());
+ List<String> expectedSourceOrder =
+ jobSink.getInputs().stream()
+ .map(edge -> edge.getSource().getProducer().getName())
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < 100; i++) {
+ JobGraph jobGraph2 = getUnionJobGraph(env);
+ JobVertex jobSink2 =
+ Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
+ assertNotEquals("Different runs should yield different vertexes", jobSink, jobSink2);
+ List<String> actualSourceOrder =
+ jobSink2.getInputs().stream()
+ .map(edge -> edge.getSource().getProducer().getName())
+ .collect(Collectors.toList());
+ assertEquals("Union inputs reordered", actualSourceOrder, expectedSourceOrder);
Review comment:
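nit: swap the arguments. `assertEquals` takes the expected value before the actual one, so `expectedSourceOrder` should come first and `actualSourceOrder` second.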
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -347,7 +348,13 @@ private void setChaining(Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>>
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
final Collection<OperatorChainInfo> initialEntryPoints =
- new ArrayList<>(chainEntryPoints.values());
+ chainEntryPoints.values().stream()
+ .sorted(
+ Comparator.comparing(
+ operatorChainInfo ->
+ hashes.get(operatorChainInfo.getStartNodeId()),
+ StreamingJobGraphGenerator::compareHashes))
Review comment:
Is it actually possible to have multiple sources with the same startNodeId?
If so, I think the hash comparison is not covered by tests (please feel free to
leave it as is if covering it would require too much effort).
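If coverage is wanted, one cheap option might be to exercise the hash-based ordering in isolation instead of through a full job graph. The sketch below is only an illustration: `HashSortSketch` and `sortByHash` are hypothetical names, the comparator is a stand-in for the PR's `compareHashes`, and plain node ids stand in for `OperatorChainInfo`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sorting step in setChaining, reduced to node ids and hashes.
final class HashSortSketch {
    private HashSortSketch() {}

    // Stand-in comparator: ascending, signed bytes, shorter prefix first (an assumption).
    static int compareHashes(byte[] a, byte[] b) {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int diff = Byte.compare(a[i], b[i]);
            if (diff != 0) {
                return diff;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Orders node ids by their hash, so the result does not depend on map iteration order.
    static List<Integer> sortByHash(Map<Integer, byte[]> hashes) {
        List<Integer> ids = new ArrayList<>(hashes.keySet());
        ids.sort(
                Comparator.comparing(
                        (Integer id) -> hashes.get(id), HashSortSketch::compareHashes));
        return ids;
    }
}
```

A test along these lines would fill two maps with the same entries in different insertion orders and check that `sortByHash` returns the same list for both.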
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -954,6 +954,41 @@ public void
testYieldingOperatorProperlyChainedOnNewSources() {
assertEquals(4, vertices.get(0).getOperatorIDs().size());
}
+ @Test
+ public void testDeterministicUnionOrder() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ JobGraph jobGraph = getUnionJobGraph(env);
+ JobVertex jobSink =
Iterables.getLast(jobGraph.getVerticesSortedTopologicallyFromSources());
+ List<String> expectedSourceOrder =
+ jobSink.getInputs().stream()
+ .map(edge -> edge.getSource().getProducer().getName())
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < 100; i++) {
+ JobGraph jobGraph2 = getUnionJobGraph(env);
Review comment:
nit: use more than two sources in `getUnionJobGraph`, so that a
non-deterministic ordering is more likely to make the test fail?