This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 371576a3b17 Remove unused StreamingDataflowWorker parameter (#30256)
371576a3b17 is described below
commit 371576a3b17b940380192378848dd00c55d0cc19
Author: martin trieu <[email protected]>
AuthorDate: Mon Feb 12 06:33:20 2024 -0800
Remove unused StreamingDataflowWorker parameter (#30256)
---
.../dataflow/worker/StreamingDataflowWorker.java | 2 +-
.../worker/StreamingModeExecutionContext.java | 23 ++++++-----------
.../dataflow/worker/counters/NameContext.java | 29 ++++++++++++++++++++--
.../dataflow/worker/streaming/StageInfo.java | 7 +++---
.../worker/DataflowExecutionContextTest.java | 9 +++----
.../worker/StreamingModeExecutionContextTest.java | 14 +++--------
.../dataflow/worker/WorkerCustomSourcesTest.java | 4 +--
7 files changed, 46 insertions(+), 42 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index bca14923cfc..2f9e18cde67 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -991,7 +991,7 @@ public class StreamingDataflowWorker {
StageInfo stageInfo =
stageInfoMap.computeIfAbsent(
- mapTask.getStageName(), s -> StageInfo.create(s,
mapTask.getSystemName(), this));
+ mapTask.getStageName(), s -> StageInfo.create(s,
mapTask.getSystemName()));
ExecutionState executionState = null;
String counterName = "dataflow_source_bytes_processed-" +
mapTask.getSystemName();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 5b18e29293e..2e9e7e608a5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
@@ -440,21 +441,18 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
* needs to be thread safe for multiple writers. A single stage could have
multiple executors
* running concurrently.
*/
- public static class StreamingModeExecutionState
- extends DataflowOperationContext.DataflowExecutionState {
+ public static class StreamingModeExecutionState extends
DataflowExecutionState {
// AtomicLong is used because this value is written in two places:
// 1. The sampling thread calls takeSample to increment the time spent in
this state
// 2. The reporting thread calls extractUpdate which reads the current sum
*AND* sets it to 0.
private final AtomicLong totalMillisInState = new AtomicLong();
- @SuppressWarnings("unused")
public StreamingModeExecutionState(
NameContext nameContext,
String stateName,
MetricsContainer metricsContainer,
- ProfileScope profileScope,
- StreamingDataflowWorker worker) {
+ ProfileScope profileScope) {
// TODO: Take in the requesting step name and side input index for
streaming.
super(nameContext, stateName, null, null, metricsContainer,
profileScope);
}
@@ -492,22 +490,15 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
*/
public static class StreamingModeExecutionStateRegistry extends
DataflowExecutionStateRegistry {
- private final StreamingDataflowWorker worker;
-
- public StreamingModeExecutionStateRegistry(StreamingDataflowWorker worker)
{
- this.worker = worker;
- }
-
@Override
- protected DataflowOperationContext.DataflowExecutionState createState(
+ protected DataflowExecutionState createState(
NameContext nameContext,
String stateName,
String requestingStepName,
Integer inputIndex,
MetricsContainer container,
ProfileScope profileScope) {
- return new StreamingModeExecutionState(
- nameContext, stateName, container, profileScope, worker);
+ return new StreamingModeExecutionState(nameContext, stateName,
container, profileScope);
}
}
@@ -515,14 +506,14 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
private final ExecutionState readState;
private final @Nullable ExecutionStateTracker stateTracker;
- ScopedReadStateSupplier(
+ private ScopedReadStateSupplier(
DataflowOperationContext operationContext, ExecutionStateTracker
stateTracker) {
this.readState = operationContext.newExecutionState("windmill-read");
this.stateTracker = stateTracker;
}
@Override
- public Closeable get() {
+ public @Nullable Closeable get() {
if (stateTracker == null) {
return null;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
index 4f4a1c3834e..a5e231a7cf4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
@@ -35,7 +35,11 @@ public abstract class NameContext {
@Nullable String originalName,
String systemName,
@Nullable String userName) {
- return new AutoValue_NameContext(stageName, originalName, systemName,
userName);
+ return NameContext.newBuilder(stageName)
+ .setOriginalName(originalName)
+ .setSystemName(systemName)
+ .setUserName(userName)
+ .build();
}
/**
@@ -43,7 +47,15 @@ public abstract class NameContext {
* specific steps..
*/
public static NameContext forStage(String stageName) {
- return new AutoValue_NameContext(stageName, null, null, null);
+ return newBuilder(stageName).build();
+ }
+
+ public static Builder newBuilder(String stageName) {
+ return new AutoValue_NameContext.Builder()
+ .setStageName(stageName)
+ .setUserName(null)
+ .setSystemName(null)
+ .setOriginalName(null);
}
/** Returns the name of the stage this instruction is executing in. */
@@ -86,4 +98,17 @@ public abstract class NameContext {
* <p>Examples: "MapElements/Map",
"BigShuffle.GroupByFirstNBytes/GroupByKey/Reify"
*/
public abstract @Nullable String userName();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setStageName(String value);
+
+ public abstract Builder setOriginalName(@Nullable String value);
+
+ public abstract Builder setSystemName(@Nullable String value);
+
+ public abstract Builder setUserName(@Nullable String value);
+
+ public abstract NameContext build();
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index 64c97dcac51..cb6cbec7d4b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -39,15 +39,14 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterab
/** Contains a few of the stage specific fields. E.g. metrics container
registry, counters etc. */
@AutoValue
public abstract class StageInfo {
- public static StageInfo create(
- String stageName, String systemName, StreamingDataflowWorker worker) {
- NameContext nameContext = NameContext.create(stageName, null, systemName,
null);
+ public static StageInfo create(String stageName, String systemName) {
+ NameContext nameContext =
NameContext.newBuilder(stageName).setSystemName(systemName).build();
CounterSet deltaCounters = new CounterSet();
return new AutoValue_StageInfo(
stageName,
systemName,
StreamingStepMetricsContainer.createRegistry(),
- new StreamingModeExecutionStateRegistry(worker),
+ new StreamingModeExecutionStateRegistry(),
deltaCounters,
deltaCounters.longSum(
DataflowSystemMetrics.StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
index 01951c2f83e..8990768ed20 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
@@ -131,8 +131,7 @@ public class DataflowExecutionContextTest {
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
- NoopProfileScope.NOOP,
- null);
+ NoopProfileScope.NOOP);
Closeable closure = tracker.enterState(state);
@@ -162,8 +161,7 @@ public class DataflowExecutionContextTest {
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
- NoopProfileScope.NOOP,
- null);
+ NoopProfileScope.NOOP);
tracker.enterState(state);
// Enter a new processing state
StreamingModeExecutionState newState =
@@ -171,8 +169,7 @@ public class DataflowExecutionContextTest {
NameContextsForTests.nameContextForTest(),
PROCESS_STATE_NAME,
null,
- NoopProfileScope.NOOP,
- null);
+ NoopProfileScope.NOOP);
tracker.enterState(newState);
// The first completed state should be recorded and the new state should
be active.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 40b22a729cb..18f1f4f7119 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -90,7 +90,7 @@ public class StreamingModeExecutionContextTest {
@Mock private WindmillStateReader stateReader;
private final StreamingModeExecutionStateRegistry executionStateRegistry =
- new StreamingModeExecutionStateRegistry(null);
+ new StreamingModeExecutionStateRegistry();
private StreamingModeExecutionContext executionContext;
DataflowWorkerHarnessOptions options;
@@ -316,11 +316,7 @@ public class StreamingModeExecutionContextTest {
StreamingModeExecutionState state =
new StreamingModeExecutionState(
- NameContextsForTests.nameContextForTest(),
- "testState",
- null,
- NoopProfileScope.NOOP,
- null);
+ NameContextsForTests.nameContextForTest(), "testState", null,
NoopProfileScope.NOOP);
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicBoolean doneWriting = new AtomicBoolean(false);
@@ -359,11 +355,7 @@ public class StreamingModeExecutionContextTest {
// reach the reading thread.
StreamingModeExecutionState state =
new StreamingModeExecutionState(
- NameContextsForTests.nameContextForTest(),
- "testState",
- null,
- NoopProfileScope.NOOP,
- null);
+ NameContextsForTests.nameContextForTest(), "testState", null,
NoopProfileScope.NOOP);
ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
try {
sampler.start();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index eb94d79692b..65cb937c7f0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -574,7 +574,7 @@ public class WorkerCustomSourcesTest {
public void testReadUnboundedReader() throws Exception {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
- new StreamingModeExecutionStateRegistry(null);
+ new StreamingModeExecutionStateRegistry();
ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1),
Runnable::run);
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
@@ -941,7 +941,7 @@ public class WorkerCustomSourcesTest {
public void testFailedWorkItemsAbort() throws Exception {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
- new StreamingModeExecutionStateRegistry(null);
+ new StreamingModeExecutionStateRegistry();
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
counterSet,