This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 42cad40532a [Dataflow Streaming] Remove call to Thread.setName and track thread name inside Work. (#32715)
42cad40532a is described below

commit 42cad40532afea05d1001c4dc7f00714f2af4e0d
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Oct 9 03:44:15 2024 -0700

    [Dataflow Streaming] Remove call to Thread.setName and track thread name inside Work. (#32715)
    
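    Thread.setName is expensive and uses up to 4% CPU on jobs with many keys, because BoundedQueueExecutor renamed the worker thread before every work item and restored the name afterwards. The processing thread name is now recorded on the Work object instead and shown in a new "Processing Thread" column of the active-work status page.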
---
 .../dataflow/worker/streaming/ActiveWorkState.java |  4 ++-
 .../runners/dataflow/worker/streaming/Work.java    |  9 ++++++
 .../dataflow/worker/util/BoundedQueueExecutor.java | 15 ---------
 .../work/processing/StreamingWorkScheduler.java    |  2 ++
 .../worker/util/BoundedQueueExecutorTest.java      | 36 ----------------------
 5 files changed, 14 insertions(+), 52 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index c80c3a882e5..4607096dd66 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -338,7 +338,7 @@ public final class ActiveWorkState {
         "<table border=\"1\" "
             + 
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
     writer.println(
-        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active 
For</th><th>State</th><th>State Active For</th></tr>");
+        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active 
For</th><th>State</th><th>State Active For</th><th>Processing 
Thread</th></tr>");
     // Use StringBuilder because we are appending in loop.
     StringBuilder activeWorkStatus = new StringBuilder();
     int commitsPendingCount = 0;
@@ -364,6 +364,8 @@ public final class ActiveWorkState {
       activeWorkStatus.append(activeWork.getState());
       activeWorkStatus.append("</td><td>");
       activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), 
now));
+      activeWorkStatus.append("</td><td>");
+      activeWorkStatus.append(activeWork.getProcessingThreadName());
       activeWorkStatus.append("</td></tr>\n");
     }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index e77823602ed..03d1e1ae469 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -72,6 +72,7 @@ public final class Work implements RefreshableWork {
   private final String latencyTrackingId;
   private TimedState currentState;
   private volatile boolean isFailed;
+  private volatile String processingThreadName = "";
 
   private Work(
       WorkItem workItem,
@@ -188,6 +189,14 @@ public final class Work implements RefreshableWork {
     this.currentState = TimedState.create(state, now);
   }
 
+  public String getProcessingThreadName() {
+    return processingThreadName;
+  }
+
+  public void setProcessingThreadName(String processingThreadName) {
+    this.processingThreadName = processingThreadName;
+  }
+
   @Override
   public void setFailed() {
     this.isFailed = true;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 5e3f293f7d5..9286be84cea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -22,8 +22,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
 
@@ -223,18 +221,10 @@ public class BoundedQueueExecutor {
     try {
       executor.execute(
           () -> {
-            String threadName = Thread.currentThread().getName();
             try {
-              if (work instanceof ExecutableWork) {
-                String workToken =
-                    debugFormattedWorkToken(
-                        ((ExecutableWork) 
work).work().getWorkItem().getWorkToken());
-                Thread.currentThread().setName(threadName + ":" + workToken);
-              }
               work.run();
             } finally {
               decrementCounters(workBytes);
-              Thread.currentThread().setName(threadName);
             }
           });
     } catch (RuntimeException e) {
@@ -244,11 +234,6 @@ public class BoundedQueueExecutor {
     }
   }
 
-  @VisibleForTesting
-  public static String debugFormattedWorkToken(long workToken) {
-    return String.format("%016x", workToken);
-  }
-
   private void decrementCounters(long workBytes) {
     monitor.enter();
     --elementsOutstanding;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 965a29126dc..9a3e6eb6b09 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -225,6 +225,7 @@ public final class StreamingWorkScheduler {
     Windmill.WorkItem workItem = work.getWorkItem();
     String computationId = computationState.getComputationId();
     ByteString key = workItem.getKey();
+    work.setProcessingThreadName(Thread.currentThread().getName());
     work.setState(Work.State.PROCESSING);
     setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
     LOG.debug("Starting processing for {}:\n{}", computationId, work);
@@ -288,6 +289,7 @@ public final class StreamingWorkScheduler {
       }
 
       resetWorkLoggingContext(work.getLatencyTrackingId());
+      work.setProcessingThreadName("");
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index ad77958837a..73492528992 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -293,40 +293,4 @@ public class BoundedQueueExecutorTest {
             + "Work Queue Bytes: 0/10000000<br>/n";
     assertEquals(expectedSummaryHtml, executor.summaryHtml());
   }
-
-  @Test
-  public void testExecute_updatesThreadNameForExecutableWork() throws 
InterruptedException {
-    CountDownLatch waitForWorkExecution = new CountDownLatch(1);
-    ExecutableWork executableWork =
-        createWork(
-            work -> {
-              assertTrue(
-                  Thread.currentThread()
-                      .getName()
-                      .contains(
-                          BoundedQueueExecutor.debugFormattedWorkToken(
-                              work.getWorkItem().getWorkToken())));
-              waitForWorkExecution.countDown();
-            });
-    executor.execute(executableWork, 
executableWork.getWorkItem().getSerializedSize());
-    waitForWorkExecution.await();
-  }
-
-  @Test
-  public void testForceExecute_updatesThreadNameForExecutableWork() throws 
InterruptedException {
-    CountDownLatch waitForWorkExecution = new CountDownLatch(1);
-    ExecutableWork executableWork =
-        createWork(
-            work -> {
-              assertTrue(
-                  Thread.currentThread()
-                      .getName()
-                      .contains(
-                          BoundedQueueExecutor.debugFormattedWorkToken(
-                              work.getWorkItem().getWorkToken())));
-              waitForWorkExecution.countDown();
-            });
-    executor.forceExecute(executableWork, 
executableWork.getWorkItem().getSerializedSize());
-    waitForWorkExecution.await();
-  }
 }

Reply via email to