This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 371576a3b17 Remove unused StreamingDataflowWorker parameter (#30256)
371576a3b17 is described below

commit 371576a3b17b940380192378848dd00c55d0cc19
Author: martin trieu <[email protected]>
AuthorDate: Mon Feb 12 06:33:20 2024 -0800

    Remove unused StreamingDataflowWorker parameter (#30256)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  2 +-
 .../worker/StreamingModeExecutionContext.java      | 23 ++++++-----------
 .../dataflow/worker/counters/NameContext.java      | 29 ++++++++++++++++++++--
 .../dataflow/worker/streaming/StageInfo.java       |  7 +++---
 .../worker/DataflowExecutionContextTest.java       |  9 +++----
 .../worker/StreamingModeExecutionContextTest.java  | 14 +++--------
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  4 +--
 7 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index bca14923cfc..2f9e18cde67 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -991,7 +991,7 @@ public class StreamingDataflowWorker {
 
     StageInfo stageInfo =
         stageInfoMap.computeIfAbsent(
-            mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName(), this));
+            mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName()));
 
     ExecutionState executionState = null;
     String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 5b18e29293e..2e9e7e608a5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
@@ -440,21 +441,18 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
    * needs to be thread safe for multiple writers. A single stage could have multiple executors
    * running concurrently.
    */
