mynameborat commented on code in PR #23434:
URL: https://github.com/apache/beam/pull/23434#discussion_r1003630941
##########
runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java:
##########
@@ -168,4 +174,57 @@ public void testPipelineWithAggregation() {
pipeline.run();
}
+
+ @Test
+ public void testKeyedOutputFutures() {
+ // We test the scenario in which two elements of the same key need to be processed in order.
+ final DoFnRunner<KV<String, Integer>, Void> doFnRunner = mock(DoFnRunner.class);
+ final AtomicInteger prev = new AtomicInteger(0);
+ doAnswer(
+ invocation -> {
+ WindowedValue<KV<String, Integer>> wv = invocation.getArgument(0);
+ Integer val = wv.getValue().getValue();
+
+ // Verify the previous element has been fully processed by checking the prev value
+ assertEquals(val - 1, prev.get());
+
+ prev.set(val);
+ return null;
+ })
+ .when(doFnRunner)
+ .processElement(any());
+
+ SamzaPipelineOptions options = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
+ options.setNumThreadsForProcessElement(4);
+
+ final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
+ final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+ futureCollector.prepare();
+
+ final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
+ AsyncDoFnRunner.create(doFnRunner, opEmitter, futureCollector, true, options);
+
+ final String appleKey = "apple";
+
+ final WindowedValue<KV<String, Integer>> input1 =
+ WindowedValue.valueInGlobalWindow(KV.of(appleKey, 1));
+
+ final WindowedValue<KV<String, Integer>> input2 =
+ WindowedValue.valueInGlobalWindow(KV.of(appleKey, 2));
+
+ asyncDoFnRunner.processElement(input1);
+ asyncDoFnRunner.processElement(input2);
Review Comment:
Is there a way to introduce some sort of additional delay in the execution
of input1 to ensure there is time for the following
`asyncDoFnRunner.processElement(input2)` to be invoked and guarantee that it
is gated on the first call?
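A deterministic alternative to a sleep is to hold input1 on a latch that is released only after input2 has been submitted, then check that input2 has not run while input1 is held. The sketch below illustrates that idea in plain Java and is not Beam code: `KeyedGatingSketch`, `submit`, `Result`, and `runScenario` are hypothetical names, and chaining same-key work on the previous `CompletableFuture` is an assumed model of keyed ordering, not a description of `AsyncDoFnRunner`'s internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a keyed async runner (NOT Beam's AsyncDoFnRunner).
public class KeyedGatingSketch {
  private final ExecutorService pool = Executors.newFixedThreadPool(4);
  private final Map<String, CompletableFuture<Void>> tailByKey = new ConcurrentHashMap<>();

  // Assumed ordering model: chain new work onto the key's previous future.
  CompletableFuture<Void> submit(String key, Runnable work) {
    return tailByKey.compute(
        key,
        (k, prev) ->
            prev == null ? CompletableFuture.runAsync(work, pool) : prev.thenRunAsync(work, pool));
  }

  static final class Result {
    final boolean secondWaitedForFirst;
    final List<Integer> order;

    Result(boolean secondWaitedForFirst, List<Integer> order) {
      this.secondWaitedForFirst = secondWaitedForFirst;
      this.order = order;
    }
  }

  static Result runScenario() {
    KeyedGatingSketch runner = new KeyedGatingSketch();
    List<Integer> order = Collections.synchronizedList(new ArrayList<>());
    // input1 blocks here, so it is still in flight when input2 is submitted.
    CountDownLatch releaseFirst = new CountDownLatch(1);
    try {
      CompletableFuture<Void> f1 =
          runner.submit(
              "apple",
              () -> {
                try {
                  releaseFirst.await();
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new IllegalStateException(e);
                }
                order.add(1);
              });
      CompletableFuture<Void> f2 = runner.submit("apple", () -> order.add(2));

      // While input1 is held, input2 must not have completed or recorded anything.
      boolean secondWaited = !f2.isDone() && order.isEmpty();
      releaseFirst.countDown();
      CompletableFuture.allOf(f1, f2).join();
      return new Result(secondWaited, new ArrayList<>(order));
    } finally {
      runner.pool.shutdown();
    }
  }

  public static void main(String[] args) {
    Result r = runScenario();
    // prints "secondWaitedForFirst=true order=[1, 2]"
    System.out.println("secondWaitedForFirst=" + r.secondWaitedForFirst + " order=" + r.order);
  }
}
```

Applied to the test above, the same `CountDownLatch` could be awaited inside the `doAnswer` when `val == 1` and counted down right after `asyncDoFnRunner.processElement(input2)`. This assumes `processElement` returns without waiting for the element to finish; calling `await` with a timeout bounds the wait if that assumption does not hold.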