dpcollins-google commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r831165362



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")

Review comment:
       No but added a comment with rationale of why its safe.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to