scwhittle commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r830897695



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ProxyService;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBufferedSubscriberImpl extends ProxyService implements 
MemoryBufferedSubscriber {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryBufferedSubscriberImpl.class);
+
+  private final Partition partition;
+  private final MemoryLimiter limiter;
+  private final Subscriber subscriber;
+  private final long maxMemory;
+  private long targetMemory;
+  private Offset fetchOffset;
+  private Block memBlock;
+
+  private long bytesOutstandingToServer = 0;
+  private long bytesOutstanding = 0;
+  private final Queue<SequencedMessage> messages = new ArrayDeque<>();
+  private SettableApiFuture<Void> newData = SettableApiFuture.create();
+  private boolean shutdown = false;
+
+  @SuppressWarnings({"methodref.receiver.bound.invalid", 
"method.invocation.invalid"})

Review comment:
       why are these safe to ignore? Add a comment

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
##########
@@ -76,97 +57,47 @@ public IsBounded isBounded() {
 
   @Override
   public boolean tryClaim(OffsetByteProgress position) {
-    long toClaim = position.lastOffset().value();
-    checkArgument(
-        lastClaimed == null || toClaim > lastClaimed,
-        "Trying to claim offset %s while last attempted was %s",
-        position.lastOffset().value(),
-        lastClaimed);
-    checkArgument(
-        toClaim >= range.getRange().getFrom(),
-        "Trying to claim offset %s before start of the range %s",
-        toClaim,
-        range);
-    // split() has already been called, truncating this range. No more offsets 
may be claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
-      boolean isRangeEmpty = range.getRange().getTo() == 
range.getRange().getFrom();
-      boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
-      checkState(
-          isRangeEmpty || isValidClosedRange,
-          "Violated class precondition: offset range improperly split. Please 
report a beam bug.");
-      return false;
-    }
-    lastClaimed = toClaim;
-    range = OffsetByteRange.of(range.getRange(), range.getByteCount() + 
position.batchBytes());
+    if (!rangeTracker.tryClaim(position.lastOffset().value())) return false;
+    lastClaimed = position.lastOffset().value();
+    bytes += position.batchBytes();
     return true;
   }
 
   @Override
   public OffsetByteRange currentRestriction() {
-    return range;
+    return OffsetByteRange.of(rangeTracker.currentRestriction(), bytes);
   }
 
   private long nextOffset() {
     checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
     return lastClaimed == null ? currentRestriction().getRange().getFrom() : 
lastClaimed + 1;
   }
 
-  /**
-   * Whether the tracker has received enough data/been running for enough time 
that it can
-   * checkpoint and be confident it can get sufficient throughput.
-   */
-  private boolean receivedEnough() {
-    Duration duration = 
Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-    if (duration.isLongerThan(minTrackingTime)) {
-      return true;
-    }
-    if (currentRestriction().getByteCount() >= minBytesReceived) {
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public @Nullable SplitResult<OffsetByteRange> trySplit(double 
fractionOfRemainder) {
     // Cannot split a bounded range. This should already be completely claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
+    if (rangeTracker.currentRestriction().getTo() != Long.MAX_VALUE) {
       return null;
     }
-    if (!receivedEnough()) {
+    @Nullable SplitResult<OffsetRange> ranges = 
rangeTracker.trySplit(fractionOfRemainder);
+    if (ranges == null) {
       return null;
     }
-    range =
-        OffsetByteRange.of(
-            new OffsetRange(currentRestriction().getRange().getFrom(), 
nextOffset()),
-            range.getByteCount());
+    
checkArgument(rangeTracker.currentRestriction().equals(ranges.getPrimary()));
     return SplitResult.of(
-        this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), 
Long.MAX_VALUE), 0));
+        currentRestriction(), 
OffsetByteRange.of(checkArgumentNotNull(ranges.getResidual())));
   }
 
   @Override
   @SuppressWarnings("unboxing.of.nullable")

