[
https://issues.apache.org/jira/browse/BEAM-5791?focusedWorklogId=157531&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157531
]
ASF GitHub Bot logged work on BEAM-5791:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Oct/18 12:49
Start Date: 23/Oct/18 12:49
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #6752: [BEAM-5791]
Implement time-based pushback in the dataflow harness data plane.
URL: https://github.com/apache/beam/pull/6752
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index c585d4febac..59134eac879 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -39,6 +39,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -270,6 +271,7 @@ public DynamicSplitResult
requestDynamicSplit(DynamicSplitRequest splitRequest)
private final AtomicReference<Progress> latestProgress = new
AtomicReference<>();
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> nextProgressFuture;
+ private final Consumer<Integer> grpcWriteOperationElementsProcessed;
private final Map<MetricKey, MetricUpdates.MetricUpdate<Long>>
counterUpdates;
private final Map<MetricKey, MetricUpdates.MetricUpdate<DistributionData>>
distributionUpdates;
@@ -282,6 +284,7 @@ public SingularProcessBundleProgressTracker(
this.readOperation = readOperation;
this.grpcWriteOperation = grpcWriteOperation;
this.bundleProcessOperation = bundleProcessOperation;
+ this.grpcWriteOperationElementsProcessed =
grpcWriteOperation.processedElementsConsumer();
this.progressInterpolator =
new Interpolator<Progress>(MAX_DATA_POINTS) {
@Override
@@ -307,25 +310,29 @@ void updateProgress() {
double elementsConsumed =
bundleProcessOperation.getInputElementsConsumed(metrics);
+ grpcWriteOperationElementsProcessed.accept((int) elementsConsumed);
progressInterpolator.addPoint(
grpcWriteOperation.getElementsSent(), readOperation.getProgress());
latestProgress.set(progressInterpolator.interpolateAndPurge(elementsConsumed));
progressErrors = 0;
} catch (Exception exn) {
- progressErrors++;
- // Only log verbosely every power of two to avoid spamming the logs.
- if (Integer.bitCount(progressErrors) == 1) {
- LOG.warn(
- String.format(
- "Progress updating failed %s times. Following exception
safely handled.",
- progressErrors),
- exn);
- } else {
- LOG.debug(
- String.format(
- "Progress updating failed %s times. Following exception
safely handled.",
- progressErrors),
- exn);
+ if (!isTransientProgressError(exn.getMessage())) {
+ grpcWriteOperationElementsProcessed.accept(-1); // Not supported.
+ progressErrors++;
+ // Only log verbosely every power of two to avoid spamming the logs.
+ if (Integer.bitCount(progressErrors) == 1) {
+ LOG.warn(
+ String.format(
+ "Progress updating failed %s times. Following exception
safely handled.",
+ progressErrors),
+ exn);
+ } else {
+ LOG.debug(
+ String.format(
+ "Progress updating failed %s times. Following exception
safely handled.",
+ progressErrors),
+ exn);
+ }
}
try {
@@ -590,4 +597,12 @@ private ProgressTracker createProgressTracker() {
return new NullProgressTracker();
}
}
+
+ /** Whether the given error is likely to go away (e.g. the bundle has not
started). */
+ private static boolean isTransientProgressError(String msg) {
+ return msg != null
+ && (msg.contains("Process bundle request not yet scheduled")
+ || msg.contains("Unknown process bundle instruction")
+ || msg.contains("unstarted operation"));
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index c13564215e6..1f74cf7e493 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -18,9 +18,14 @@
package org.apache.beam.runners.dataflow.worker.fn.data;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
@@ -32,6 +37,8 @@
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link Operation} that uses the Beam Fn Data API to send messages.
@@ -39,6 +46,8 @@
* <p>This {@link Operation} supports restart.
*/
public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteGrpcPortWriteOperation.class);
+
private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new
OutputReceiver[0];
private final Coder<WindowedValue<T>> coder;
private final FnDataService beamFnDataService;
@@ -50,33 +59,133 @@
private CloseableFnDataReceiver<WindowedValue<T>> receiver;
private final AtomicInteger elementsSent = new AtomicInteger();
+ boolean usingElementsProcessed = false;
+ AtomicInteger elementsProcessed = new AtomicInteger();
+ int elementsFlushed;
+ int targetElementsSent;
+
+ private final Supplier<Long> currentTimeMillis;
+ private long firstElementSentMillis;
+ private long secondElementSentMillis;
+
+ @VisibleForTesting static final long MAX_BUFFER_MILLIS = 5000;
+
public RemoteGrpcPortWriteOperation(
FnDataService beamFnDataService,
Target target,
Supplier<String> bundleIdSupplier,
Coder<WindowedValue<T>> coder,
OperationContext context) {
+ this(beamFnDataService, target, bundleIdSupplier, coder, context,
System::currentTimeMillis);
+ }
+
+ public RemoteGrpcPortWriteOperation(
+ FnDataService beamFnDataService,
+ Target target,
+ Supplier<String> bundleIdSupplier,
+ Coder<WindowedValue<T>> coder,
+ OperationContext context,
+ Supplier<Long> currentTimeMillis) {
super(EMPTY_RECEIVER_ARRAY, context);
this.coder = coder;
this.beamFnDataService = beamFnDataService;
this.bundleIdSupplier = bundleIdSupplier;
this.target = target;
+ this.currentTimeMillis = currentTimeMillis;
}
@Override
public void start() throws Exception {
try (Closeable scope = context.enterStart()) {
elementsSent.set(0);
+ elementsProcessed.set(0);
+ targetElementsSent = 1;
+ elementsFlushed = 0;
super.start();
bundleId = bundleIdSupplier.get();
receiver = beamFnDataService.send(LogicalEndpoint.of(bundleId, target),
coder);
}
}
+ /** Attempt to bound the amount of unconsumed data written to the buffer in
absolute time. */
+ @VisibleForTesting
+ boolean shouldWait() throws Exception {
+ if (!usingElementsProcessed) {
+ return false;
+ }
+ int numSent = elementsSent.get();
+ if (numSent >= targetElementsSent) {
+ if (elementsFlushed < numSent) {
+ receiver.flush();
+ elementsFlushed = numSent;
+ }
+ int numProcessed = elementsProcessed.get();
+ // A negative value indicates that obtaining numProcessed is not
supported.
+ if (numProcessed < 0) {
+ targetElementsSent = Integer.MAX_VALUE;
+ }
+ // Wait until the SDK has processed at least one element before
continuing.
+ if (numProcessed == 0) {
+ targetElementsSent = 1;
+ } else {
+ double rate;
+ if (numProcessed == 1) {
+ rate = (double) numProcessed / (currentTimeMillis.get() -
firstElementSentMillis);
+ } else {
+ rate = ((double) numProcessed - 1) / (currentTimeMillis.get() -
secondElementSentMillis);
+ }
+ // Note that numProcessed is always increasing up to numSent, and rate
is always positive,
+ // so eventually we'll return True.
+ targetElementsSent =
+ (int) Math.min(numProcessed + rate * MAX_BUFFER_MILLIS + 1,
Integer.MAX_VALUE);
+ }
+ }
+ return numSent >= targetElementsSent;
+ }
+
+ public Consumer<Integer> processedElementsConsumer() {
+ usingElementsProcessed = true;
+ return elementsProcessed -> {
+ try {
+ lock.lock();
+ this.elementsProcessed.set(elementsProcessed);
+ condition.signal();
+ } finally {
+ lock.unlock();
+ }
+ };
+ }
+
+ Lock lock = new ReentrantLock();
+ Condition condition = lock.newCondition();
+
+ private void maybeWait() throws Exception {
+ if (shouldWait()) {
+ try {
+ lock.lock();
+ while (shouldWait()) {
+ LOG.debug(
+ "Throttling elements at {} until more than {} elements been
processed.",
+ elementsSent.get(),
+ elementsProcessed.get());
+ condition.await();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
@Override
public void process(Object outputElem) throws Exception {
try (Closeable scope = context.enterProcess()) {
- elementsSent.incrementAndGet();
+ maybeWait();
+ int numSent = elementsSent.incrementAndGet();
+ if (numSent == 1) {
+ firstElementSentMillis = currentTimeMillis.get();
+ } else if (numSent == 2) {
+ secondElementSentMillis = currentTimeMillis.get();
+ }
receiver.accept((WindowedValue<T>) outputElem);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
index 00798f94eb3..d86de634a04 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FnApiWindowMappingFnTest.java
@@ -137,6 +137,9 @@ public void accept(WindowedValue<T> windowedValue) throws
Exception {
inboundDataClient.complete();
}
+ @Override
+ public void flush() throws Exception {}
+
@Override
public void close() throws Exception {}
};
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index ba503a12b16..4542add5fae 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -272,6 +272,8 @@ public void testTentativeUserMetricsOverwrite() throws
Exception {
public void close() {}
};
+
when(grpcPortWriteOperation.processedElementsConsumer()).thenReturn(elementsConsumed
-> {});
+
RegisterAndProcessBundleOperation processOperation =
new RegisterAndProcessBundleOperation(
idGenerator,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
index 7c401de92ad..a2a9cd8f090 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
@@ -69,6 +69,8 @@ public void testProgressInterpolation() throws Exception {
RegisterAndProcessBundleOperation process =
Mockito.mock(RegisterAndProcessBundleOperation.class);
+ when(grpcWrite.processedElementsConsumer()).thenReturn(elementsConsumed ->
{});
+
SingularProcessBundleProgressTracker tracker =
new SingularProcessBundleProgressTracker(read, grpcWrite, process);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
index c359a0de8ab..44969823760 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperationTest.java
@@ -30,6 +30,8 @@
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
@@ -131,6 +133,82 @@ public void testStartAndAbort() throws Exception {
verifyNoMoreInteractions(beamFnDataService);
}
+ @Test
+ public void testBufferRateLimiting() throws Exception {
+ AtomicInteger processedElements = new AtomicInteger();
+ AtomicInteger currentTimeMillis = new AtomicInteger(10000);
+
+ final int START_BUFFER_SIZE = 3;
+ final int STEADY_BUFFER_SIZE = 10;
+ final int FIRST_ELEMENT_DURATION =
+ (int) RemoteGrpcPortWriteOperation.MAX_BUFFER_MILLIS /
START_BUFFER_SIZE + 1;
+ final int STEADY_ELEMENT_DURATION =
+ (int) RemoteGrpcPortWriteOperation.MAX_BUFFER_MILLIS /
STEADY_BUFFER_SIZE + 1;
+
+ operation =
+ new RemoteGrpcPortWriteOperation<>(
+ beamFnDataService,
+ TARGET,
+ bundleIdSupplier,
+ CODER,
+ operationContext,
+ () -> (long) currentTimeMillis.get());
+
+ RecordingConsumer<WindowedValue<String>> recordingConsumer = new
RecordingConsumer<>();
+ when(beamFnDataService.send(any(),
Matchers.<Coder<WindowedValue<String>>>any()))
+ .thenReturn(recordingConsumer);
+ when(bundleIdSupplier.get()).thenReturn(BUNDLE_ID);
+
+ Consumer<Integer> processedElementConsumer =
operation.processedElementsConsumer();
+
+ operation.start();
+
+ // Never wait before sending the first element.
+ assertFalse(operation.shouldWait());
+ operation.process(valueInGlobalWindow("first"));
+
+ // After sending the first element, wait until it's processed before
sending another.
+ assertTrue(operation.shouldWait());
+
+ // Once we've processed the element, we can send the second.
+ currentTimeMillis.getAndAdd(FIRST_ELEMENT_DURATION);
+ processedElementConsumer.accept(1);
+ assertFalse(operation.shouldWait());
+ operation.process(valueInGlobalWindow("second"));
+
+ // Send elements until the buffer is full.
+ for (int i = 2; i < START_BUFFER_SIZE + 1; i++) {
+ assertFalse(operation.shouldWait());
+ operation.process(valueInGlobalWindow("element" + i));
+ }
+
+ // The buffer is full.
+ assertTrue(operation.shouldWait());
+
+ // Now finish processing the second element.
+ currentTimeMillis.getAndAdd(STEADY_ELEMENT_DURATION);
+ processedElementConsumer.accept(2);
+
+ // That was faster, so our buffer quota is larger.
+ for (int i = START_BUFFER_SIZE + 1; i < STEADY_BUFFER_SIZE + 2; i++) {
+ assertFalse(operation.shouldWait());
+ operation.process(valueInGlobalWindow("element" + i));
+ }
+
+ // The buffer is full again.
+ assertTrue(operation.shouldWait());
+
+ // As elements are consumed, we can keep adding more.
+ for (int i = START_BUFFER_SIZE + STEADY_BUFFER_SIZE + 2; i < 100; i++) {
+ currentTimeMillis.getAndAdd(STEADY_ELEMENT_DURATION);
+ processedElementConsumer.accept(i);
+ assertFalse(operation.shouldWait());
+ operation.process(valueInGlobalWindow("element" + i));
+ }
+
+ operation.finish();
+ }
+
private static class RecordingConsumer<T> extends ArrayList<T>
implements CloseableFnDataReceiver<T> {
private boolean closed;
@@ -141,7 +219,10 @@ public void close() throws Exception {
}
@Override
- public void accept(T t) throws Exception {
+ public void flush() throws Exception {}
+
+ @Override
+ public synchronized void accept(T t) throws Exception {
if (closed) {
throw new IllegalStateException("Consumer is closed but attempting to
consume " + t);
}
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
index a2306e3da4c..104ed21a689 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
@@ -112,6 +112,13 @@ public void close() throws Exception {
outboundObserver.onNext(elements.build());
}
+ @Override
+ public void flush() throws IOException {
+ if (bufferedElements.size() > 0) {
+ outboundObserver.onNext(convertBufferForTransmission().build());
+ }
+ }
+
@Override
public void accept(WindowedValue<T> t) throws IOException {
if (closed) {
@@ -120,7 +127,7 @@ public void accept(WindowedValue<T> t) throws IOException {
coder.encode(t, bufferedElements);
counter += 1;
if (bufferedElements.size() >= bufferLimit) {
- outboundObserver.onNext(convertBufferForTransmission().build());
+ flush();
}
}
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
index d497c8fed25..c8b1256fd9d 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CloseableFnDataReceiver.java
@@ -24,6 +24,17 @@
* <p>The close method for a {@link CloseableFnDataReceiver} must be
idempotent.
*/
public interface CloseableFnDataReceiver<T> extends FnDataReceiver<T>,
AutoCloseable {
+
+ /**
+ * Eagerly flushes any data that is buffered in this channel.
+ *
+ * TODO: Remove once splitting/checkpointing are available in SDKs and
rewinding in readers.
+ *
+ * @throws Exception
+ */
+ @Deprecated
+ void flush() throws Exception;
+
/**
* {@inheritDoc}.
*
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 157531)
Time Spent: 1h 40m (was: 1.5h)
> Bound the amount of data on the data plane by time.
> ---------------------------------------------------
>
> Key: BEAM-5791
> URL: https://issues.apache.org/jira/browse/BEAM-5791
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow, sdk-java-harness, sdk-py-harness
> Reporter: Robert Bradshaw
> Assignee: Henning Rohde
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> This is especially important for Fn API reads, where each element represents
> a shard to read and may be very expensive, but many elements may be waiting
> in the Fn API buffer.
> The need for this will be mitigated with full SDF support for liquid sharding
> over the Fn API, but not eliminated unless the runner can "unread" elements
> it has already sent.
> This is especially important for dataflow jobs that start out small but
> then detect that they need more workers (e.g. due to the initial inputs being
> an SDF).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)