boyuanzz commented on a change in pull request #13470: URL: https://github.com/apache/beam/pull/13470#discussion_r543789128
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PartitionProcessorFactory.java ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.io.Serializable; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; + +interface PartitionProcessorFactory extends Serializable { Review comment: `SubscriptionPartitionFactory`? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PartitionProcessor.java ########## @@ -17,12 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import com.google.cloud.pubsublite.Offset; -import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; -import java.util.Map; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.joda.time.Duration; -/** An internal interface for finalizing offsets. 
*/ -interface OffsetFinalizer { - void finalizeOffsets(Map<Partition, Offset> offsets) throws CheckedApiException; +interface PartitionProcessor extends AutoCloseable { Review comment: `SubscriptionPartitionProcessor`? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.joda.time.Duration; +import org.joda.time.Instant; + +class PerPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> { + private final Duration maxSleepTime; + private final PartitionProcessorFactory processorFactory; + private final SerializableFunction<SubscriptionPartition, InitialOffsetReader> + offsetReaderFactory; + private final SerializableBiFunction< + SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> + trackerFactory; + private final SerializableFunction<SubscriptionPartition, Committer> committerFactory; + + PerPartitionSdf( + Duration maxSleepTime, + SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory, + SerializableBiFunction< + SubscriptionPartition, + OffsetRange, + RestrictionTracker<OffsetRange, OffsetByteProgress>> + trackerFactory, + PartitionProcessorFactory processorFactory, + SerializableFunction<SubscriptionPartition, Committer> committerFactory) { + this.maxSleepTime = maxSleepTime; + this.processorFactory = processorFactory; + this.offsetReaderFactory = offsetReaderFactory; + this.trackerFactory = trackerFactory; + this.committerFactory = committerFactory; + } + + private static final class WrappedTracker + extends 
RestrictionTracker<OffsetRange, OffsetByteProgress> { + private final RestrictionTracker<OffsetRange, OffsetByteProgress> underlying; + Optional<Offset> lastClaimed; + + WrappedTracker(RestrictionTracker<OffsetRange, OffsetByteProgress> underlying) { + this.underlying = underlying; + this.lastClaimed = Optional.empty(); + } + + @Override + public boolean tryClaim(OffsetByteProgress position) { + boolean claimed = underlying.tryClaim(position); + if (claimed) { + lastClaimed = Optional.of(position.lastOffset()); + } + return claimed; + } + + @Override + public OffsetRange currentRestriction() { + return underlying.currentRestriction(); + } + + @Override + public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { + return underlying.trySplit(fractionOfRemainder); + } + + @Override + public void checkDone() throws IllegalStateException { + underlying.checkDone(); + } + + @Override + public IsBounded isBounded() { + return underlying.isBounded(); + } + } + + @GetInitialWatermarkEstimatorState + Instant getInitialWatermarkState() { + return Instant.EPOCH; + } + + @NewWatermarkEstimator + MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState Instant state) { + return new MonotonicallyIncreasing(state); + } + + @ProcessElement + public ProcessContinuation processElement( + RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, + @Element SubscriptionPartition subscriptionPartition, + OutputReceiver<SequencedMessage> receiver, + BundleFinalizer finalizer) + throws Exception { + WrappedTracker wrapped = new WrappedTracker(tracker); + try (PartitionProcessor processor = + processorFactory.newProcessor(subscriptionPartition, wrapped, receiver)) { + processor.start(); + ProcessContinuation result = processor.waitForCompletion(maxSleepTime); + wrapped.lastClaimed.ifPresent( + lastClaimedOffset -> + finalizer.afterBundleCommit( + Instant.ofEpochMilli(Long.MAX_VALUE), + () -> { + Committer committer = 
committerFactory.apply(subscriptionPartition); + committer.startAsync().awaitRunning(); + // Commit the next-to-deliver offset. + committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); + committer.stopAsync().awaitTerminated(); + })); + return result; + } + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) { + try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) { + Offset offset = reader.read(); + return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */); + } + } + + @NewTracker + public RestrictionTracker<OffsetRange, OffsetByteProgress> newTracker( + @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) { + return trackerFactory.apply(subscriptionPartition, range); Review comment: I'm curious why we need a `trackerFactory` here instead of returning your `OffsetByteRangeTracker` directly. Do you expect your SDF user to implement their own restriction tracker? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionCoder.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SubscriptionPath; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.DelegateCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TypeDescriptor; + +public class SubscriptionPartitionCoder extends AtomicCoder<SubscriptionPartition> { Review comment: Have you considered the x-lang usage, where you may want to use `Schema` to represent your element? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PartitionProcessorImpl.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.api.core.ApiService.Listener; +import com.google.api.core.ApiService.State; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.ExtractStatus; +import com.google.cloud.pubsublite.internal.wire.Subscriber; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.FlowControlRequest; +import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import com.google.protobuf.util.Timestamps; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; +import org.joda.time.Duration; +import org.joda.time.Instant; + +class PartitionProcessorImpl extends Listener implements PartitionProcessor { Review comment: `SubscriptionPartitionProcessorImpl`? 
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; +import org.joda.time.Duration; + +/** + * OffsetByteRangeTracker is an unbounded restriction tracker 
for Pub/Sub lite partitions that + * tracks offsets for checkpointing and bytes for progress. + * + * <p>Any valid instance of an OffsetByteRangeTracker tracks one of exactly two types of ranges: - + * Unbounded ranges whose last offset is Long.MAX_VALUE - Completed ranges that are either empty + * (From == To) or fully claimed (lastClaimed == To - 1) + * + * <p>Also prevents splitting until minTrackingTime has passed or minBytesReceived have been + * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it + * would return ProcessContinuation.resume(). + */ +class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress> + implements HasProgress { + private final TopicBacklogReader backlogReader; + private final Duration minTrackingTime; + private final long minBytesReceived; + private final Stopwatch stopwatch; + private OffsetRange range; + private @Nullable Long lastClaimed; + private long byteCount = 0; + + public OffsetByteRangeTracker( + OffsetRange range, + TopicBacklogReader backlogReader, + Stopwatch stopwatch, + Duration minTrackingTime, + long minBytesReceived) { + checkArgument(range.getTo() == Long.MAX_VALUE); + this.backlogReader = backlogReader; + this.minTrackingTime = minTrackingTime; + this.minBytesReceived = minBytesReceived; + this.stopwatch = stopwatch.reset().start(); + this.range = range; + } + + @Override + public void finalize() { + this.backlogReader.close(); + } + + @Override + public IsBounded isBounded() { + return IsBounded.UNBOUNDED; + } + + @Override + public boolean tryClaim(OffsetByteProgress position) { + long toClaim = position.lastOffset().value(); + checkArgument( + lastClaimed == null || toClaim > lastClaimed, + "Trying to claim offset %s while last attempted was %s", + position.lastOffset().value(), + lastClaimed); + checkArgument( + toClaim >= range.getFrom(), + "Trying to claim offset %s before start of the range %s", + toClaim, + range); + // split() has 
already been called, truncating this range. No more offsets may be claimed. + if (range.getTo() != Long.MAX_VALUE) { + boolean isRangeEmpty = range.getTo() == range.getFrom(); + boolean isValidClosedRange = nextOffset() == range.getTo(); + checkState( + isRangeEmpty || isValidClosedRange, + "Violated class precondition: offset range improperly split. Please report a beam bug."); + return false; + } + lastClaimed = toClaim; + byteCount += position.batchBytes(); + return true; + } + + @Override + public OffsetRange currentRestriction() { + return range; + } + + private long nextOffset() { + checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE); + return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1; + } + + /** + * Whether the tracker has received enough data/been running for enough time that it can + * checkpoint and be confident it can get sufficient throughput. + */ + private boolean receivedEnough() { + Duration duration = Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + if (duration.isLongerThan(minTrackingTime)) { + return true; + } + if (byteCount >= minBytesReceived) { + return true; + } + return false; + } + + @Override + public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { Review comment: You also want to return `null` when `fractionOfRemainder > 0.0` ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.Subscriber; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; +import org.joda.time.Duration; + +class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> { + private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1); + + private final SubscriberOptions options; + + SubscribeTransform(SubscriberOptions options) { + this.options = options; + } + + private void 
checkSubscription(SubscriptionPartition subscriptionPartition) throws ApiException { + checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath())); + } + + private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) { + try { + return options + .getSubscriberFactory(partition) + .newSubscriber( + messages -> + consumer.accept( + messages.stream() + .map(message -> message.toProto()) + .collect(Collectors.toList()))); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private PartitionProcessor newPartitionProcessor( + SubscriptionPartition subscriptionPartition, + RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, + OutputReceiver<SequencedMessage> receiver) + throws ApiException { + checkSubscription(subscriptionPartition); + return new PartitionProcessorImpl( + tracker, + receiver, + consumer -> newSubscriber(subscriptionPartition.partition(), consumer), + options.flowControlSettings()); + } + + private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker( + SubscriptionPartition subscriptionPartition, OffsetRange initial) { + checkSubscription(subscriptionPartition); + return new OffsetByteRangeTracker( + initial, + options.getBacklogReader(subscriptionPartition.partition()), + Stopwatch.createUnstarted(), + MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4), + LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); + } + + private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) { + checkSubscription(subscriptionPartition); + return options.getInitialOffsetReader(subscriptionPartition.partition()); + } + + private Committer newCommitter(SubscriptionPartition subscriptionPartition) { + checkSubscription(subscriptionPartition); + return options.getCommitter(subscriptionPartition.partition()); + } + + @Override + public PCollection<SequencedMessage> expand(PBegin input) { + 
+ PCollection<SubscriptionPartition> partitions = + Create.of( + options.partitions().stream() + .map( + partition -> + SubscriptionPartition.of(options.subscriptionPath(), partition)) + .collect(Collectors.toList())) + .expand(input); + // Prevent fusion between Create and read. + PCollection<SubscriptionPartition> shuffledPartitions = + partitions.apply(Reshuffle.viaRandomKey()); + return shuffledPartitions.apply( + ParDo.of( + new PerPartitionSdf( + MAX_SLEEP_TIME, + this::newInitialOffsetReader, Review comment: Can the `PerPartitionSdf` take the `SubscriberOptions` in its constructor? Then the `PerPartitionSdf` can construct `Committer`, `PartitionProcessor` and `InitialOffsetReader` by itself, instead of asking the caller to do so? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.joda.time.Duration; +import org.joda.time.Instant; + +class PerPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> { + private final Duration maxSleepTime; + private final PartitionProcessorFactory processorFactory; + private final SerializableFunction<SubscriptionPartition, InitialOffsetReader> + offsetReaderFactory; + private final SerializableBiFunction< + SubscriptionPartition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>> + trackerFactory; + private final SerializableFunction<SubscriptionPartition, Committer> committerFactory; + + PerPartitionSdf( + Duration maxSleepTime, + SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory, + SerializableBiFunction< + SubscriptionPartition, + OffsetRange, + RestrictionTracker<OffsetRange, OffsetByteProgress>> + trackerFactory, + PartitionProcessorFactory processorFactory, + SerializableFunction<SubscriptionPartition, Committer> committerFactory) { + this.maxSleepTime = maxSleepTime; + this.processorFactory = processorFactory; + this.offsetReaderFactory = offsetReaderFactory; + this.trackerFactory = trackerFactory; + this.committerFactory = committerFactory; + } + + private static final class WrappedTracker + extends 
RestrictionTracker<OffsetRange, OffsetByteProgress> { + private final RestrictionTracker<OffsetRange, OffsetByteProgress> underlying; + Optional<Offset> lastClaimed; + + WrappedTracker(RestrictionTracker<OffsetRange, OffsetByteProgress> underlying) { + this.underlying = underlying; + this.lastClaimed = Optional.empty(); + } + + @Override + public boolean tryClaim(OffsetByteProgress position) { + boolean claimed = underlying.tryClaim(position); + if (claimed) { + lastClaimed = Optional.of(position.lastOffset()); + } + return claimed; + } + + @Override + public OffsetRange currentRestriction() { + return underlying.currentRestriction(); + } + + @Override + public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) { + return underlying.trySplit(fractionOfRemainder); + } + + @Override + public void checkDone() throws IllegalStateException { + underlying.checkDone(); + } + + @Override + public IsBounded isBounded() { + return underlying.isBounded(); + } + } + + @GetInitialWatermarkEstimatorState + Instant getInitialWatermarkState() { + return Instant.EPOCH; + } + + @NewWatermarkEstimator + MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState Instant state) { + return new MonotonicallyIncreasing(state); + } + + @ProcessElement + public ProcessContinuation processElement( + RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, + @Element SubscriptionPartition subscriptionPartition, + OutputReceiver<SequencedMessage> receiver, + BundleFinalizer finalizer) + throws Exception { + WrappedTracker wrapped = new WrappedTracker(tracker); + try (PartitionProcessor processor = + processorFactory.newProcessor(subscriptionPartition, wrapped, receiver)) { + processor.start(); + ProcessContinuation result = processor.waitForCompletion(maxSleepTime); + wrapped.lastClaimed.ifPresent( Review comment: When you have got the result from `processor.waitForCompletion(maxSleepTime)`, the `tracker.currentRestriction().getTo()` will be the 
`lastClaimed` you want. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.Subscriber; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import 
org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; +import org.joda.time.Duration; + +class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> { + private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1); + + private final SubscriberOptions options; + + SubscribeTransform(SubscriberOptions options) { + this.options = options; + } + + private void checkSubscription(SubscriptionPartition subscriptionPartition) throws ApiException { + checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath())); + } + + private Subscriber newSubscriber(Partition partition, Consumer<List<SequencedMessage>> consumer) { + try { + return options + .getSubscriberFactory(partition) + .newSubscriber( + messages -> + consumer.accept( + messages.stream() + .map(message -> message.toProto()) + .collect(Collectors.toList()))); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private PartitionProcessor newPartitionProcessor( + SubscriptionPartition subscriptionPartition, + RestrictionTracker<OffsetRange, OffsetByteProgress> tracker, + OutputReceiver<SequencedMessage> receiver) + throws ApiException { + checkSubscription(subscriptionPartition); + return new PartitionProcessorImpl( + tracker, + receiver, + consumer -> newSubscriber(subscriptionPartition.partition(), consumer), + options.flowControlSettings()); + } + + private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker( + SubscriptionPartition subscriptionPartition, OffsetRange initial) { + checkSubscription(subscriptionPartition); + return new OffsetByteRangeTracker( + initial, + options.getBacklogReader(subscriptionPartition.partition()), + Stopwatch.createUnstarted(), + MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4), + 
LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); + } + + private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) { + checkSubscription(subscriptionPartition); + return options.getInitialOffsetReader(subscriptionPartition.partition()); + } + + private Committer newCommitter(SubscriptionPartition subscriptionPartition) { + checkSubscription(subscriptionPartition); + return options.getCommitter(subscriptionPartition.partition()); + } + + @Override + public PCollection<SequencedMessage> expand(PBegin input) { + PCollection<SubscriptionPartition> partitions = + Create.of( + options.partitions().stream() + .map( + partition -> + SubscriptionPartition.of(options.subscriptionPath(), partition)) + .collect(Collectors.toList())) + .expand(input); + // Prevent fusion between Create and read. + PCollection<SubscriptionPartition> shuffledPartitions = Review comment: I don't think inserting a Reshuffle is necessary. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.joda.time.Duration; +import org.joda.time.Instant; + +class PerPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> { Review comment: `PerSubscriptionPartitionSdf`? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.Subscriber; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; +import org.joda.time.Duration; + +class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> { Review comment: Why not put the whole logic into `PubSubLiteIO`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