Review comment:
       remove suppression

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryLimiterImpl implements MemoryLimiter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryLimiterImpl.class);
+  private final long minBlockSize;
+
+  @GuardedBy("this")
+  private long available;
+
+  public MemoryLimiterImpl(long minBlockSize, long maxAvailable) {
+    this.minBlockSize = minBlockSize;
+    this.available = maxAvailable;
+  }
+
+  @Override
+  public synchronized Block claim(long toAcquire) {
+    toAcquire = Math.max(Math.min(toAcquire, available / 2), minBlockSize);
+    available -= toAcquire;
+    return new Block(toAcquire);
+  }
+
+  @Override
+  public long getMinBlockSize() {
+    return minBlockSize;
+  }
+
+  private synchronized void release(long toRelease) {
+    available += toRelease;

Review comment:
       assert that available does not exceed the max after adding toRelease?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
##########
@@ -61,8 +64,8 @@ public void teardown() {
   }
 
   @GetInitialWatermarkEstimatorState
-  public Instant getInitialWatermarkState() {
-    return Instant.EPOCH;
+  public Instant getInitialWatermarkState(@Timestamp Instant inputTs) {

Review comment:
       add a comment on why this is necessary since this was tricky to get right

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
##########
@@ -38,12 +39,16 @@
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
 import org.joda.time.Duration;
 
 public class SubscribeTransform extends PTransform<PBegin, 
PCollection<SequencedMessage>> {
   private static final long MEBIBYTE = 1L << 20;
+  private static final long SOFT_MEMORY_LIMIT = 512 * MEBIBYTE;

Review comment:
       could consider making these pipeline options

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryLimiterImpl implements MemoryLimiter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MemoryLimiterImpl.class);
+  private final long minBlockSize;
+
+  @GuardedBy("this")
+  private long available;
+
+  public MemoryLimiterImpl(long minBlockSize, long maxAvailable) {
+    this.minBlockSize = minBlockSize;
+    this.available = maxAvailable;
+  }
+
+  @Override
+  public synchronized Block claim(long toAcquire) {
+    toAcquire = Math.max(Math.min(toAcquire, available / 2), minBlockSize);
+    available -= toAcquire;
+    return new Block(toAcquire);
+  }
+
+  @Override
+  public long getMinBlockSize() {
+    return minBlockSize;
+  }
+
+  private synchronized void release(long toRelease) {
+    available += toRelease;
+  }
+
+  public class Block implements MemoryLimiter.Block {
+    public final long claimed;
+    private boolean released = false;
+
+    private Block(long claimed) {
+      this.claimed = claimed;
+    }
+
+    @Override
+    public long claimed() {
+      return claimed;
+    }
+
+    @Override
+    public void close() {
+      release(claimed);

Review comment:
       assert not released already?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = 
Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = 
messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new 
Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never 
return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, 
TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.

Review comment:
       would it be better to rearrange the loop so that you try to peek before 
awaiting onData?  It seems better to fall back to waiting on the notification 
only when necessary, i.e. when no data is present. 
   

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")

Review comment:
       can this be removed?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = 
Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = 
messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new 
Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never 
return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, 
TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.
+      for (Optional<SequencedMessage> next = subscriber.peek();
+          next.isPresent();
+          next = subscriber.peek()) {
+        SequencedMessage message = next.get();
+        System.err.println("Next: " + message);
+        Offset messageOffset = Offset.of(message.getCursor().getOffset());
+        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, 
message.getSizeBytes()))) {
+          subscriber.pop();
+          lastClaimedOffset = Optional.of(messageOffset);
+          receiver.outputWithTimestamp(
+              message, new 
Instant(Timestamps.toMillis(message.getPublishTime())));
+        } else {
+          // Our claim failed, return stop()
+          return ProcessContinuation.stop();
         }
       }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-    // Determine return code after shutdown.
-    if (completionFuture.isDone()) {
-      // Call get() to ensure there is no exception.
-      try {
-        completionFuture.get();
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
-      }
-      // CompletionFuture set with null when tryClaim returned false.
-      return ProcessContinuation.stop();
     }
+    // We were interrupted,
     return ProcessContinuation.resume();
   }
 
-  @Override
-  public void close() {
-    try {
-      blockingShutdown(subscriber);
-    } catch (Throwable t) {
-      // Don't propagate errors on subscriber shutdown.
-      LOG.info("Error on subscriber shutdown.", t);
-    }
-  }
-
   @Override
   public Optional<Offset> lastClaimed() {
     return lastClaimedOffset;
   }
+
+  private MemoryBufferedSubscriber getReadySubscriber(
+      Supplier<MemoryBufferedSubscriber> getOrCreate) {
+    Offset startOffset = 
Offset.of(tracker.currentRestriction().getRange().getFrom());
+    MemoryBufferedSubscriber subscriber = getOrCreate.get();
+    Offset fetchOffset = subscriber.fetchOffset();
+    while (!startOffset.equals(fetchOffset)) {
+      LOG.info(
+          "Discarding subscriber due to mismatch, this should be rare. {}, 
start: {} fetch: {}",
+          subscriptionPartition,
+          startOffset,
+          fetchOffset);
+      try {
+        subscriber.stopAsync().awaitTerminated();
+      } catch (Exception ignored) {
+      }
+      subscriber = getOrCreate.get();
+      fetchOffset = subscriber.fetchOffset();

Review comment:
       nit: I think it would be less error prone to assign 
subscriber/fetchOffset in only one spot
   while (true) {
      MemoryBufferedSubscriber subscriber = getOrCreate.get();
      Offset fetchOffset = ...
      if (startOffset.equals(fetchOffset)) {
          subscriber.rebuffer(); 
          return subscriber;
      }
   
      LOG the mismatch, stop this subscriber, then loop to get a new one
   }
   

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
##########
@@ -61,8 +64,8 @@ public void teardown() {
   }
 
   @GetInitialWatermarkEstimatorState
-  public Instant getInitialWatermarkState() {
-    return Instant.EPOCH;
+  public Instant getInitialWatermarkState(@Timestamp Instant inputTs) {

Review comment:
       Can this be unit tested also? Or is that tricky because this depends on 
the Watch watermark interaction?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiService;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.Optional;
+
+interface MemoryBufferedSubscriber extends ApiService {
+
+  /**
+   * Get the current fetch offset of this subscriber. This offset will be less 
than or equal to all
+   * future messages returned by this object.
+   */
+  Offset fetchOffset();
+
+  /**
+   * Notify this subscriber that all previously in-memory messages are now no 
longer taking up space
+   * in this JVM.

Review comment:
       This doesn't seem to be true; it seems to indicate how this should be 
used, but there seem to be other valid uses.
   
   How about saying that previously popped messages will no longer limit the 
memory budget?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static 
com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements 
SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> 
tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = 
SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new 
SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = 
Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = 
messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new 
Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never 
return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, 
TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.
+      for (Optional<SequencedMessage> next = subscriber.peek();
+          next.isPresent();
+          next = subscriber.peek()) {
+        SequencedMessage message = next.get();
+        System.err.println("Next: " + message);
+        Offset messageOffset = Offset.of(message.getCursor().getOffset());
+        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, 
message.getSizeBytes()))) {
+          subscriber.pop();
+          lastClaimedOffset = Optional.of(messageOffset);
+          receiver.outputWithTimestamp(
+              message, new 
Instant(Timestamps.toMillis(message.getPublishTime())));
+        } else {
+          // Our claim failed, return stop()
+          return ProcessContinuation.stop();
         }
       }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-    // Determine return code after shutdown.
-    if (completionFuture.isDone()) {
-      // Call get() to ensure there is no exception.
-      try {
-        completionFuture.get();
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
-      }
-      // CompletionFuture set with null when tryClaim returned false.
-      return ProcessContinuation.stop();
     }
+    // We were interrupted,

Review comment:
       This made me think of InterruptedException, but this seems to be due 
to the subscriber no longer running. Please reword for clarity.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to