mynameborat commented on code in PR #23434:
URL: https://github.com/apache/beam/pull/23434#discussion_r1003630941


##########
runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java:
##########
@@ -168,4 +174,57 @@ public void testPipelineWithAggregation() {
 
     pipeline.run();
   }
+
+  @Test
+  public void testKeyedOutputFutures() {
+    // We test the scenario that two elements of the same key needs to be processed in order.
+    final DoFnRunner<KV<String, Integer>, Void> doFnRunner = mock(DoFnRunner.class);
+    final AtomicInteger prev = new AtomicInteger(0);
+    doAnswer(
+            invocation -> {
+              WindowedValue<KV<String, Integer>> wv = invocation.getArgument(0);
+              Integer val = wv.getValue().getValue();
+
+              // Verify the previous element has been fully processed by checking the prev value
+              assertEquals(val - 1, prev.get());
+
+              prev.set(val);
+              return null;
+            })
+        .when(doFnRunner)
+        .processElement(any());
+
+    SamzaPipelineOptions options = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
+    options.setNumThreadsForProcessElement(4);
+
+    final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
+    final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+    futureCollector.prepare();
+
+    final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
+        AsyncDoFnRunner.create(doFnRunner, opEmitter, futureCollector, true, options);
+
+    final String appleKey = "apple";
+
+    final WindowedValue<KV<String, Integer>> input1 =
+        WindowedValue.valueInGlobalWindow(KV.of(appleKey, 1));
+
+    final WindowedValue<KV<String, Integer>> input2 =
+        WindowedValue.valueInGlobalWindow(KV.of(appleKey, 2));
+
+    asyncDoFnRunner.processElement(input1);
+    asyncDoFnRunner.processElement(input2);

Review Comment:
   Is there a way to add an artificial delay to the processing of `input1`, so
   that the following `asyncDoFnRunner.processElement(input2)` call is certain
   to be invoked while `input1` is still in flight? That would guarantee the
   test really exercises the case where the second call is gated on the first.
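
One way to get that guarantee without a sleep might be to block the first element on a latch that is released only after the second element has been submitted. The sketch below illustrates the idea in plain `java.util.concurrent`; it does not use or describe Beam's `AsyncDoFnRunner`. `KeyedGatingSketch`, `KeyedExecutor`, and `runScenario` are hypothetical names made up for this illustration, and the per-key future chaining is an assumed stand-in for whatever gating the runner performs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class KeyedGatingSketch {

  /** Hypothetical keyed executor: each task is chained onto the previous task of the same key. */
  static final class KeyedExecutor {
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    KeyedExecutor(ExecutorService pool) {
      this.pool = pool;
    }

    CompletableFuture<Void> submit(String key, Runnable task) {
      // compute() updates the tail atomically, so tasks for one key form a strict chain.
      return tails.compute(
          key,
          (k, tail) ->
              tail == null
                  ? CompletableFuture.runAsync(task, pool)
                  : tail.thenRunAsync(task, pool));
    }
  }

  /** Submits two elements for the same key and returns the order in which they were processed. */
  static List<Integer> runScenario() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      KeyedExecutor executor = new KeyedExecutor(pool);
      List<Integer> order = Collections.synchronizedList(new ArrayList<>());
      CountDownLatch secondSubmitted = new CountDownLatch(1);

      executor.submit(
          "apple",
          () -> {
            try {
              // Hold element 1 until element 2 has been submitted.
              if (!secondSubmitted.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("element 2 was never submitted");
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new IllegalStateException(e);
            }
            order.add(1);
          });

      CompletableFuture<Void> second = executor.submit("apple", () -> order.add(2));

      // Element 1 is still blocked here, so nothing may have been processed yet.
      if (!order.isEmpty()) {
        throw new AssertionError("an element ran before element 1 was released: " + order);
      }

      secondSubmitted.countDown(); // release element 1
      second.get(5, TimeUnit.SECONDS); // wait for the whole chain to finish
      return new ArrayList<>(order);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runScenario()); // prints [1, 2]
  }
}
```

In the test under review, the same latch could presumably be awaited inside the `doAnswer` callback when `val == 1` and counted down right after the second `processElement` call. Whether that works depends on `processElement` returning before the element is fully processed, which is an assumption here.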
   
   



