acrites commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2874178053


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -633,4 +640,117 @@ public <T> T get(PCollectionView<T> view, final 
BoundedWindow window) {
       throw new IllegalArgumentException("calling getSideInput() with unknown 
view");
     }
   }
+
+  @Test
+  public void testBundleFinalizer() throws Exception {
+    bundleSuccessCount.set(0);
+    DoFnInfo<Long, String> fnInfo =
+        DoFnInfo.forFn(
+            new WithBundleFinalizerDoFn(),
+            WindowingStrategy.globalDefault(),
+            null /* side input views */,
+            VarLongCoder.of(),
+            MAIN_OUTPUT,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+    DataflowExecutionContext.DataflowStepContext userStepContext =
+        Mockito.mock(
+            DataflowExecutionContext.DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+                return new BundleFinalizer() {
+                  @Override
+                  public void afterBundleCommit(Instant expiry, Callback 
callback) {
+                    try {
+                      callback.onBundleSuccess();
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                };
+              }
+              return invocation.getMethod().invoke(stepContext, 
invocation.getArguments());
+            });
+
+    DataflowStepContext stepContextWithBundleFinalizer =
+        Mockito.mock(
+            DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+                return new BundleFinalizer() {
+                  @Override
+                  public void afterBundleCommit(Instant expiry, Callback 
callback) {
+                    try {
+                      callback.onBundleSuccess();
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                };
+              }
+              if (invocation.getMethod().getName().equals("namespacedToUser")) 
{
+                return userStepContext;
+              }
+              return invocation.getMethod().invoke(stepContext, 
invocation.getArguments());
+            });
+
+    ParDoFn parDoFn =
+        new SimpleParDoFn<>(
+            options,
+            DoFnInstanceManagers.singleInstance(fnInfo),
+            new EmptySideInputReader(),
+            MAIN_OUTPUT,
+            ImmutableMap.of(MAIN_OUTPUT, 0),
+            stepContextWithBundleFinalizer,
+            operationContext,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap(),
+            SimpleDoFnRunnerFactory.INSTANCE);
+
+    parDoFn.startBundle(new TestReceiver());
+
+    // Process a few elements
+    for (int i = 0; i < 5; i++) {
+      parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+    }
+
+    parDoFn.finishBundle();

Review Comment:
   This test isn't testing that StreamingCommitFinalizer is doing anything. 
It's mostly just to test that the BundleFinalizer is getting propagated from 
the context to the ParDo's method inputs.
   
   We're passing in a mock context that has `afterBundleCommit` just call the 
callback right away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to