This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 41369a71df9 Report Outstanding Bundles and Bytes (#29041)
41369a71df9 is described below

commit 41369a71df983bcd0584792889e202894e38b362
Author: edman124 <[email protected]>
AuthorDate: Mon Oct 30 17:01:13 2023 -0700

    Report Outstanding Bundles and Bytes (#29041)
    
    * New active threads metric and initial tests
    
    * create unit tests for max active threads metric
    
    * remove test filter
    
    * fix formatting with spotless apply
    
    * revert format changes
    
    * revert format change straggler
    
    * revert format change straggler
    
    * remove unnecessary comment
    
    * synchronize threads in unit tests
    
    * fix formatting with spotless apply
    
    * remove comments and rename counter
    
    * fix formatting with spotless apply
    
    * fix tests for the StreamingDataflowWorker change and fix createMockWork
    
    * Add outstanding bytes and bundles metrics, add an outstanding bytes test, and refactor metric unit tests
    
    * report outstanding bundles and bytes metrics
    
    * Add variable for max bytes outstanding
    
    * remove duplicate test
---
 .../dataflow/worker/DataflowSystemMetrics.java     |   4 +
 .../dataflow/worker/StreamingDataflowWorker.java   |  24 +++
 .../dataflow/worker/util/BoundedQueueExecutor.java |  16 ++
 .../worker/StreamingDataflowWorkerTest.java        | 186 ++++++++++++++++++---
 4 files changed, 207 insertions(+), 23 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
index ee2a04af998..640febc616b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java
@@ -42,6 +42,10 @@ public class DataflowSystemMetrics {
     TIME_AT_MAX_ACTIVE_THREADS("dataflow_time_at_max_active_threads"),
     ACTIVE_THREADS("dataflow_active_threads"),
     TOTAL_ALLOCATED_THREADS("dataflow_total_allocated_threads"),
+    OUTSTANDING_BYTES("dataflow_outstanding_bytes"),
+    MAX_OUTSTANDING_BYTES("dataflow_max_outstanding_bytes"),
+    OUTSTANDING_BUNDLES("dataflow_outstanding_bundles"),
+    MAX_OUTSTANDING_BUNDLES("dataflow_max_outstanding_bundles"),
     WINDMILL_QUOTA_THROTTLING("dataflow_streaming_engine_throttled_msecs"),
     MEMORY_THRASHING("dataflow_streaming_engine_user_worker_thrashing");
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 77f5205cf7e..811250ee785 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -251,6 +251,10 @@ public class StreamingDataflowWorker {
   private final Counter<Long, Long> timeAtMaxActiveThreads;
   private final Counter<Integer, Integer> activeThreads;
   private final Counter<Integer, Integer> totalAllocatedThreads;
+  private final Counter<Long, Long> outstandingBytes;
+  private final Counter<Long, Long> maxOutstandingBytes;
+  private final Counter<Long, Long> outstandingBundles;
+  private final Counter<Long, Long> maxOutstandingBundles;
   private final Counter<Integer, Integer> 
windmillMaxObservedWorkItemCommitBytes;
   private final Counter<Integer, Integer> memoryThrashing;
   private final boolean publishCounters;
@@ -337,6 +341,18 @@ public class StreamingDataflowWorker {
             
StreamingSystemCounterNames.TIME_AT_MAX_ACTIVE_THREADS.counterName());
     this.activeThreads =
         
pendingCumulativeCounters.intSum(StreamingSystemCounterNames.ACTIVE_THREADS.counterName());
+    this.outstandingBytes =
+        pendingCumulativeCounters.longSum(
+            StreamingSystemCounterNames.OUTSTANDING_BYTES.counterName());
+    this.maxOutstandingBytes =
+        pendingCumulativeCounters.longSum(
+            StreamingSystemCounterNames.MAX_OUTSTANDING_BYTES.counterName());
+    this.outstandingBundles =
+        pendingCumulativeCounters.longSum(
+            StreamingSystemCounterNames.OUTSTANDING_BUNDLES.counterName());
+    this.maxOutstandingBundles =
+        pendingCumulativeCounters.longSum(
+            StreamingSystemCounterNames.MAX_OUTSTANDING_BUNDLES.counterName());
     this.totalAllocatedThreads =
         pendingCumulativeCounters.intSum(
             StreamingSystemCounterNames.TOTAL_ALLOCATED_THREADS.counterName());
@@ -1721,6 +1737,14 @@ public class StreamingDataflowWorker {
     activeThreads.addValue(workUnitExecutor.activeCount());
     totalAllocatedThreads.getAndReset();
     totalAllocatedThreads.addValue(chooseMaximumNumberOfThreads());
+    outstandingBytes.getAndReset();
+    outstandingBytes.addValue(workUnitExecutor.bytesOutstanding());
+    maxOutstandingBytes.getAndReset();
+    maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
+    outstandingBundles.getAndReset();
+    outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
+    maxOutstandingBundles.getAndReset();
+    
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
   }
 
   @VisibleForTesting
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index a160b0e6ad0..dcff1f73f10 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -123,6 +123,22 @@ public class BoundedQueueExecutor {
     return activeCount.intValue();
   }
 
+  public long bytesOutstanding() {
+    return bytesOutstanding;
+  }
+
+  public long elementsOutstanding() {
+    return elementsOutstanding;
+  }
+
+  public long maximumBytesOutstanding() {
+    return maximumBytesOutstanding;
+  }
+
+  public long maximumElementsOutstanding() {
+    return maximumElementsOutstanding;
+  }
+
   public String summaryHtml() {
     monitor.enter();
     try {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index fdec36d688e..6826607513d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -80,6 +80,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -226,6 +227,7 @@ public class StreamingDataflowWorkerTest {
   private static final ByteString DEFAULT_KEY_BYTES = 
ByteString.copyFromUtf8(DEFAULT_KEY_STRING);
   private static final String DEFAULT_DATA_STRING = "data";
   private static final String DEFAULT_DESTINATION_STREAM_ID = "out";
+  private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
   private static final Function<GetDataRequest, GetDataResponse> 
EMPTY_DATA_RESPONDER =
       (GetDataRequest request) -> {
         GetDataResponse.Builder builder = GetDataResponse.newBuilder();
@@ -2747,7 +2749,7 @@ public class StreamingDataflowWorkerTest {
             threadExpiration,
             TimeUnit.SECONDS,
             maxThreads,
-            10000000,
+            MAXIMUM_BYTES_OUTSTANDING,
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
@@ -2791,12 +2793,14 @@ public class StreamingDataflowWorkerTest {
     executor.shutdown();
   }
 
-  volatile boolean stop = false;
-
   @Test
   public void testActiveThreadMetric() throws Exception {
     int maxThreads = 5;
     int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
     // setting up actual implementation of executor instead of mocking to keep 
track of
     // active thread count.
     BoundedQueueExecutor executor =
@@ -2805,7 +2809,7 @@ public class StreamingDataflowWorkerTest {
             threadExpirationSec,
             TimeUnit.SECONDS,
             maxThreads,
-            10000000,
+            MAXIMUM_BYTES_OUTSTANDING,
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
@@ -2823,11 +2827,11 @@ public class StreamingDataflowWorkerTest {
 
     Consumer<Work> sleepProcessWorkFn =
         unused -> {
-          synchronized (this) {
-            this.notify();
-          }
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
           int count = 0;
-          while (!stop) {
+          while (!stop.get()) {
             count += 1;
           }
         };
@@ -2840,27 +2844,163 @@ public class StreamingDataflowWorkerTest {
     assertEquals(0, executor.activeCount());
 
     assertTrue(computationState.activateWork(key1Shard1, m2));
-    synchronized (this) {
-      executor.execute(m2, m2.getWorkItem().getSerializedSize());
-      this.wait();
-      // Seems current executor executes the initial work item twice
-      this.wait();
-    }
+    // activate work starts executing work if no other work is queued for that 
shard
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
     assertEquals(2, executor.activeCount());
 
     assertTrue(computationState.activateWork(key1Shard1, m3));
     assertTrue(computationState.activateWork(key1Shard1, m4));
-    synchronized (this) {
-      executor.execute(m3, m3.getWorkItem().getSerializedSize());
-      this.wait();
-    }
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+
     assertEquals(3, executor.activeCount());
-    synchronized (this) {
-      executor.execute(m4, m4.getWorkItem().getSerializedSize());
-      this.wait();
-    }
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
     assertEquals(4, executor.activeCount());
-    stop = true;
+    stop.set(true);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBytesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.bytesOutstanding());
+
+    long bytes = m2.getWorkItem().getSerializedSize();
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    // activate work starts executing work if no other work is queued for that 
shard
+    bytes += m2.getWorkItem().getSerializedSize();
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
+    assertEquals(bytes, executor.bytesOutstanding());
+
+    assertTrue(computationState.activateWork(key1Shard1, m3));
+    assertTrue(computationState.activateWork(key1Shard1, m4));
+
+    bytes += m3.getWorkItem().getSerializedSize();
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+    assertEquals(bytes, executor.bytesOutstanding());
+
+    bytes += m4.getWorkItem().getSerializedSize();
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
+    assertEquals(bytes, executor.bytesOutstanding());
+    stop.set(true);
+    executor.shutdown();
+  }
+
+  @Test
+  public void testOutstandingBundlesMetric() throws Exception {
+    int maxThreads = 5;
+    int threadExpirationSec = 60;
+    CountDownLatch processStart1 = new CountDownLatch(2);
+    CountDownLatch processStart2 = new CountDownLatch(3);
+    CountDownLatch processStart3 = new CountDownLatch(4);
+    AtomicBoolean stop = new AtomicBoolean(false);
+    // setting up actual implementation of executor instead of mocking to keep 
track of
+    // active thread count.
+    BoundedQueueExecutor executor =
+        new BoundedQueueExecutor(
+            maxThreads,
+            threadExpirationSec,
+            TimeUnit.SECONDS,
+            maxThreads,
+            MAXIMUM_BYTES_OUTSTANDING,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DataflowWorkUnits-%d")
+                .setDaemon(true)
+                .build());
+
+    ComputationState computationState =
+        new ComputationState(
+            "computation",
+            
defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            executor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
+    Consumer<Work> sleepProcessWorkFn =
+        unused -> {
+          processStart1.countDown();
+          processStart2.countDown();
+          processStart3.countDown();
+          int count = 0;
+          while (!stop.get()) {
+            count += 1;
+          }
+        };
+
+    Work m2 = createMockWork(2, sleepProcessWorkFn);
+
+    Work m3 = createMockWork(3, sleepProcessWorkFn);
+
+    Work m4 = createMockWork(4, sleepProcessWorkFn);
+    assertEquals(0, executor.elementsOutstanding());
+
+    assertTrue(computationState.activateWork(key1Shard1, m2));
+    // activate work starts executing work if no other work is queued for that 
shard
+    executor.execute(m2, m2.getWorkItem().getSerializedSize());
+    processStart1.await();
+    assertEquals(2, executor.elementsOutstanding());
+
+    assertTrue(computationState.activateWork(key1Shard1, m3));
+    assertTrue(computationState.activateWork(key1Shard1, m4));
+
+    executor.execute(m3, m3.getWorkItem().getSerializedSize());
+    processStart2.await();
+    assertEquals(3, executor.elementsOutstanding());
+
+    executor.execute(m4, m4.getWorkItem().getSerializedSize());
+    processStart3.await();
+    assertEquals(4, executor.elementsOutstanding());
+    stop.set(true);
     executor.shutdown();
   }
 

Reply via email to