TheNeuralBit commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r831358967



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,111 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
+  // getReadySubscriber doesn't reference the subscriber member.
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
-  }
-
-  @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = 
Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = 
messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new 
Instant(Timestamps.toMillis(message.getPublishTime()))));
-      try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
-      }
-    } else {
-      completionFuture.set(null);
-    }
+    this.subscriber = getReadySubscriber(subscriberFactory);

Review comment:
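       The `@SuppressWarnings("methodref.receiver.bound.invalid")` suppression doesn't seem to work in PreCommit; the checker reports a different error key, `method.invocation.invalid`: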
   
   ```
   08:31:50 
/home/jenkins/jenkins-slave/workspace/beam_PreCommit_SQL_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java:58:
 error: [method.invocation.invalid] call to 
getReadySubscriber(java.util.function.Supplier<org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber>)
 not allowed on the given receiver.
   08:31:50     this.subscriber = getReadySubscriber(subscriberFactory);
   08:31:50                                         ^
   08:31:50   found   : @UnderInitialization @NonNull 
SubscriptionPartitionProcessorImpl
   08:31:50   required: @Initialized @NonNull SubscriptionPartitionProcessorImpl
   ```
   
   You might be able to reproduce this locally by running with the Checker Framework enabled: 
https://cwiki.apache.org/confluence/display/BEAM/Java+Tips#JavaTips-WhatcommandshouldIrunlocallybeforecreatingapullrequest?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to