This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 42cad40532a [Dataflow Streaming] Remove call to Thread.setName and
track thread name inside Work. (#32715)
42cad40532a is described below
commit 42cad40532afea05d1001c4dc7f00714f2af4e0d
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Oct 9 03:44:15 2024 -0700
[Dataflow Streaming] Remove call to Thread.setName and track thread name
inside Work. (#32715)
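Thread.setName is expensive and uses up to 4% CPU on jobs with many keys. Instead of renaming the worker thread for each work item, the processing thread's name is now recorded on the Work object and shown in a new "Processing Thread" column of the active-work status page.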
---
.../dataflow/worker/streaming/ActiveWorkState.java | 4 ++-
.../runners/dataflow/worker/streaming/Work.java | 9 ++++++
.../dataflow/worker/util/BoundedQueueExecutor.java | 15 ---------
.../work/processing/StreamingWorkScheduler.java | 2 ++
.../worker/util/BoundedQueueExecutorTest.java | 36 ----------------------
5 files changed, 14 insertions(+), 52 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index c80c3a882e5..4607096dd66 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -338,7 +338,7 @@ public final class ActiveWorkState {
"<table border=\"1\" "
+
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
writer.println(
- "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active
For</th><th>State</th><th>State Active For</th></tr>");
+ "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active
For</th><th>State</th><th>State Active For</th><th>Processing
Thread</th></tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
@@ -364,6 +364,8 @@ public final class ActiveWorkState {
activeWorkStatus.append(activeWork.getState());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(),
now));
+ activeWorkStatus.append("</td><td>");
+ activeWorkStatus.append(activeWork.getProcessingThreadName());
activeWorkStatus.append("</td></tr>\n");
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index e77823602ed..03d1e1ae469 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -72,6 +72,7 @@ public final class Work implements RefreshableWork {
private final String latencyTrackingId;
private TimedState currentState;
private volatile boolean isFailed;
+ private volatile String processingThreadName = "";
private Work(
WorkItem workItem,
@@ -188,6 +189,14 @@ public final class Work implements RefreshableWork {
this.currentState = TimedState.create(state, now);
}
+ public String getProcessingThreadName() {
+ return processingThreadName;
+ }
+
+ public void setProcessingThreadName(String processingThreadName) {
+ this.processingThreadName = processingThreadName;
+ }
+
@Override
public void setFailed() {
this.isFailed = true;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 5e3f293f7d5..9286be84cea 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -22,8 +22,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
@@ -223,18 +221,10 @@ public class BoundedQueueExecutor {
try {
executor.execute(
() -> {
- String threadName = Thread.currentThread().getName();
try {
- if (work instanceof ExecutableWork) {
- String workToken =
- debugFormattedWorkToken(
- ((ExecutableWork)
work).work().getWorkItem().getWorkToken());
- Thread.currentThread().setName(threadName + ":" + workToken);
- }
work.run();
} finally {
decrementCounters(workBytes);
- Thread.currentThread().setName(threadName);
}
});
} catch (RuntimeException e) {
@@ -244,11 +234,6 @@ public class BoundedQueueExecutor {
}
}
- @VisibleForTesting
- public static String debugFormattedWorkToken(long workToken) {
- return String.format("%016x", workToken);
- }
-
private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 965a29126dc..9a3e6eb6b09 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -225,6 +225,7 @@ public final class StreamingWorkScheduler {
Windmill.WorkItem workItem = work.getWorkItem();
String computationId = computationState.getComputationId();
ByteString key = workItem.getKey();
+ work.setProcessingThreadName(Thread.currentThread().getName());
work.setState(Work.State.PROCESSING);
setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
LOG.debug("Starting processing for {}:\n{}", computationId, work);
@@ -288,6 +289,7 @@ public final class StreamingWorkScheduler {
}
resetWorkLoggingContext(work.getLatencyTrackingId());
+ work.setProcessingThreadName("");
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index ad77958837a..73492528992 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -293,40 +293,4 @@ public class BoundedQueueExecutorTest {
+ "Work Queue Bytes: 0/10000000<br>/n";
assertEquals(expectedSummaryHtml, executor.summaryHtml());
}
-
- @Test
- public void testExecute_updatesThreadNameForExecutableWork() throws
InterruptedException {
- CountDownLatch waitForWorkExecution = new CountDownLatch(1);
- ExecutableWork executableWork =
- createWork(
- work -> {
- assertTrue(
- Thread.currentThread()
- .getName()
- .contains(
- BoundedQueueExecutor.debugFormattedWorkToken(
- work.getWorkItem().getWorkToken())));
- waitForWorkExecution.countDown();
- });
- executor.execute(executableWork,
executableWork.getWorkItem().getSerializedSize());
- waitForWorkExecution.await();
- }
-
- @Test
- public void testForceExecute_updatesThreadNameForExecutableWork() throws
InterruptedException {
- CountDownLatch waitForWorkExecution = new CountDownLatch(1);
- ExecutableWork executableWork =
- createWork(
- work -> {
- assertTrue(
- Thread.currentThread()
- .getName()
- .contains(
- BoundedQueueExecutor.debugFormattedWorkToken(
- work.getWorkItem().getWorkToken())));
- waitForWorkExecution.countDown();
- });
- executor.forceExecute(executableWork,
executableWork.getWorkItem().getSerializedSize());
- waitForWorkExecution.await();
- }
}