[ https://issues.apache.org/jira/browse/BEAM-5791?focusedWorklogId=164821&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164821 ]

ASF GitHub Bot logged work on BEAM-5791:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Nov/18 06:43
            Start Date: 12/Nov/18 06:43
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #6940: [BEAM-5791] Implement time-based pushback in the dataflow harness data plane.
URL: https://github.com/apache/beam/pull/6940
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a forked ("foreign") pull request on merge, it is reproduced
below for the sake of provenance:

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index 96dfcb6d506..bc0fb54d952 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -269,6 +270,7 @@ public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest splitRequest)
     private final AtomicReference<Progress> latestProgress = new AtomicReference<>();
     private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
     private ScheduledFuture<?> nextProgressFuture;
+    private final Consumer<Integer> grpcWriteOperationElementsProcessed;
 
     private final Map<MetricKey, MetricUpdates.MetricUpdate<Long>> counterUpdates;
     private final Map<MetricKey, MetricUpdates.MetricUpdate<DistributionData>> distributionUpdates;
@@ -281,6 +283,7 @@ public SingularProcessBundleProgressTracker(
       this.readOperation = readOperation;
       this.grpcWriteOperation = grpcWriteOperation;
       this.bundleProcessOperation = bundleProcessOperation;
+      this.grpcWriteOperationElementsProcessed = grpcWriteOperation.processedElementsConsumer();
       this.progressInterpolator =
           new Interpolator<Progress>(MAX_DATA_POINTS) {
             @Override
@@ -300,31 +303,39 @@ private void periodicProgressUpdate() {
     @VisibleForTesting
     void updateProgress() {
       try {
+        if (bundleProcessOperation.hasFailed()) {
+          grpcWriteOperation.abortWait();
+        }
+
         BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics());
 
         updateMetrics(metrics);
 
         double elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics);
 
+        grpcWriteOperationElementsProcessed.accept((int) elementsConsumed);
         progressInterpolator.addPoint(
             grpcWriteOperation.getElementsSent(), readOperation.getProgress());
         latestProgress.set(progressInterpolator.interpolateAndPurge(elementsConsumed));
         progressErrors = 0;
       } catch (Exception exn) {
-        progressErrors++;
-        // Only log verbosely every power of two to avoid spamming the logs.
-        if (Integer.bitCount(progressErrors) == 1) {
-          LOG.warn(
-              String.format(
-                  "Progress updating failed %s times. Following exception safely handled.",
-                  progressErrors),
-              exn);
-        } else {
-          LOG.debug(
-              String.format(
-                  "Progress updating failed %s times. Following exception safely handled.",
-                  progressErrors),
-              exn);
+        if (!isTransientProgressError(exn.getMessage())) {
+          grpcWriteOperationElementsProcessed.accept(-1); // Not supported.
+          progressErrors++;
+          // Only log verbosely every power of two to avoid spamming the logs.
+          if (Integer.bitCount(progressErrors) == 1) {
+            LOG.warn(
+                String.format(
+                    "Progress updating failed %s times. Following exception safely handled.",
+                    progressErrors),
+                exn);
+          } else {
+            LOG.debug(
+                String.format(
+                    "Progress updating failed %s times. Following exception safely handled.",
+                    progressErrors),
+                exn);
+          }
         }
 
         try {
@@ -589,4 +600,12 @@ private ProgressTracker createProgressTracker() {
       return new NullProgressTracker();
     }
   }
+
+  /** Whether the given error is likely to go away (e.g. the bundle has not started). */
+  private static boolean isTransientProgressError(String msg) {
+    return msg != null
+        && (msg.contains("Process bundle request not yet scheduled")
+            || msg.contains("Unknown process bundle instruction")
+            || msg.contains("unstarted operation"));
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index 783408e06ca..e411a856710 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -336,6 +336,15 @@ public void abort() throws Exception {
         .thenApply(response -> response.getMetrics());
   }
 
+  public boolean hasFailed() throws ExecutionException, InterruptedException {
+    if (processBundleResponse != null && processBundleResponse.toCompletableFuture().isDone()) {
+      return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
+    } else {
+      // At the very least, we don't know that this has failed yet.
+      return false;
+    }
+  }
+
   /** Returns the number of input elements consumed by the gRPC read, if known, otherwise 0. */
   double getInputElementsConsumed(BeamFnApi.Metrics metrics) {
     return metrics
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index db30bc02363..9cf3dd4ad12 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -17,9 +17,14 @@
  */
 package org.apache.beam.runners.dataflow.worker.fn.data;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
@@ -31,6 +36,8 @@
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link Operation} that uses the Beam Fn Data API to send messages.
@@ -38,6 +45,8 @@
  * <p>This {@link Operation} supports restart.
  */
 public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteGrpcPortWriteOperation.class);
+
   private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new OutputReceiver[0];
   private final Coder<WindowedValue<T>> coder;
   private final FnDataService beamFnDataService;
@@ -49,33 +58,142 @@
   private CloseableFnDataReceiver<WindowedValue<T>> receiver;
   private final AtomicInteger elementsSent = new AtomicInteger();
 
+  private boolean usingElementsProcessed = false;
+  private AtomicInteger elementsProcessed = new AtomicInteger();
+  private int elementsFlushed;
+  private int targetElementsSent;
+
+  private final Supplier<Long> currentTimeMillis;
+  private long firstElementSentMillis;
+  private long secondElementSentMillis;
+
+  @VisibleForTesting static final long MAX_BUFFER_MILLIS = 5000;
+
   public RemoteGrpcPortWriteOperation(
       FnDataService beamFnDataService,
       Target target,
       Supplier<String> bundleIdSupplier,
       Coder<WindowedValue<T>> coder,
       OperationContext context) {
+    this(beamFnDataService, target, bundleIdSupplier, coder, context, System::currentTimeMillis);
+  }
+
+  public RemoteGrpcPortWriteOperation(
+      FnDataService beamFnDataService,
+      Target target,
+      Supplier<String> bundleIdSupplier,
+      Coder<WindowedValue<T>> coder,
+      OperationContext context,
+      Supplier<Long> currentTimeMillis) {
     super(EMPTY_RECEIVER_ARRAY, context);
     this.coder = coder;
     this.beamFnDataService = beamFnDataService;
     this.bundleIdSupplier = bundleIdSupplier;
     this.target = target;
+    this.currentTimeMillis = currentTimeMillis;
   }
 
   @Override
   public void start() throws Exception {
     try (Closeable scope = context.enterStart()) {
       elementsSent.set(0);
+      elementsProcessed.set(0);
+      targetElementsSent = 1;
+      elementsFlushed = 0;
       super.start();
       bundleId = bundleIdSupplier.get();
       receiver = beamFnDataService.send(LogicalEndpoint.of(bundleId, target), coder);
     }
   }
 
+  /** Attempt to bound the amount of unconsumed data written to the buffer in absolute time. */
+  @VisibleForTesting
+  boolean shouldWait() throws Exception {
+    if (!usingElementsProcessed) {
+      return false;
+    }
+    int numSent = elementsSent.get();
+    if (numSent >= targetElementsSent) {
+      if (elementsFlushed < numSent) {
+        receiver.flush();
+        elementsFlushed = numSent;
+      }
+      int numProcessed = elementsProcessed.get();
+      // A negative value indicates that obtaining numProcessed is not supported.
+      // Otherwise, wait until the SDK has processed at least one element before continuing.
+      if (numProcessed < 0) {
+        targetElementsSent = Integer.MAX_VALUE;
+      } else if (numProcessed == 0) {
+        targetElementsSent = 1;
+      } else {
+        double rate;
+        if (numProcessed == 1) {
+          rate = (double) numProcessed / (currentTimeMillis.get() - firstElementSentMillis);
+        } else {
+          rate = ((double) numProcessed - 1) / (currentTimeMillis.get() - secondElementSentMillis);
+        }
+        // Note that numProcessed is always increasing up to numSent, and rate is always positive,
+        // so eventually we'll return True.
+        targetElementsSent =
+            (int) Math.min(numProcessed + rate * MAX_BUFFER_MILLIS + 1, Integer.MAX_VALUE);
+      }
+    }
+    return numSent >= targetElementsSent;
+  }
+
+  public Consumer<Integer> processedElementsConsumer() {
+    usingElementsProcessed = true;
+    return elementsProcessed -> {
+      try {
+        lock.lock();
+        this.elementsProcessed.set(elementsProcessed);
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    };
+  }
+
+  Lock lock = new ReentrantLock();
+  Condition condition = lock.newCondition();
+
+  private void maybeWait() throws Exception {
+    if (shouldWait()) {
+      try {
+        lock.lock();
+        while (shouldWait()) {
+          LOG.debug(
+              "Throttling elements at {} until more than {} elements been processed.",
+              elementsSent.get(),
+              elementsProcessed.get());
+          condition.await();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void abortWait() {
+    usingElementsProcessed = false;
+    try {
+      lock.lock();
+      condition.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void process(Object outputElem) throws Exception {
     try (Closeable scope = context.enterProcess()) {
-      elementsSent.incrementAndGet();
+      maybeWait();
+      int numSent = elementsSent.incrementAndGet();
+      if (numSent == 1) {
+        firstElementSentMillis = currentTimeMillis.get();
+      } else if (numSent == 2) {
+        secondElementSentMillis = currentTimeMillis.get();
+      }
       receiver.accept((WindowedValue<T>) outputElem);
     }
   }
@@ -92,6 +210,7 @@ public void finish() throws Exception {
   @Override
   public void abort() throws Exception {
     try (Closeable scope = context.enterAbort()) {
+      abortWait();
       receiver.close();
       super.abort();
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
index 02058a03a10..eb44eaa9326 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
@@ -137,6 +137,9 @@ public void accept(WindowedValue<T> windowedValue) throws Exception {
           inboundDataClient.complete();
         }
 
+        @Override
+        public void flush() throws Exception {}
+
         @Override
         public void close() throws Exception {}
       };
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index a6a6003189d..9bcb2242b2b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -271,6 +271,8 @@ public void testTentativeUserMetricsOverwrite() throws Exception {
           public void close() {}
         };
 
+    when(grpcPortWriteOperation.processedElementsConsumer()).thenReturn(elementsConsumed -> {});
+
     RegisterAndProcessBundleOperation processOperation =
         new RegisterAndProcessBundleOperation(
             idGenerator,
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
index f7c67abdacb..5179268ac31 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
@@ -68,6 +68,8 @@ public void testProgressInterpolation() throws Exception {
     RegisterAndProcessBundleOperation process =
         Mockito.mock(RegisterAndProcessBundleOperation.class);
 
+    when(grpcWrite.processedElementsConsumer()).thenReturn(elementsConsumed -> {});
+
     SingularProcessBundleProgressTracker tracker =
         new SingularProcessBundleProgressTracker(read, grpcWrite, process);
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
index f9e8bcc1d6d..72741321078 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
@@ -29,6 +29,8 @@
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
@@ -130,6 +132,82 @@ public void testStartAndAbort() throws Exception {
     verifyNoMoreInteractions(beamFnDataService);
   }
 
+  @Test
+  public void testBufferRateLimiting() throws Exception {
+    AtomicInteger processedElements = new AtomicInteger();
+    AtomicInteger currentTimeMillis = new AtomicInteger(10000);
+
+    final int START_BUFFER_SIZE = 3;
+    final int STEADY_BUFFER_SIZE = 10;
+    final int FIRST_ELEMENT_DURATION =
+        (int) RemoteGrpcPortWriteOperation.MAX_BUFFER_MILLIS / START_BUFFER_SIZE + 1;
+    final int STEADY_ELEMENT_DURATION =
+        (int) RemoteGrpcPortWriteOperation.MAX_BUFFER_MILLIS / STEADY_BUFFER_SIZE + 1;
+
+    operation =
+        new RemoteGrpcPortWriteOperation<>(
+            beamFnDataService,
+            TARGET,
+            bundleIdSupplier,
+            CODER,
+            operationContext,
+            () -> (long) currentTimeMillis.get());
+
+    RecordingConsumer<WindowedValue<String>> recordingConsumer = new RecordingConsumer<>();
+    when(beamFnDataService.send(any(), Matchers.<Coder<WindowedValue<String>>>any()))
+        .thenReturn(recordingConsumer);
+    when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+
+    Consumer<Integer> processedElementConsumer = operation.processedElementsConsumer();
+
+    operation.start();
+
+    // Never wait before sending the first element.
+    assertFalse(operation.shouldWait());
+    operation.process(valueInGlobalWindow("first"));
+
+    // After sending the first element, wait until it's processed before sending another.
+    assertTrue(operation.shouldWait());
+
+    // Once we've processed the element, we can send the second.
+    currentTimeMillis.getAndAdd(FIRST_ELEMENT_DURATION);
+    processedElementConsumer.accept(1);
+    assertFalse(operation.shouldWait());
+    operation.process(valueInGlobalWindow("second"));
+
+    // Send elements until the buffer is full.
+    for (int i = 2; i < START_BUFFER_SIZE + 1; i++) {
+      assertFalse(operation.shouldWait());
+      operation.process(valueInGlobalWindow("element" + i));
+    }
+
+    // The buffer is full.
+    assertTrue(operation.shouldWait());
+
+    // Now finish processing the second element.
+    currentTimeMillis.getAndAdd(STEADY_ELEMENT_DURATION);
+    processedElementConsumer.accept(2);
+
+    // That was faster, so our buffer quota is larger.
+    for (int i = START_BUFFER_SIZE + 1; i < STEADY_BUFFER_SIZE + 2; i++) {
+      assertFalse(operation.shouldWait());
+      operation.process(valueInGlobalWindow("element" + i));
+    }
+
+    // The buffer is full again.
+    assertTrue(operation.shouldWait());
+
+    // As elements are consumed, we can keep adding more.
+    for (int i = START_BUFFER_SIZE + STEADY_BUFFER_SIZE + 2; i < 100; i++) {
+      currentTimeMillis.getAndAdd(STEADY_ELEMENT_DURATION);
+      processedElementConsumer.accept(i);
+      assertFalse(operation.shouldWait());
+      operation.process(valueInGlobalWindow("element" + i));
+    }
+
+    operation.finish();
+  }
+
   private static class RecordingConsumer<T> extends ArrayList<T>
       implements CloseableFnDataReceiver<T> {
     private boolean closed;
@@ -140,7 +218,10 @@ public void close() throws Exception {
     }
 
     @Override
-    public void accept(T t) throws Exception {
+    public void flush() throws Exception {}
+
+    @Override
+    public synchronized void accept(T t) throws Exception {
       if (closed) {
         throw new IllegalStateException("Consumer is closed but attempting to consume " + t);
       }
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index a2306e3da4c..104ed21a689 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -112,6 +112,13 @@ public void close() throws Exception {
     outboundObserver.onNext(elements.build());
   }
 
+  @Override
+  public void flush() throws IOException {
+    if (bufferedElements.size() > 0) {
+      outboundObserver.onNext(convertBufferForTransmission().build());
+    }
+  }
+
   @Override
   public void accept(WindowedValue<T> t) throws IOException {
     if (closed) {
@@ -120,7 +127,7 @@ public void accept(WindowedValue<T> t) throws IOException {
     coder.encode(t, bufferedElements);
     counter += 1;
     if (bufferedElements.size() >= bufferLimit) {
-      outboundObserver.onNext(convertBufferForTransmission().build());
+      flush();
     }
   }
 
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
index 1e7776f7717..076e9358d6f 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
@@ -23,6 +23,17 @@
  * <p>The close method for a {@link CloseableFnDataReceiver} must be idempotent.
  */
 public interface CloseableFnDataReceiver<T> extends FnDataReceiver<T>, AutoCloseable {
+
+  /**
+   * Eagerly flushes any data that is buffered in this channel.
+   *
+   * @deprecated to be removed once splitting/checkpointing are available in SDKs and rewinding in
+   *     readers.
+   * @throws Exception
+   */
+  @Deprecated
+  void flush() throws Exception;
+
   /**
    * {@inheritDoc}.
    *
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 6fc349fa4fe..89ae55d0c22 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -156,6 +156,11 @@ public void close() throws Exception {
           public void accept(WindowedValue<String> t) throws Exception {
             outputValues.add(t);
           }
+
+          @Override
+          public void flush() throws Exception {
+            throw new UnsupportedOperationException("Flush is not supported");
+          }
         };
 
     when(mockBeamFnDataClient.send(any(), any(), Matchers.<Coder<WindowedValue<String>>>any()))
@@ -257,6 +262,11 @@ public void accept(T t) throws Exception {
       }
       add(t);
     }
+
+    @Override
+    public void flush() throws Exception {
+      throw new UnsupportedOperationException("Flush is not supported");
+    }
   }
 
   @Test
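
For readers skimming the patch, the heart of the change is the throttling decision in
RemoteGrpcPortWriteOperation.shouldWait(): the writer estimates the SDK harness's processing
rate from the time since the first (or second) element was sent, then allows only roughly
MAX_BUFFER_MILLIS (5000 ms in the PR) worth of unprocessed elements to sit in the data-plane
buffer. The standalone Java sketch below restates that computation for illustration only; the
class and method names here (BufferThrottleSketch, recordSent, recordProcessed, clock) are
hypothetical, and the merged code above remains the authoritative implementation.

// Illustrative sketch only; not part of the PR above. Simplified (no locking/condition logic).
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class BufferThrottleSketch {
  static final long MAX_BUFFER_MILLIS = 5000; // same bound as in the PR

  private final Supplier<Long> clock;            // injectable clock, as in the PR's extra constructor
  private final AtomicInteger elementsSent = new AtomicInteger();
  private final AtomicInteger elementsProcessed = new AtomicInteger();
  private int targetElementsSent = 1;            // never wait before the first send
  private long firstSentMillis;
  private long secondSentMillis;

  BufferThrottleSketch(Supplier<Long> clock) {
    this.clock = clock;
  }

  /** Record a send, capturing the timestamps used to estimate the consumer's rate. */
  void recordSent() {
    int n = elementsSent.incrementAndGet();
    if (n == 1) {
      firstSentMillis = clock.get();
    } else if (n == 2) {
      secondSentMillis = clock.get();
    }
  }

  /** Called with progress reports from the harness; a negative value means "unknown". */
  void recordProcessed(int numProcessed) {
    elementsProcessed.set(numProcessed);
  }

  /** True if the producer should pause until the consumer catches up. */
  boolean shouldWait() {
    int numSent = elementsSent.get();
    if (numSent >= targetElementsSent) {
      int numProcessed = elementsProcessed.get();
      if (numProcessed < 0) {
        targetElementsSent = Integer.MAX_VALUE;  // progress reporting unsupported: never throttle
      } else if (numProcessed == 0) {
        targetElementsSent = 1;                  // wait until the very first element is processed
      } else {
        // Observed processing rate in elements per millisecond, then allow roughly
        // MAX_BUFFER_MILLIS worth of unprocessed elements in flight.
        double rate =
            numProcessed == 1
                ? (double) numProcessed / (clock.get() - firstSentMillis)
                : ((double) numProcessed - 1) / (clock.get() - secondSentMillis);
        targetElementsSent =
            (int) Math.min(numProcessed + rate * MAX_BUFFER_MILLIS + 1, Integer.MAX_VALUE);
      }
    }
    return numSent >= targetElementsSent;
  }
}

In the PR itself this logic is wired into process() via maybeWait(), which blocks on a condition
variable until either a fresh progress report arrives from the SDK or abortWait() is invoked
(for example when hasFailed() reports a failed bundle), so the writer cannot hang indefinitely.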


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 164821)
    Time Spent: 3h 20m  (was: 3h 10m)

> Bound the amount of data on the data plane by time.
> ---------------------------------------------------
>
>                 Key: BEAM-5791
>                 URL: https://issues.apache.org/jira/browse/BEAM-5791
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow, sdk-java-harness, sdk-py-harness
>            Reporter: Robert Bradshaw
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> This is especially important for Fn API reads, where each element represents 
> a shard to read and may be very expensive, but many elements may be waiting 
> in the Fn API buffer.
> The need for this will be mitigated with full SDF support for liquid sharding 
> over the Fn API, but not eliminated unless the runner can "unread" elements 
> it has already sent. 
> This is especially important for dataflow jobs that start out small but then
> detect that they need more workers (e.g. due to the initial inputs being an
> SDF).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
