This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d037a993b5 Fix incorrect Work.java cast and logging (#31528)
0d037a993b5 is described below

commit 0d037a993b51406e6a6f630791a8d17bfbb608a7
Author: martin trieu <[email protected]>
AuthorDate: Tue Jun 11 00:35:47 2024 -0700

    Fix incorrect Work.java cast and logging (#31528)
---
 .../runners/dataflow/worker/streaming/Work.java    |  5 +-
 .../dataflow/worker/util/BoundedQueueExecutor.java | 13 +++-
 .../worker/util/BoundedQueueExecutorTest.java      | 76 ++++++++++++++++++++--
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index fa46bac36b5..ed3f2671b40 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -58,7 +58,7 @@ import org.joda.time.Instant;
  */
 @NotThreadSafe
 @Internal
-public class Work {
+public final class Work {
   private final ShardedKey shardedKey;
   private final WorkItem workItem;
   private final ProcessingContext processingContext;
@@ -196,8 +196,7 @@ public class Work {
     return latencyTrackingId;
   }
 
-  public final void queueCommit(
-      WorkItemCommitRequest commitRequest, ComputationState computationState) {
+  public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState computationState) {
     setState(State.COMMIT_QUEUED);
     processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9a481169350..5e3f293f7d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -22,7 +22,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
 
@@ -224,9 +225,10 @@ public class BoundedQueueExecutor {
           () -> {
             String threadName = Thread.currentThread().getName();
             try {
-              if (work instanceof Work) {
+              if (work instanceof ExecutableWork) {
                 String workToken =
-                    String.format("%016x", ((Work) work).getWorkItem().getWorkToken());
+                    debugFormattedWorkToken(
+                        ((ExecutableWork) work).work().getWorkItem().getWorkToken());
                 Thread.currentThread().setName(threadName + ":" + workToken);
               }
               work.run();
@@ -242,6 +244,11 @@ public class BoundedQueueExecutor {
     }
   }
 
+  @VisibleForTesting
+  public static String debugFormattedWorkToken(long workToken) {
+    return String.format("%016x", workToken);
+  }
+
   private void decrementCounters(long workBytes) {
     monitor.enter();
     --elementsOutstanding;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index c0620952ef9..e08c951975f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -23,9 +23,17 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -39,13 +47,31 @@ import org.junit.runners.JUnit4;
 // released (2.11.0)
 @SuppressWarnings("unused")
 public class BoundedQueueExecutorTest {
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
   private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
   private static final int DEFAULT_MAX_THREADS = 2;
   private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
-
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(300);
   private BoundedQueueExecutor executor;
 
+  private static ExecutableWork createWork(Consumer<Work> executeWorkFn) {
+    return ExecutableWork.create(
+        Work.create(
+            Windmill.WorkItem.newBuilder()
+                .setKey(ByteString.EMPTY)
+                .setShardingKey(1)
+                .setWorkToken(33)
+                .setCacheToken(1)
+                .build(),
+            Watermarks.builder().setInputDataWatermark(Instant.now()).build(),
+            Work.createProcessingContext(
+                "computationId",
+                (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(),
+                ignored -> {}),
+            Instant::now,
+            Collections.emptyList()),
+        executeWorkFn);
+  }
+
   private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
     Runnable runnable =
         () -> {
@@ -203,14 +229,14 @@ public class BoundedQueueExecutorTest {
     executor.execute(m3, 1);
     assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
 
-    assertEquals(0l, executor.allThreadsActiveTime());
+    assertEquals(0L, executor.allThreadsActiveTime());
     stop.countDown();
     while (executor.activeCount() != 0) {
       // Waiting for all threads to be ended.
       Thread.sleep(200);
     }
     // Max pool size was reached so the allThreadsActiveTime() was updated.
-    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
 
     executor.shutdown();
   }
@@ -241,7 +267,7 @@ public class BoundedQueueExecutorTest {
     executor.execute(m3, 1);
     assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));
 
-    assertEquals(0l, executor.allThreadsActiveTime());
+    assertEquals(0L, executor.allThreadsActiveTime());
     // Increase the max thread count
     executor.setMaximumPoolSize(5, 105);
     stop.countDown();
@@ -251,13 +277,13 @@ public class BoundedQueueExecutorTest {
     }
     // Max pool size was updated during execution but allThreadsActiveTime() was still recorded
     // for the thread which reached the old max pool size.
-    assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
+    assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
 
     executor.shutdown();
   }
 
   @Test
-  public void testRenderSummaryHtml() throws Exception {
+  public void testRenderSummaryHtml() {
     String expectedSummaryHtml =
         "Worker Threads: 0/2<br>/n"
             + "Active Threads: 0<br>/n"
@@ -265,4 +291,40 @@ public class BoundedQueueExecutorTest {
             + "Work Queue Bytes: 0/10000000<br>/n";
     assertEquals(expectedSummaryHtml, executor.summaryHtml());
   }
+
+  @Test
+  public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
+    CountDownLatch waitForWorkExecution = new CountDownLatch(1);
+    ExecutableWork executableWork =
+        createWork(
+            work -> {
+              assertTrue(
+                  Thread.currentThread()
+                      .getName()
+                      .contains(
+                          BoundedQueueExecutor.debugFormattedWorkToken(
+                              work.getWorkItem().getWorkToken())));
+              waitForWorkExecution.countDown();
+            });
+    executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize());
+    waitForWorkExecution.await();
+  }
+
+  @Test
+  public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
+    CountDownLatch waitForWorkExecution = new CountDownLatch(1);
+    ExecutableWork executableWork =
+        createWork(
+            work -> {
+              assertTrue(
+                  Thread.currentThread()
+                      .getName()
+                      .contains(
+                          BoundedQueueExecutor.debugFormattedWorkToken(
+                              work.getWorkItem().getWorkToken())));
+              waitForWorkExecution.countDown();
+            });
+    executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize());
+    waitForWorkExecution.await();
+  }
 }

Reply via email to