-  public static class StreamingModeExecutionState
-      extends DataflowOperationContext.DataflowExecutionState {
+  public static class StreamingModeExecutionState extends DataflowExecutionState {
 
     // AtomicLong is used because this value is written in two places:
     // 1. The sampling thread calls takeSample to increment the time spent in this state
     // 2. The reporting thread calls extractUpdate which reads the current sum *AND* sets it to 0.
     private final AtomicLong totalMillisInState = new AtomicLong();
 
-    @SuppressWarnings("unused")
     public StreamingModeExecutionState(
         NameContext nameContext,
         String stateName,
         MetricsContainer metricsContainer,
-        ProfileScope profileScope,
-        StreamingDataflowWorker worker) {
+        ProfileScope profileScope) {
       // TODO: Take in the requesting step name and side input index for streaming.
       super(nameContext, stateName, null, null, metricsContainer, profileScope);
     }
@@ -492,22 +490,15 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
    */
   public static class StreamingModeExecutionStateRegistry extends DataflowExecutionStateRegistry {
 
-    private final StreamingDataflowWorker worker;
-
-    public StreamingModeExecutionStateRegistry(StreamingDataflowWorker worker) {
-      this.worker = worker;
-    }
-
     @Override
-    protected DataflowOperationContext.DataflowExecutionState createState(
+    protected DataflowExecutionState createState(
         NameContext nameContext,
         String stateName,
         String requestingStepName,
         Integer inputIndex,
         MetricsContainer container,
         ProfileScope profileScope) {
-      return new StreamingModeExecutionState(
-          nameContext, stateName, container, profileScope, worker);
+      return new StreamingModeExecutionState(nameContext, stateName, container, profileScope);
     }
   }
 
@@ -515,14 +506,14 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
     private final ExecutionState readState;
     private final @Nullable ExecutionStateTracker stateTracker;
 
-    ScopedReadStateSupplier(
+    private ScopedReadStateSupplier(
         DataflowOperationContext operationContext, ExecutionStateTracker stateTracker) {
       this.readState = operationContext.newExecutionState("windmill-read");
       this.stateTracker = stateTracker;
     }
 
     @Override
-    public Closeable get() {
+    public @Nullable Closeable get() {
       if (stateTracker == null) {
         return null;
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
index 4f4a1c3834e..a5e231a7cf4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
@@ -35,7 +35,11 @@ public abstract class NameContext {
       @Nullable String originalName,
       String systemName,
       @Nullable String userName) {
-    return new AutoValue_NameContext(stageName, originalName, systemName, userName);
+    return NameContext.newBuilder(stageName)
+        .setOriginalName(originalName)
+        .setSystemName(systemName)
+        .setUserName(userName)
+        .build();
   }
 
   /**
@@ -43,7 +47,15 @@ public abstract class NameContext {
    * specific steps..
    */
   public static NameContext forStage(String stageName) {
-    return new AutoValue_NameContext(stageName, null, null, null);
+    return newBuilder(stageName).build();
+  }
+
+  public static Builder newBuilder(String stageName) {
+    return new AutoValue_NameContext.Builder()
+        .setStageName(stageName)
+        .setUserName(null)
+        .setSystemName(null)
+        .setOriginalName(null);
   }
 
   /** Returns the name of the stage this instruction is executing in. */
@@ -86,4 +98,17 @@ public abstract class NameContext {
    * <p>Examples: "MapElements/Map", "BigShuffle.GroupByFirstNBytes/GroupByKey/Reify"
    */
   public abstract @Nullable String userName();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setStageName(String value);
+
+    public abstract Builder setOriginalName(@Nullable String value);
+
+    public abstract Builder setSystemName(@Nullable String value);
+
+    public abstract Builder setUserName(@Nullable String value);
+
+    public abstract NameContext build();
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index 64c97dcac51..cb6cbec7d4b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -39,15 +39,14 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterab
 /** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
 @AutoValue
 public abstract class StageInfo {
-  public static StageInfo create(
-      String stageName, String systemName, StreamingDataflowWorker worker) {
-    NameContext nameContext = NameContext.create(stageName, null, systemName, null);
+  public static StageInfo create(String stageName, String systemName) {
+    NameContext nameContext = NameContext.newBuilder(stageName).setSystemName(systemName).build();
     CounterSet deltaCounters = new CounterSet();
     return new AutoValue_StageInfo(
         stageName,
         systemName,
         StreamingStepMetricsContainer.createRegistry(),
-        new StreamingModeExecutionStateRegistry(worker),
+        new StreamingModeExecutionStateRegistry(),
         deltaCounters,
         deltaCounters.longSum(
             DataflowSystemMetrics.StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
index 01951c2f83e..8990768ed20 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
@@ -131,8 +131,7 @@ public class DataflowExecutionContextTest {
             NameContextsForTests.nameContextForTest(),
             PROCESS_STATE_NAME,
             null,
-            NoopProfileScope.NOOP,
-            null);
+            NoopProfileScope.NOOP);
 
     Closeable closure = tracker.enterState(state);
 
@@ -162,8 +161,7 @@ public class DataflowExecutionContextTest {
             NameContextsForTests.nameContextForTest(),
             PROCESS_STATE_NAME,
             null,
-            NoopProfileScope.NOOP,
-            null);
+            NoopProfileScope.NOOP);
     tracker.enterState(state);
     // Enter a new processing state
     StreamingModeExecutionState newState =
@@ -171,8 +169,7 @@ public class DataflowExecutionContextTest {
             NameContextsForTests.nameContextForTest(),
             PROCESS_STATE_NAME,
             null,
-            NoopProfileScope.NOOP,
-            null);
+            NoopProfileScope.NOOP);
     tracker.enterState(newState);
 
     // The first completed state should be recorded and the new state should be active.
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 40b22a729cb..18f1f4f7119 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -90,7 +90,7 @@ public class StreamingModeExecutionContextTest {
   @Mock private WindmillStateReader stateReader;
 
   private final StreamingModeExecutionStateRegistry executionStateRegistry =
-      new StreamingModeExecutionStateRegistry(null);
+      new StreamingModeExecutionStateRegistry();
   private StreamingModeExecutionContext executionContext;
   DataflowWorkerHarnessOptions options;
 
@@ -316,11 +316,7 @@ public class StreamingModeExecutionContextTest {
 
     StreamingModeExecutionState state =
         new StreamingModeExecutionState(
-            NameContextsForTests.nameContextForTest(),
-            "testState",
-            null,
-            NoopProfileScope.NOOP,
-            null);
+            NameContextsForTests.nameContextForTest(), "testState", null, NoopProfileScope.NOOP);
     ExecutorService executor = Executors.newFixedThreadPool(2);
     AtomicBoolean doneWriting = new AtomicBoolean(false);
 
@@ -359,11 +355,7 @@ public class StreamingModeExecutionContextTest {
     // reach the reading thread.
     StreamingModeExecutionState state =
         new StreamingModeExecutionState(
-            NameContextsForTests.nameContextForTest(),
-            "testState",
-            null,
-            NoopProfileScope.NOOP,
-            null);
+            NameContextsForTests.nameContextForTest(), "testState", null, NoopProfileScope.NOOP);
     ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
     try {
       sampler.start();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index eb94d79692b..65cb937c7f0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -574,7 +574,7 @@ public class WorkerCustomSourcesTest {
   public void testReadUnboundedReader() throws Exception {
     CounterSet counterSet = new CounterSet();
     StreamingModeExecutionStateRegistry executionStateRegistry =
-        new StreamingModeExecutionStateRegistry(null);
+        new StreamingModeExecutionStateRegistry();
     ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run);
     StreamingModeExecutionContext context =
         new StreamingModeExecutionContext(
@@ -941,7 +941,7 @@ public class WorkerCustomSourcesTest {
   public void testFailedWorkItemsAbort() throws Exception {
     CounterSet counterSet = new CounterSet();
     StreamingModeExecutionStateRegistry executionStateRegistry =
-        new StreamingModeExecutionStateRegistry(null);
+        new StreamingModeExecutionStateRegistry();
     StreamingModeExecutionContext context =
         new StreamingModeExecutionContext(
             counterSet,

Reply via email to