dpcollins-google commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r831167551



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to