This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0d037a993b5 Fix incorrect Work.java cast and logging (#31528)
0d037a993b5 is described below
commit 0d037a993b51406e6a6f630791a8d17bfbb608a7
Author: martin trieu <[email protected]>
AuthorDate: Tue Jun 11 00:35:47 2024 -0700
Fix incorrect Work.java cast and logging (#31528)
---
.../runners/dataflow/worker/streaming/Work.java | 5 +-
.../dataflow/worker/util/BoundedQueueExecutor.java | 13 +++-
.../worker/util/BoundedQueueExecutorTest.java | 76 ++++++++++++++++++++--
3 files changed, 81 insertions(+), 13 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index fa46bac36b5..ed3f2671b40 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -58,7 +58,7 @@ import org.joda.time.Instant;
*/
@NotThreadSafe
@Internal
-public class Work {
+public final class Work {
private final ShardedKey shardedKey;
private final WorkItem workItem;
private final ProcessingContext processingContext;
@@ -196,8 +196,7 @@ public class Work {
return latencyTrackingId;
}
- public final void queueCommit(
- WorkItemCommitRequest commitRequest, ComputationState computationState) {
+ public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState computationState) {
setState(State.COMMIT_QUEUED);
processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9a481169350..5e3f293f7d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -22,7 +22,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
@@ -224,9 +225,10 @@ public class BoundedQueueExecutor {
() -> {
String threadName = Thread.currentThread().getName();
try {
- if (work instanceof Work) {
+ if (work instanceof ExecutableWork) {
String workToken =
- String.format("%016x", ((Work) work).getWorkItem().getWorkToken());
+ debugFormattedWorkToken(
+ ((ExecutableWork) work).work().getWorkItem().getWorkToken());
Thread.currentThread().setName(threadName + ":" + workToken);
}
work.run();
@@ -242,6 +244,11 @@ public class BoundedQueueExecutor {
}
}
+ @VisibleForTesting
+ public static String debugFormattedWorkToken(long workToken) {
+ return String.format("%016x", workToken);
+ }
+
private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index c0620952ef9..e08c951975f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -23,9 +23,17 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -39,13 +47,31 @@ import org.junit.runners.JUnit4;
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {
- @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
-
+ @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
private BoundedQueueExecutor executor;
+ private static ExecutableWork createWork(Consumer<Work> executeWorkFn) {
+ return ExecutableWork.create(
+ Work.create(
+ Windmill.WorkItem.newBuilder()
+ .setKey(ByteString.EMPTY)
+ .setShardingKey(1)
+ .setWorkToken(33)
+ .setCacheToken(1)
+ .build(),
+ Watermarks.builder().setInputDataWatermark(Instant.now()).build(),
+ Work.createProcessingContext(
+ "computationId",
+ (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(),
+ ignored -> {}),
+ Instant::now,
+ Collections.emptyList()),
+ executeWorkFn);
+ }
+
private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
Runnable runnable =
() -> {
@@ -203,14 +229,14 @@ public class BoundedQueueExecutorTest {
executor.execute(m3, 1);
assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
- assertEquals(0l, executor.allThreadsActiveTime());
+ assertEquals(0L, executor.allThreadsActiveTime());
stop.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}
// Max pool size was reached so the allThreadsActiveTime() was updated.
- assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+ assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
executor.shutdown();
}
@@ -241,7 +267,7 @@ public class BoundedQueueExecutorTest {
executor.execute(m3, 1);
assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
- assertEquals(0l, executor.allThreadsActiveTime());
+ assertEquals(0L, executor.allThreadsActiveTime());
// Increase the max thread count
executor.setMaximumPoolSize(5, 105);
stop.countDown();
@@ -251,13 +277,13 @@ public class BoundedQueueExecutorTest {
}
// Max pool size was updated during execution but allThreadsActiveTime() was still recorded
// for the thread which reached the old max pool size.
- assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+ assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
executor.shutdown();
}
@Test
- public void testRenderSummaryHtml() throws Exception {
+ public void testRenderSummaryHtml() {
String expectedSummaryHtml =
"Worker Threads: 0/2<br>/n"
+ "Active Threads: 0<br>/n"
@@ -265,4 +291,40 @@ public class BoundedQueueExecutorTest {
+ "Work Queue Bytes: 0/10000000<br>/n";
assertEquals(expectedSummaryHtml, executor.summaryHtml());
}
+
+ @Test
+ public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
+ CountDownLatch waitForWorkExecution = new CountDownLatch(1);
+ ExecutableWork executableWork =
+ createWork(
+ work -> {
+ assertTrue(
+ Thread.currentThread()
+ .getName()
+ .contains(
+ BoundedQueueExecutor.debugFormattedWorkToken(
+ work.getWorkItem().getWorkToken())));
+ waitForWorkExecution.countDown();
+ });
+ executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize());
+ waitForWorkExecution.await();
+ }
+
+ @Test
+ public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
+ CountDownLatch waitForWorkExecution = new CountDownLatch(1);
+ ExecutableWork executableWork =
+ createWork(
+ work -> {
+ assertTrue(
+ Thread.currentThread()
+ .getName()
+ .contains(
+ BoundedQueueExecutor.debugFormattedWorkToken(
+ work.getWorkItem().getWorkToken())));
+ waitForWorkExecution.countDown();
+ });
+ executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize());
+ waitForWorkExecution.await();
+ }
}