This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7fa4c99535f Fix to take StreamingDataflowWorkerOptions from external
options (#30232)
7fa4c99535f is described below
commit 7fa4c99535f4c3ce8b72eae40bd29a288ed98cee
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Feb 6 11:36:00 2024 -0800
Fix to take StreamingDataflowWorkerOptions from external options (#30232)
---------
Co-authored-by: Arun Pandian <[email protected]>
---
.../worker/DataflowBatchWorkerHarness.java | 2 +-
.../worker/DataflowWorkerHarnessHelper.java | 8 +++---
.../dataflow/worker/StreamingDataflowWorker.java | 14 +++++-----
.../worker/WorkerPipelineOptionsFactory.java | 15 +++++------
.../worker/DataflowWorkerHarnessHelperTest.java | 30 +++++++++++++++++++++-
.../worker/WorkerPipelineOptionsFactoryTest.java | 4 +--
6 files changed, 48 insertions(+), 25 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index cc79ac6dbc0..51127c2dc2f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -61,7 +61,7 @@ public class DataflowBatchWorkerHarness {
DataflowWorkerHarnessHelper.initializeLogging(DataflowBatchWorkerHarness.class);
DataflowWorkerHarnessOptions pipelineOptions =
DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
- DataflowBatchWorkerHarness.class);
+ DataflowBatchWorkerHarness.class,
DataflowWorkerHarnessOptions.class);
DataflowBatchWorkerHarness batchHarness = new
DataflowBatchWorkerHarness(pipelineOptions);
DataflowWorkerHarnessHelper.configureLogging(pipelineOptions);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index c6d8d727ef4..94c894608a4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -49,11 +49,11 @@ public final class DataflowWorkerHarnessHelper {
private static final String ROOT_LOGGER_NAME = "";
private static final String PIPELINE_PATH = "PIPELINE_PATH";
- public static DataflowWorkerHarnessOptions
initializeGlobalStateAndPipelineOptions(
- Class<?> workerHarnessClass) throws Exception {
+ public static <T extends DataflowWorkerHarnessOptions> T
initializeGlobalStateAndPipelineOptions(
+ Class<?> workerHarnessClass, Class<T> harnessOptionsClass) throws
Exception {
/* Extract pipeline options. */
- DataflowWorkerHarnessOptions pipelineOptions =
- WorkerPipelineOptionsFactory.createFromSystemProperties();
+ T pipelineOptions =
+
WorkerPipelineOptionsFactory.createFromSystemProperties(harnessOptionsClass);
pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
/* Configure logging with job-specific properties. */
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 14efdcc5eb0..463ab953fae 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -64,7 +64,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.CustomSources;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames;
@@ -472,9 +471,9 @@ public class StreamingDataflowWorker {
JvmInitializers.runOnStartup();
DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class);
- DataflowWorkerHarnessOptions options =
+ StreamingDataflowWorkerOptions options =
DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
- StreamingDataflowWorker.class);
+ StreamingDataflowWorker.class,
StreamingDataflowWorkerOptions.class);
DataflowWorkerHarnessHelper.configureLogging(options);
checkArgument(
options.isStreaming(),
@@ -486,8 +485,7 @@ public class StreamingDataflowWorker {
"%s cannot be main() class with beam_fn_api enabled",
StreamingDataflowWorker.class.getSimpleName());
- StreamingDataflowWorker worker =
- StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options);
+ StreamingDataflowWorker worker =
StreamingDataflowWorker.fromOptions(options);
// Use the MetricsLogger container which is used by BigQueryIO to
periodically log process-wide
// metrics.
@@ -506,14 +504,14 @@ public class StreamingDataflowWorker {
worker.start();
}
- public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
- DataflowWorkerHarnessOptions options) throws IOException {
+ public static StreamingDataflowWorker
fromOptions(StreamingDataflowWorkerOptions options)
+ throws IOException {
return new StreamingDataflowWorker(
Collections.emptyList(),
IntrinsicMapTaskExecutorFactory.defaultFactory(),
new DataflowWorkUnitClient(options, LOG),
- options.as(StreamingDataflowWorkerOptions.class),
+ options,
true,
new HotKeyLogger(),
Instant::now,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
index 0929705c894..a3ec8933c33 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
@@ -46,17 +46,16 @@ public class WorkerPipelineOptionsFactory {
* @return A {@link DataflowWorkerHarnessOptions} object configured for the
Dataflow worker
* harness.
*/
- public static DataflowWorkerHarnessOptions createFromSystemProperties()
throws IOException {
+ public static <T extends DataflowWorkerHarnessOptions> T
createFromSystemProperties(
+ Class<T> harnessOptionsClass) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
- DataflowWorkerHarnessOptions options;
+ T options;
if (System.getProperties().containsKey("sdk_pipeline_options")) {
// TODO: remove this method of getting pipeline options, once migration
is complete.
String serializedOptions = System.getProperty("sdk_pipeline_options");
LOG.info("Worker harness starting with: {}", serializedOptions);
options =
- objectMapper
- .readValue(serializedOptions, PipelineOptions.class)
- .as(DataflowWorkerHarnessOptions.class);
+ objectMapper.readValue(serializedOptions,
PipelineOptions.class).as(harnessOptionsClass);
} else if
(System.getProperties().containsKey("sdk_pipeline_options_file")) {
String filePath = System.getProperty("sdk_pipeline_options_file");
LOG.info("Loading pipeline options from " + filePath);
@@ -64,12 +63,10 @@ public class WorkerPipelineOptionsFactory {
new String(Files.readAllBytes(Paths.get(filePath)),
StandardCharsets.UTF_8);
LOG.info("Worker harness starting with: " + serializedOptions);
options =
- objectMapper
- .readValue(serializedOptions, PipelineOptions.class)
- .as(DataflowWorkerHarnessOptions.class);
+ objectMapper.readValue(serializedOptions,
PipelineOptions.class).as(harnessOptionsClass);
} else {
LOG.info("Using empty PipelineOptions, as none were provided.");
- options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+ options = PipelineOptionsFactory.as(harnessOptionsClass);
}
// These values will not be known at job submission time and must be
provided.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
index de9d9cf7d15..c46a112c4e7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
@@ -29,6 +29,7 @@ import java.nio.file.Paths;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
import
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
@@ -63,7 +64,7 @@ public class DataflowWorkerHarnessHelperTest {
DataflowWorkerHarnessOptions generatedOptions =
DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
- DataflowBatchWorkerHarnessTest.class);
+ DataflowBatchWorkerHarnessTest.class,
DataflowWorkerHarnessOptions.class);
// Assert that the returned options are correct.
assertThat(generatedOptions.getJobId(), equalTo(JOB_ID));
assertThat(generatedOptions.getWorkerId(), equalTo(WORKER_ID));
@@ -88,4 +89,31 @@ public class DataflowWorkerHarnessHelperTest {
public void testParseStatusApiDescriptor() throws TextFormat.ParseException {
assertNull(DataflowWorkerHarnessHelper.getStatusDescriptor());
}
+
+ @Test
+ public void testStreamingStreamingConfiguration() throws Exception {
+ StreamingDataflowWorkerOptions pipelineOptions =
+ PipelineOptionsFactory.as(StreamingDataflowWorkerOptions.class);
+ pipelineOptions.setJobId(JOB_ID);
+ pipelineOptions.setWorkerId(WORKER_ID);
+ int activeWorkRefreshPeriodMillis = 12345;
+
pipelineOptions.setActiveWorkRefreshPeriodMillis(activeWorkRefreshPeriodMillis);
+ int stuckCommitDurationMillis = 23456;
+ pipelineOptions.setStuckCommitDurationMillis(stuckCommitDurationMillis);
+ String serializedOptions = new
ObjectMapper().writeValueAsString(pipelineOptions);
+ File file = tmpFolder.newFile();
+ Files.write(Paths.get(file.getPath()),
serializedOptions.getBytes(StandardCharsets.UTF_8));
+ System.setProperty("sdk_pipeline_options_file", file.getPath());
+
+ StreamingDataflowWorkerOptions generatedOptions =
+ DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
+ DataflowBatchWorkerHarnessTest.class,
StreamingDataflowWorkerOptions.class);
+ // Assert that the returned options are correct.
+ assertThat(generatedOptions.getJobId(), equalTo(JOB_ID));
+ assertThat(generatedOptions.getWorkerId(), equalTo(WORKER_ID));
+ assertThat(
+ generatedOptions.getActiveWorkRefreshPeriodMillis(),
+ equalTo(activeWorkRefreshPeriodMillis));
+ assertThat(generatedOptions.getStuckCommitDurationMillis(),
equalTo(stuckCommitDurationMillis));
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
index f8684edfa2e..62d38d434b8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
@@ -52,7 +52,7 @@ public class WorkerPipelineOptionsFactoryTest {
@SuppressWarnings("deprecation") // testing deprecated functionality
DataflowWorkerHarnessOptions options =
- WorkerPipelineOptionsFactory.createFromSystemProperties();
+
WorkerPipelineOptionsFactory.createFromSystemProperties(DataflowWorkerHarnessOptions.class);
assertEquals("test_worker_id", options.getWorkerId());
assertEquals("test_job_id", options.getJobId());
assertEquals(999, options.getNumWorkers());
@@ -74,7 +74,7 @@ public class WorkerPipelineOptionsFactoryTest {
@SuppressWarnings("deprecation") // testing deprecated functionality
DataflowWorkerHarnessOptions options =
- WorkerPipelineOptionsFactory.createFromSystemProperties();
+
WorkerPipelineOptionsFactory.createFromSystemProperties(DataflowWorkerHarnessOptions.class);
assertEquals("test_worker_id_2", options.getWorkerId());
assertEquals("test_job_id_2", options.getJobId());
assertEquals(1000, options.getNumWorkers());