This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa4c99535f Fix to take StreamingDataflowWorkerOptions from external options (#30232)
7fa4c99535f is described below

commit 7fa4c99535f4c3ce8b72eae40bd29a288ed98cee
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Feb 6 11:36:00 2024 -0800

    Fix to take StreamingDataflowWorkerOptions from external options (#30232)
    
    ---------
    
    Co-authored-by: Arun Pandian <[email protected]>
---
 .../worker/DataflowBatchWorkerHarness.java         |  2 +-
 .../worker/DataflowWorkerHarnessHelper.java        |  8 +++---
 .../dataflow/worker/StreamingDataflowWorker.java   | 14 +++++-----
 .../worker/WorkerPipelineOptionsFactory.java       | 15 +++++------
 .../worker/DataflowWorkerHarnessHelperTest.java    | 30 +++++++++++++++++++++-
 .../worker/WorkerPipelineOptionsFactoryTest.java   |  4 +--
 6 files changed, 48 insertions(+), 25 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index cc79ac6dbc0..51127c2dc2f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -61,7 +61,7 @@ public class DataflowBatchWorkerHarness {
     
DataflowWorkerHarnessHelper.initializeLogging(DataflowBatchWorkerHarness.class);
     DataflowWorkerHarnessOptions pipelineOptions =
         DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            DataflowBatchWorkerHarness.class);
+            DataflowBatchWorkerHarness.class, 
DataflowWorkerHarnessOptions.class);
     DataflowBatchWorkerHarness batchHarness = new 
DataflowBatchWorkerHarness(pipelineOptions);
     DataflowWorkerHarnessHelper.configureLogging(pipelineOptions);
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index c6d8d727ef4..94c894608a4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -49,11 +49,11 @@ public final class DataflowWorkerHarnessHelper {
   private static final String ROOT_LOGGER_NAME = "";
   private static final String PIPELINE_PATH = "PIPELINE_PATH";
 
-  public static DataflowWorkerHarnessOptions 
initializeGlobalStateAndPipelineOptions(
-      Class<?> workerHarnessClass) throws Exception {
+  public static <T extends DataflowWorkerHarnessOptions> T 
initializeGlobalStateAndPipelineOptions(
+      Class<?> workerHarnessClass, Class<T> harnessOptionsClass) throws 
Exception {
     /* Extract pipeline options. */
-    DataflowWorkerHarnessOptions pipelineOptions =
-        WorkerPipelineOptionsFactory.createFromSystemProperties();
+    T pipelineOptions =
+        
WorkerPipelineOptionsFactory.createFromSystemProperties(harnessOptionsClass);
     pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
 
     /* Configure logging with job-specific properties. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 14efdcc5eb0..463ab953fae 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -64,7 +64,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import 
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames;
@@ -472,9 +471,9 @@ public class StreamingDataflowWorker {
     JvmInitializers.runOnStartup();
 
     
DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class);
-    DataflowWorkerHarnessOptions options =
+    StreamingDataflowWorkerOptions options =
         DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            StreamingDataflowWorker.class);
+            StreamingDataflowWorker.class, 
StreamingDataflowWorkerOptions.class);
     DataflowWorkerHarnessHelper.configureLogging(options);
     checkArgument(
         options.isStreaming(),
@@ -486,8 +485,7 @@ public class StreamingDataflowWorker {
         "%s cannot be main() class with beam_fn_api enabled",
         StreamingDataflowWorker.class.getSimpleName());
 
-    StreamingDataflowWorker worker =
-        StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options);
+    StreamingDataflowWorker worker = 
StreamingDataflowWorker.fromOptions(options);
 
     // Use the MetricsLogger container which is used by BigQueryIO to 
periodically log process-wide
     // metrics.
@@ -506,14 +504,14 @@ public class StreamingDataflowWorker {
     worker.start();
   }
 
-  public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
-      DataflowWorkerHarnessOptions options) throws IOException {
+  public static StreamingDataflowWorker 
fromOptions(StreamingDataflowWorkerOptions options)
+      throws IOException {
 
     return new StreamingDataflowWorker(
         Collections.emptyList(),
         IntrinsicMapTaskExecutorFactory.defaultFactory(),
         new DataflowWorkUnitClient(options, LOG),
-        options.as(StreamingDataflowWorkerOptions.class),
+        options,
         true,
         new HotKeyLogger(),
         Instant::now,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
index 0929705c894..a3ec8933c33 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java
@@ -46,17 +46,16 @@ public class WorkerPipelineOptionsFactory {
    * @return A {@link DataflowWorkerHarnessOptions} object configured for the 
Dataflow worker
    *     harness.
    */
-  public static DataflowWorkerHarnessOptions createFromSystemProperties() 
throws IOException {
+  public static <T extends DataflowWorkerHarnessOptions> T 
createFromSystemProperties(
+      Class<T> harnessOptionsClass) throws IOException {
     ObjectMapper objectMapper = new ObjectMapper();
-    DataflowWorkerHarnessOptions options;
+    T options;
     if (System.getProperties().containsKey("sdk_pipeline_options")) {
       // TODO: remove this method of getting pipeline options, once migration 
is complete.
       String serializedOptions = System.getProperty("sdk_pipeline_options");
       LOG.info("Worker harness starting with: {}", serializedOptions);
       options =
-          objectMapper
-              .readValue(serializedOptions, PipelineOptions.class)
-              .as(DataflowWorkerHarnessOptions.class);
+          objectMapper.readValue(serializedOptions, 
PipelineOptions.class).as(harnessOptionsClass);
     } else if 
(System.getProperties().containsKey("sdk_pipeline_options_file")) {
       String filePath = System.getProperty("sdk_pipeline_options_file");
       LOG.info("Loading pipeline options from " + filePath);
@@ -64,12 +63,10 @@ public class WorkerPipelineOptionsFactory {
           new String(Files.readAllBytes(Paths.get(filePath)), 
StandardCharsets.UTF_8);
       LOG.info("Worker harness starting with: " + serializedOptions);
       options =
-          objectMapper
-              .readValue(serializedOptions, PipelineOptions.class)
-              .as(DataflowWorkerHarnessOptions.class);
+          objectMapper.readValue(serializedOptions, 
PipelineOptions.class).as(harnessOptionsClass);
     } else {
       LOG.info("Using empty PipelineOptions, as none were provided.");
-      options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+      options = PipelineOptionsFactory.as(harnessOptionsClass);
     }
 
     // These values will not be known at job submission time and must be 
provided.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
index de9d9cf7d15..c46a112c4e7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelperTest.java
@@ -29,6 +29,7 @@ import java.nio.file.Paths;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
 import 
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
@@ -63,7 +64,7 @@ public class DataflowWorkerHarnessHelperTest {
 
     DataflowWorkerHarnessOptions generatedOptions =
         DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
-            DataflowBatchWorkerHarnessTest.class);
+            DataflowBatchWorkerHarnessTest.class, 
DataflowWorkerHarnessOptions.class);
     // Assert that the returned options are correct.
     assertThat(generatedOptions.getJobId(), equalTo(JOB_ID));
     assertThat(generatedOptions.getWorkerId(), equalTo(WORKER_ID));
@@ -88,4 +89,31 @@ public class DataflowWorkerHarnessHelperTest {
   public void testParseStatusApiDescriptor() throws TextFormat.ParseException {
     assertNull(DataflowWorkerHarnessHelper.getStatusDescriptor());
   }
+
+  @Test
+  public void testStreamingStreamingConfiguration() throws Exception {
+    StreamingDataflowWorkerOptions pipelineOptions =
+        PipelineOptionsFactory.as(StreamingDataflowWorkerOptions.class);
+    pipelineOptions.setJobId(JOB_ID);
+    pipelineOptions.setWorkerId(WORKER_ID);
+    int activeWorkRefreshPeriodMillis = 12345;
+    
pipelineOptions.setActiveWorkRefreshPeriodMillis(activeWorkRefreshPeriodMillis);
+    int stuckCommitDurationMillis = 23456;
+    pipelineOptions.setStuckCommitDurationMillis(stuckCommitDurationMillis);
+    String serializedOptions = new 
ObjectMapper().writeValueAsString(pipelineOptions);
+    File file = tmpFolder.newFile();
+    Files.write(Paths.get(file.getPath()), 
serializedOptions.getBytes(StandardCharsets.UTF_8));
+    System.setProperty("sdk_pipeline_options_file", file.getPath());
+
+    StreamingDataflowWorkerOptions generatedOptions =
+        DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
+            DataflowBatchWorkerHarnessTest.class, 
StreamingDataflowWorkerOptions.class);
+    // Assert that the returned options are correct.
+    assertThat(generatedOptions.getJobId(), equalTo(JOB_ID));
+    assertThat(generatedOptions.getWorkerId(), equalTo(WORKER_ID));
+    assertThat(
+        generatedOptions.getActiveWorkRefreshPeriodMillis(),
+        equalTo(activeWorkRefreshPeriodMillis));
+    assertThat(generatedOptions.getStuckCommitDurationMillis(), 
equalTo(stuckCommitDurationMillis));
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
index f8684edfa2e..62d38d434b8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactoryTest.java
@@ -52,7 +52,7 @@ public class WorkerPipelineOptionsFactoryTest {
 
     @SuppressWarnings("deprecation") // testing deprecated functionality
     DataflowWorkerHarnessOptions options =
-        WorkerPipelineOptionsFactory.createFromSystemProperties();
+        
WorkerPipelineOptionsFactory.createFromSystemProperties(DataflowWorkerHarnessOptions.class);
     assertEquals("test_worker_id", options.getWorkerId());
     assertEquals("test_job_id", options.getJobId());
     assertEquals(999, options.getNumWorkers());
@@ -74,7 +74,7 @@ public class WorkerPipelineOptionsFactoryTest {
 
     @SuppressWarnings("deprecation") // testing deprecated functionality
     DataflowWorkerHarnessOptions options =
-        WorkerPipelineOptionsFactory.createFromSystemProperties();
+        
WorkerPipelineOptionsFactory.createFromSystemProperties(DataflowWorkerHarnessOptions.class);
     assertEquals("test_worker_id_2", options.getWorkerId());
     assertEquals("test_job_id_2", options.getJobId());
     assertEquals(1000, options.getNumWorkers());

Reply via email to