lukecwik commented on a change in pull request #17103:
URL: https://github.com/apache/beam/pull/17103#discussion_r831443709
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -74,28 +157,8 @@
@Override
public PCollection<SubscriptionPartition> expand(PBegin input) {
- PCollection<TopicPath> start =
input.apply(Create.of(ImmutableList.of(topic)));
- PCollection<KV<TopicPath, Partition>> partitions =
- start.apply(
- Watch.growthOf(
- new PollFn<TopicPath, Partition>() {
- @Override
- public PollResult<Partition> apply(TopicPath element,
Context c) {
- checkArgument(element.equals(topic));
- int partitionCount = getPartitionCount.apply(element);
- List<Partition> partitions =
- IntStream.range(0, partitionCount)
- .mapToObj(Partition::of)
- .collect(Collectors.toList());
- return PollResult.incomplete(Instant.now(), partitions)
- .withWatermark(Instant.now());
- }
- })
- .withPollInterval(pollDuration)
- .withTerminationPerInput(
- terminate ? Watch.Growth.afterIterations(10) :
Watch.Growth.never()));
- return partitions.apply(
- MapElements.into(TypeDescriptor.of(SubscriptionPartition.class))
- .via(kv -> SubscriptionPartition.of(subscription, kv.getValue())));
+ return input
+ .apply(Create.of(Collections.<Void>singletonList(null)))
Review comment:
Use `Impulse.create()`, since it is a much lighter-weight transform than
`Create`, which is implemented as an SDF. `Impulse` will give `GeneratorFn`
an empty `byte[]` as its input element.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -47,15 +46,99 @@
private final SubscriptionPath subscription;
private final SerializableFunction<TopicPath, Integer> getPartitionCount;
private final Duration pollDuration;
- private final boolean terminate;
+ private final SerializableSupplier<Boolean> terminate;
+
+ private class GeneratorFn extends DoFn<Void, SubscriptionPartition> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ OutputReceiver<SubscriptionPartition> output,
+ RestrictionTracker<Integer, Integer> restrictionTracker) {
+ int previousCount = restrictionTracker.currentRestriction();
+ int newCount = getPartitionCount.apply(topic);
+ if (newCount <= previousCount) {
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+ if (!restrictionTracker.tryClaim(newCount)) {
+ return ProcessContinuation.stop();
+ }
+ Instant ts = previousCount == 0 ? BoundedWindow.TIMESTAMP_MIN_VALUE :
getWatermark();
+ for (int i = previousCount; i < newCount; ++i) {
+ output.outputWithTimestamp(SubscriptionPartition.of(subscription,
Partition.of(i)), ts);
+ }
+ if (terminate.get()) {
+ return ProcessContinuation.stop();
+ }
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+
+ @GetInitialRestriction
+ public Integer getInitialRestriction() {
+ return 0;
+ }
+
+ @NewTracker
+ public RestrictionTracker<Integer, Integer> newTracker(@Restriction
Integer input) {
+ return new RestrictionTracker<Integer, Integer>() {
+ private int position = input;
+
+ @Override
+ public boolean tryClaim(Integer newPosition) {
+ checkArgument(newPosition > position);
+ position = newPosition;
+ return true;
+ }
+
+ @Override
+ public Integer currentRestriction() {
+ return position;
+ }
+
+ @Override
+ public @Nullable SplitResult<Integer> trySplit(double
fractionOfRemainder) {
+ return null;
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {}
+
+ @Override
+ public IsBounded isBounded() {
+ return IsBounded.UNBOUNDED;
+ }
+ };
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Void> newWatermarkEstimator() {
+ return new WatermarkEstimator<Void>() {
+ @Override
+ public Instant currentWatermark() {
+ return getWatermark();
+ }
+
+ @Override
+ public Void getState() {
+ return null;
+ }
+ };
+ }
Review comment:
Use `WatermarkEstimators#Manual` and invoke `#setWatermark` explicitly
within `@ProcessElement`.
You'll also want to implement `@GetInitialWatermarkEstimatorState` and return
the element's timestamp.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -74,28 +157,8 @@
@Override
public PCollection<SubscriptionPartition> expand(PBegin input) {
- PCollection<TopicPath> start =
input.apply(Create.of(ImmutableList.of(topic)));
- PCollection<KV<TopicPath, Partition>> partitions =
- start.apply(
- Watch.growthOf(
- new PollFn<TopicPath, Partition>() {
- @Override
- public PollResult<Partition> apply(TopicPath element,
Context c) {
- checkArgument(element.equals(topic));
- int partitionCount = getPartitionCount.apply(element);
- List<Partition> partitions =
- IntStream.range(0, partitionCount)
- .mapToObj(Partition::of)
- .collect(Collectors.toList());
- return PollResult.incomplete(Instant.now(), partitions)
- .withWatermark(Instant.now());
- }
- })
- .withPollInterval(pollDuration)
- .withTerminationPerInput(
- terminate ? Watch.Growth.afterIterations(10) :
Watch.Growth.never()));
- return partitions.apply(
- MapElements.into(TypeDescriptor.of(SubscriptionPartition.class))
- .via(kv -> SubscriptionPartition.of(subscription, kv.getValue())));
+ return input
+ .apply(Create.of(Collections.<Void>singletonList(null)))
+ .apply(ParDo.of(new GeneratorFn()));
Review comment:
It's good practice to give every applied transform a name.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -47,15 +46,99 @@
private final SubscriptionPath subscription;
private final SerializableFunction<TopicPath, Integer> getPartitionCount;
private final Duration pollDuration;
- private final boolean terminate;
+ private final SerializableSupplier<Boolean> terminate;
+
+ private class GeneratorFn extends DoFn<Void, SubscriptionPartition> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ OutputReceiver<SubscriptionPartition> output,
+ RestrictionTracker<Integer, Integer> restrictionTracker) {
+ int previousCount = restrictionTracker.currentRestriction();
+ int newCount = getPartitionCount.apply(topic);
+ if (newCount <= previousCount) {
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+ if (!restrictionTracker.tryClaim(newCount)) {
+ return ProcessContinuation.stop();
+ }
+ Instant ts = previousCount == 0 ? BoundedWindow.TIMESTAMP_MIN_VALUE :
getWatermark();
+ for (int i = previousCount; i < newCount; ++i) {
+ output.outputWithTimestamp(SubscriptionPartition.of(subscription,
Partition.of(i)), ts);
+ }
+ if (terminate.get()) {
+ return ProcessContinuation.stop();
+ }
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+
+ @GetInitialRestriction
+ public Integer getInitialRestriction() {
+ return 0;
+ }
+
+ @NewTracker
+ public RestrictionTracker<Integer, Integer> newTracker(@Restriction
Integer input) {
+ return new RestrictionTracker<Integer, Integer>() {
+ private int position = input;
+
+ @Override
+ public boolean tryClaim(Integer newPosition) {
+ checkArgument(newPosition > position);
+ position = newPosition;
+ return true;
+ }
+
+ @Override
+ public Integer currentRestriction() {
+ return position;
+ }
+
+ @Override
+ public @Nullable SplitResult<Integer> trySplit(double
fractionOfRemainder) {
+ return null;
+ }
+
+ @Override
+ public void checkDone() throws IllegalStateException {}
Review comment:
It would have made sense to embed the `terminate` function in the
restriction tracker and have `checkDone()` ensure that it returned true.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -47,15 +46,99 @@
private final SubscriptionPath subscription;
private final SerializableFunction<TopicPath, Integer> getPartitionCount;
private final Duration pollDuration;
- private final boolean terminate;
+ private final SerializableSupplier<Boolean> terminate;
+
+ private class GeneratorFn extends DoFn<Void, SubscriptionPartition> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ OutputReceiver<SubscriptionPartition> output,
+ RestrictionTracker<Integer, Integer> restrictionTracker) {
+ int previousCount = restrictionTracker.currentRestriction();
+ int newCount = getPartitionCount.apply(topic);
+ if (newCount <= previousCount) {
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+ if (!restrictionTracker.tryClaim(newCount)) {
+ return ProcessContinuation.stop();
+ }
Review comment:
This logic is meant to live within the restriction tracker, but it's up to
you whether you want the restriction tracker to provide the logic it was
semantically built to perform.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -47,15 +46,99 @@
private final SubscriptionPath subscription;
private final SerializableFunction<TopicPath, Integer> getPartitionCount;
private final Duration pollDuration;
- private final boolean terminate;
+ private final SerializableSupplier<Boolean> terminate;
+
+ private class GeneratorFn extends DoFn<Void, SubscriptionPartition> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ OutputReceiver<SubscriptionPartition> output,
+ RestrictionTracker<Integer, Integer> restrictionTracker) {
+ int previousCount = restrictionTracker.currentRestriction();
+ int newCount = getPartitionCount.apply(topic);
+ if (newCount <= previousCount) {
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+ if (!restrictionTracker.tryClaim(newCount)) {
+ return ProcessContinuation.stop();
+ }
+ Instant ts = previousCount == 0 ? BoundedWindow.TIMESTAMP_MIN_VALUE :
getWatermark();
+ for (int i = previousCount; i < newCount; ++i) {
+ output.outputWithTimestamp(SubscriptionPartition.of(subscription,
Partition.of(i)), ts);
+ }
+ if (terminate.get()) {
+ return ProcessContinuation.stop();
+ }
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
Review comment:
```suggestion
RestrictionTracker<Integer, Integer> restrictionTracker,
@WatermarkEstimator ManualWatermarkEstimator estimator) {
int previousCount = restrictionTracker.currentRestriction();
int newCount = getPartitionCount.apply(topic);
if (newCount <= previousCount) {
return ProcessContinuation.resume().withResumeDelay(pollDuration);
}
if (!restrictionTracker.tryClaim(newCount)) {
return ProcessContinuation.stop();
}
for (int i = previousCount; i < newCount; ++i) {
output.outputWithTimestamp(SubscriptionPartition.of(subscription,
Partition.of(i)), estimator.currentWatermark());
}
if (terminate.get()) {
return ProcessContinuation.stop();
}
estimator.setWatermark(getWatermark());
return ProcessContinuation.resume().withResumeDelay(pollDuration);
```
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java
##########
@@ -47,15 +46,99 @@
private final SubscriptionPath subscription;
private final SerializableFunction<TopicPath, Integer> getPartitionCount;
private final Duration pollDuration;
- private final boolean terminate;
+ private final SerializableSupplier<Boolean> terminate;
+
+ private class GeneratorFn extends DoFn<Void, SubscriptionPartition> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ OutputReceiver<SubscriptionPartition> output,
+ RestrictionTracker<Integer, Integer> restrictionTracker) {
+ int previousCount = restrictionTracker.currentRestriction();
+ int newCount = getPartitionCount.apply(topic);
+ if (newCount <= previousCount) {
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+ if (!restrictionTracker.tryClaim(newCount)) {
+ return ProcessContinuation.stop();
+ }
+ Instant ts = previousCount == 0 ? BoundedWindow.TIMESTAMP_MIN_VALUE :
getWatermark();
+ for (int i = previousCount; i < newCount; ++i) {
+ output.outputWithTimestamp(SubscriptionPartition.of(subscription,
Partition.of(i)), ts);
+ }
+ if (terminate.get()) {
+ return ProcessContinuation.stop();
+ }
+ return ProcessContinuation.resume().withResumeDelay(pollDuration);
+ }
+
+ @GetInitialRestriction
+ public Integer getInitialRestriction() {
+ return 0;
+ }
+
+ @NewTracker
+ public RestrictionTracker<Integer, Integer> newTracker(@Restriction
Integer input) {
Review comment:
Is there any meaningful progress or backlog to report (e.g., by having the tracker implement `RestrictionTracker.HasProgress`)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]