This is an automated email from the ASF dual-hosted git repository.
anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 41369a71df9 Report Outstanding Bundles and Bytes (#29041)
41369a71df9 is described below
commit 41369a71df983bcd0584792889e202894e38b362
Author: edman124 <[email protected]>
AuthorDate: Mon Oct 30 17:01:13 2023 -0700
Report Outstanding Bundles and Bytes (#29041)
* New active threads metric and initial tests
* create unit tests for max active threads metric
* remove test filter
* fix formatting with spotless apply
* revert format changes
* revert format change straggler
* revert format change straggler
* remove unnecessary comment
* synchronize threads in unit tests
* fix formatting with spotless apply
* remove comments and rename counter
* fix formatting with spotless apply
* fix tests for StreamingDataflowWorker change and fix createMockWork
* Add outstanding bytes and bundle metrics, an outstanding bytes test, and refactor the metric unit tests
* report outstanding bundles and bytes metrics
* Add variable for max bytes outstanding
* remove duplicate test
---
.../dataflow/worker/DataflowSystemMetrics.java | 4 +
.../dataflow/worker/StreamingDataflowWorker.java | 24 +++
.../dataflow/worker/util/BoundedQueueExecutor.java | 16 ++
.../worker/StreamingDataflowWorkerTest.java | 186 ++++++++++++++++++---
4 files changed, 207 insertions(+), 23 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
index ee2a04af998..640febc616b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
@@ -42,6 +42,10 @@ public class DataflowSystemMetrics {
TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"),
ACTIVE_THREADS("dataflow_active_threads"),
TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"),
+ OUTSTANDING_BYTES("dataflow_outstanding_bytes"),
+ MAX_OUTSTANDING_BYTES("dataflow_max_outstanding_bytes"),
+ OUTSTANDING_BUNDLES("dataflow_outstanding_bundles"),
+ MAX_OUTSTANDING_BUNDLES("dataflow_max_outstanding_bundles"),
WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"),
MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing");
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 77f5205cf7e..811250ee785 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -251,6 +251,10 @@ public class StreamingDataflowWorker {
private final Counter<Long, Long> timeAtMaxActiveThreads;
private final Counter<Integer, Integer> activeThreads;
private final Counter<Integer, Integer> totalAllocatedThreads;
+ private final Counter<Long, Long> outstandingBytes;
+ private final Counter<Long, Long> maxOutstandingBytes;
+ private final Counter<Long, Long> outstandingBundles;
+ private final Counter<Long, Long> maxOutstandingBundles;
private final Counter<Integer, Integer>
windmillMaxObservedWorkItemCommitBytes;
private final Counter<Integer, Integer> memoryThrashing;
private final boolean publishCounters;
@@ -337,6 +341,18 @@ public class StreamingDataflowWorker {
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
this.activeThreads =
pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName());
+ this.outstandingBytes =
+ pendingCumulativeCounters.longSum(
+ StreamingSystemCounterNames.OUTSTANDING_BYTES.counterName());
+ this.maxOutstandingBytes =
+ pendingCumulativeCounters.longSum(
+ StreamingSystemCounterNames.MAX_OUTSTANDING_BYTES.counterName());
+ this.outstandingBundles =
+ pendingCumulativeCounters.longSum(
+ StreamingSystemCounterNames.OUTSTANDING_BUNDLES.counterName());
+ this.maxOutstandingBundles =
+ pendingCumulativeCounters.longSum(
+ StreamingSystemCounterNames.MAX_OUTSTANDING_BUNDLES.counterName());
this.totalAllocatedThreads =
pendingCumulativeCounters.intSum(
StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName());
@@ -1721,6 +1737,14 @@ public class StreamingDataflowWorker {
activeThreads.addValue(workUnitExecutor.activeCount());
totalAllocatedThreads.getAndReset();
totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads());
+ outstandingBytes.getAndReset();
+ outstandingBytes.addValue(workUnitExecutor.bytesOutstanding());
+ maxOutstandingBytes.getAndReset();
+ maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
+ outstandingBundles.getAndReset();
+ outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
+ maxOutstandingBundles.getAndReset();
+
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
}
@VisibleForTesting
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index a160b0e6ad0..dcff1f73f10 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -123,6 +123,52 @@ public class BoundedQueueExecutor {
     return activeCount.intValue();
   }

+  /**
+   * Returns the number of bytes this executor currently counts as outstanding.
+   *
+   * <p>Read under {@code monitor}: if the field is a plain {@code long}, an unsynchronized read
+   * from a metrics thread may be stale or even torn (JLS 17.7). NOTE(review): assumes this
+   * counter is guarded by {@code monitor}, which {@code summaryHtml()} also takes — confirm
+   * against the field declaration.
+   */
+  public long bytesOutstanding() {
+    monitor.enter();
+    try {
+      return bytesOutstanding;
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  /**
+   * Returns the number of work items this executor currently counts as outstanding, read under
+   * {@code monitor} for the same reason as {@link #bytesOutstanding()}.
+   */
+  public long elementsOutstanding() {
+    monitor.enter();
+    try {
+      return elementsOutstanding;
+    } finally {
+      monitor.leave();
+    }
+  }
+
+  /**
+   * Returns the maximum outstanding bytes reported for this executor. NOTE(review): presumably
+   * the limit fixed at construction, so it is read without {@code monitor} — confirm.
+   */
+  public long maximumBytesOutstanding() {
+    return maximumBytesOutstanding;
+  }
+
+  /**
+   * Returns the maximum outstanding work items reported for this executor. NOTE(review):
+   * presumably the limit fixed at construction, so it is read without {@code monitor} — confirm.
+   */
+  public long maximumElementsOutstanding() {
+    return maximumElementsOutstanding;
+  }
+
   public String summaryHtml() {
     monitor.enter();
     try {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index fdec36d688e..6826607513d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -80,6 +80,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -226,6 +227,7 @@ public class StreamingDataflowWorkerTest {
private static final ByteString DEFAULT_KEY_BYTES =
ByteString.copyFromUtf8(DEFAULT_KEY_STRING);
private static final String DEFAULT_DATA_STRING = "data";
private static final String DEFAULT_DESTINATION_STREAM_ID = "out";
+ private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final Function<GetDataRequest, GetDataResponse>
EMPTY_DATA_RESPONDER =
(GetDataRequest request) -> {
GetDataResponse.Builder builder = GetDataResponse.newBuilder();
@@ -2747,7 +2749,7 @@ public class StreamingDataflowWorkerTest {
threadExpiration,
TimeUnit.SECONDS,
maxThreads,
- 10000000,
+ MAXIMUM_BYTES_OUTSTANDING,
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
@@ -2791,12 +2793,14 @@ public class StreamingDataflowWorkerTest {
executor.shutdown();
}
- volatile boolean stop = false;
-
@Test
public void testActiveThreadMetric() throws Exception {
int maxThreads = 5;
int threadExpirationSec = 60;
+ CountDownLatch processStart1 = new CountDownLatch(2);
+ CountDownLatch processStart2 = new CountDownLatch(3);
+ CountDownLatch processStart3 = new CountDownLatch(4);
+ AtomicBoolean stop = new AtomicBoolean(false);
// setting up actual implementation of executor instead of mocking to keep
track of
// active thread count.
BoundedQueueExecutor executor =
@@ -2805,7 +2809,7 @@ public class StreamingDataflowWorkerTest {
threadExpirationSec,
TimeUnit.SECONDS,
maxThreads,
- 10000000,
+ MAXIMUM_BYTES_OUTSTANDING,
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
@@ -2823,11 +2827,11 @@ public class StreamingDataflowWorkerTest {
Consumer<Work> sleepProcessWorkFn =
unused -> {
- synchronized (this) {
- this.notify();
- }
+ processStart1.countDown();
+ processStart2.countDown();
+ processStart3.countDown();
int count = 0;
- while (!stop) {
+ while (!stop.get()) {
count += 1;
}
};
@@ -2840,27 +2844,163 @@ public class StreamingDataflowWorkerTest {
assertEquals(0, executor.activeCount());
assertTrue(computationState.activateWork(key1Shard1, m2));
- synchronized (this) {
- executor.execute(m2, m2.getWorkItem().getSerializedSize());
- this.wait();
- // Seems current executor executes the initial work item twice
- this.wait();
- }
+ // activate work starts executing work if no other work is queued for that
shard
+ executor.execute(m2, m2.getWorkItem().getSerializedSize());
+ processStart1.await();
assertEquals(2, executor.activeCount());
assertTrue(computationState.activateWork(key1Shard1, m3));
assertTrue(computationState.activateWork(key1Shard1, m4));
- synchronized (this) {
- executor.execute(m3, m3.getWorkItem().getSerializedSize());
- this.wait();
- }
+ executor.execute(m3, m3.getWorkItem().getSerializedSize());
+ processStart2.await();
+
assertEquals(3, executor.activeCount());
- synchronized (this) {
- executor.execute(m4, m4.getWorkItem().getSerializedSize());
- this.wait();
- }
+ executor.execute(m4, m4.getWorkItem().getSerializedSize());
+ processStart3.await();
assertEquals(4, executor.activeCount());
- stop = true;
+ stop.set(true);
+ executor.shutdown();
+ }
+
+  @Test
+  public void testOutstandingBytesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // Use a real BoundedQueueExecutor rather than a mock so that it actually tracks
+    // outstanding bytes.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          // Occupy an executor thread until the test sets stop.
+          while (!stop.get()) {
+            Thread.yield();
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.bytesOutstanding());
+    try {
+      long bytes = m2.getWorkItem().getSerializedSize();
+      assertTrue(computationState.activateWork(key1Shard1, m2));
+      // activateWork already started m2 (nothing else queued for the shard): counted twice.
+      bytes += m2.getWorkItem().getSerializedSize();
+      executor.execute(m2, m2.getWorkItem().getSerializedSize());
+      processStart1.await();
+      assertEquals(bytes, executor.bytesOutstanding());
+
+      assertTrue(computationState.activateWork(key1Shard1, m3));
+      assertTrue(computationState.activateWork(key1Shard1, m4));
+      bytes += m3.getWorkItem().getSerializedSize();
+      executor.execute(m3, m3.getWorkItem().getSerializedSize());
+      processStart2.await();
+      assertEquals(bytes, executor.bytesOutstanding());
+
+      bytes += m4.getWorkItem().getSerializedSize();
+      executor.execute(m4, m4.getWorkItem().getSerializedSize());
+      processStart3.await();
+      assertEquals(bytes, executor.bytesOutstanding());
+    } finally {
+      // Release the spinning workers even if an assertion fails.
+      stop.set(true);
+    }
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBundlesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // Use a real BoundedQueueExecutor rather than a mock so that it actually tracks
+    // outstanding work items.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1);
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          // Occupy an executor thread until the test sets stop.
+          while (!stop.get()) {
+            Thread.yield();
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.elementsOutstanding());
+    try {
+      assertTrue(computationState.activateWork(key1Shard1, m2));
+      // activateWork already started m2 (nothing else queued for the shard); this runs it again.
+      executor.execute(m2, m2.getWorkItem().getSerializedSize());
+      processStart1.await();
+      assertEquals(2, executor.elementsOutstanding());
+
+      assertTrue(computationState.activateWork(key1Shard1, m3));
+      assertTrue(computationState.activateWork(key1Shard1, m4));
+      executor.execute(m3, m3.getWorkItem().getSerializedSize());
+      processStart2.await();
+      assertEquals(3, executor.elementsOutstanding());
+
+      executor.execute(m4, m4.getWorkItem().getSerializedSize());
+      processStart3.await();
+      assertEquals(4, executor.elementsOutstanding());
+    } finally {
+      // Release the spinning workers even if an assertion fails.
+      stop.set(true);
+    }
     executor.shutdown();
   }