acrites commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3211706010
##########
runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java:
##########
@@ -95,6 +95,30 @@ public OffsetRange
getInitialRestriction(@SuppressWarnings("unused") @Element Vo
}
}
+ private static class GetSizeFn extends DoFn<Void, String> {
+ @ProcessElement
+ public ProcessContinuation process(
+ ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+ for (long i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i); ++i) {
+ c.output(String.valueOf(i));
+ if (i == 2) {
+ return resume();
+ }
+ }
+ return stop();
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction() {
+ return new OffsetRange(0, 10);
+ }
+
+ @GetSize
Review Comment:
If GetSize isn't defined, we call this default GetSize method, which calls
getProgress on the restriction tracker:
https://github.com/apache/beam/blob/b60082d17d4c86a6cdd98e23778490aceb40b432/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L530.
I have a SdfWithouGetSize ParDo in SplittableParDoProcessFnTest.java that
tests this works.
##########
runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java:
##########
@@ -95,6 +95,30 @@ public OffsetRange
getInitialRestriction(@SuppressWarnings("unused") @Element Vo
}
}
+ private static class GetSizeFn extends DoFn<Void, String> {
+ @ProcessElement
+ public ProcessContinuation process(
+ ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+ for (long i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i); ++i) {
+ c.output(String.valueOf(i));
+ if (i == 2) {
+ return resume();
+ }
+ }
+ return stop();
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction() {
+ return new OffsetRange(0, 10);
+ }
+
+ @GetSize
Review Comment:
If GetSize isn't defined, we call this default GetSize method, which calls
getProgress on the restriction tracker:
https://github.com/apache/beam/blob/b60082d17d4c86a6cdd98e23778490aceb40b432/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L530.
I have a SdfWithoutGetSize ParDo in SplittableParDoProcessFnTest.java that
tests this works.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]