StephanEwen commented on a change in pull request #8756: [FLINK-12406] [Runtime] Report BLOCKING_PERSISTENT result partition meta back to client
URL: https://github.com/apache/flink/pull/8756#discussion_r299701304
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
##########
@@ -66,6 +74,49 @@ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throw
 assertEquals(PARALLELISM, resultCollection.size());
 }
+ @Test
+ public void testAccessingBlockingPersistentResultPartition() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(1);
+
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<>(1L, 2L));
+
+ DataSet ds = input.map((MapFunction<Tuple2<Long, Long>, Object>) value -> new Tuple2<>(value.f0 + 1, value.f1));
+
+ // specify IntermediateDataSetID
+ AbstractID intermediateDataSetId = new AbstractID();
+
+ // this output branch will be excluded.
+ ds.output(BlockingShuffleOutputFormat.createOutputFormat(intermediateDataSetId))
+ .setParallelism(1);
+
+ ds.collect();
+
+ BlockingPersistentResultPartitionMeta meta = env.getBlockingPersistentResultPartitionMeta();
+
+ // only one cached IntermediateDataSet
+ Assert.assertEquals(1, meta.getResultPartitionDescriptors().size());
+
+ AbstractID intermediateDataSetID = meta.getResultPartitionDescriptors().keySet().iterator().next();
+
+ // IntermediateDataSetID should be the same
+ Assert.assertEquals(intermediateDataSetID, intermediateDataSetID);
+
+ Map<AbstractID, ResultPartitionDescriptor> descriptors = meta.getResultPartitionDescriptors().get(intermediateDataSetID);
+
+ Assert.assertEquals(1, descriptors.size());
+
+ ResultPartitionDescriptor descriptor = descriptors.values().iterator().next();
+
+ Assert.assertTrue(
Review comment:
Separate conditions should have separate assertions, so that it is visible
from a test failure which of the conditions failed.
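
A minimal sketch of the difference, offered only as an illustration. The quoted hunk is cut off at `Assert.assertTrue(`, so the actual conditions are not visible here; the `Descriptor` class, its `host` and `port` fields, and the small `assertTrue` helper (a stand-in for JUnit's `Assert.assertTrue(String, boolean)`) are all hypothetical and are not part of the PR or of Flink.

```java
public class SeparateAssertionsSketch {

    // Hypothetical stand-in for the object under test; NOT a Flink class.
    static final class Descriptor {
        final String host;
        final int port;

        Descriptor(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Minimal stand-in for JUnit's Assert.assertTrue(String, boolean),
    // so the sketch runs without a test framework on the classpath.
    static void assertTrue(String message, boolean condition) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Combined form: one assertion over two conditions.
    // A failure cannot say which of the two was violated.
    static void checkCombined(Descriptor d) {
        assertTrue("descriptor invalid", d.host != null && d.port > 0);
    }

    // Separate form: one assertion per condition.
    // The failure message names the condition that did not hold.
    static void checkSeparately(Descriptor d) {
        assertTrue("host must not be null", d.host != null);
        assertTrue("port must be positive", d.port > 0);
    }

    // Runs a check and returns its failure message, or null if it passed.
    static String failureMessage(Runnable check) {
        try {
            check.run();
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Descriptor bad = new Descriptor("localhost", -1);
        System.out.println(failureMessage(() -> checkCombined(bad)));
        System.out.println(failureMessage(() -> checkSeparately(bad)));
    }
}
```

With the hypothetical descriptor above, the combined check reports only "descriptor invalid", while the separate checks report "port must be positive", which is the visibility the comment asks for.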