acrites commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2874843640
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java:
##########
@@ -633,4 +640,117 @@ public <T> T get(PCollectionView<T> view, final
BoundedWindow window) {
throw new IllegalArgumentException("calling getSideInput() with unknown
view");
}
}
+
+ @Test
+ public void testBundleFinalizer() throws Exception {
+ bundleSuccessCount.set(0);
+ DoFnInfo<Long, String> fnInfo =
+ DoFnInfo.forFn(
+ new WithBundleFinalizerDoFn(),
+ WindowingStrategy.globalDefault(),
+ null /* side input views */,
+ VarLongCoder.of(),
+ MAIN_OUTPUT,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+ DataflowExecutionContext.DataflowStepContext userStepContext =
+ Mockito.mock(
+ DataflowExecutionContext.DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ DataflowStepContext stepContextWithBundleFinalizer =
+ Mockito.mock(
+ DataflowStepContext.class,
+ invocation -> {
+ if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+ return new BundleFinalizer() {
+ @Override
+ public void afterBundleCommit(Instant expiry, Callback
callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ if (invocation.getMethod().getName().equals("namespacedToUser"))
{
+ return userStepContext;
+ }
+ return invocation.getMethod().invoke(stepContext,
invocation.getArguments());
+ });
+
+ ParDoFn parDoFn =
+ new SimpleParDoFn<>(
+ options,
+ DoFnInstanceManagers.singleInstance(fnInfo),
+ new EmptySideInputReader(),
+ MAIN_OUTPUT,
+ ImmutableMap.of(MAIN_OUTPUT, 0),
+ stepContextWithBundleFinalizer,
+ operationContext,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap(),
+ SimpleDoFnRunnerFactory.INSTANCE);
+
+ parDoFn.startBundle(new TestReceiver());
+
+ // Process a few elements
+ for (int i = 0; i < 5; i++) {
+ parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+ }
+
+ parDoFn.finishBundle();
+
+ // The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle.
+ // Total should be 7.
+ assertThat(getBundleSuccessCount(), equalTo(7));
Review Comment:
I duplicated the Atomics, but got rid of the accessor methods to reduce
boilerplate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]