This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e867ed7cdce Worker message plumbing (#29879)
e867ed7cdce is described below

commit e867ed7cdce74beda4cce630173e3c53ee28cc50
Author: Edward Cheng <[email protected]>
AuthorDate: Thu Jan 18 02:47:11 2024 -0800

    Worker message plumbing (#29879)
    
    * use StreamingScalingReport for autoscaling signals
    
    * add unit test stub
    
    * spotless apply
    
    * comment test stub
    
    * add unit test
    
    * simplify response processing
    
    * spotless apply
    
    * add more reported metrics
    
    * remove byte metrics
    
    * fix bug
    
    * fix DataflowWorkUnitClient test
    
    * fix DataflowWorkUnitClient test
    
    * formatting
    
    * add check for scheduledtimer
    
    * option to options
    
    * fix timer check
    
    * revert long to int change
    
    * refactor timers
    
    * var type fix
    
    * fix timers refactoring
    
    * use arraylist instead of map
    
    * add timer to list
    
    * fix comment
    
    ---------
    
    Co-authored-by: scwhittle <[email protected]>
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |  2 +-
 .../dataflow/worker/DataflowWorkUnitClient.java    | 42 ++++++++++
 .../dataflow/worker/StreamingDataflowWorker.java   | 89 ++++++++++++++--------
 .../runners/dataflow/worker/WorkUnitClient.java    | 17 +++++
 .../dataflow/worker/util/BoundedQueueExecutor.java |  4 +-
 .../worker/DataflowWorkUnitClientTest.java         | 24 ++++++
 6 files changed, 145 insertions(+), 33 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index c93af317b1a..a26cfaa457d 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -730,7 +730,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version",
         // Keep version consistent with the version in 
google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom
         google_api_services_cloudresourcemanager    : 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version",
-        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
+        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version",
         google_api_services_healthcare              : 
"com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version",
         google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
         // Keep version consistent with the version in google_cloud_nio, 
managed by google_cloud_platform_libraries_bom
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index ffa377fd3f8..bb39e3bd9af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -31,14 +31,19 @@ import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
 import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
+import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
+import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -48,6 +53,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUp
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -269,4 +275,40 @@ class DataflowWorkUnitClient implements WorkUnitClient {
     logger.debug("ReportWorkItemStatus result: {}", state);
     return state;
   }
+
+  /** Creates WorkerMessage from StreamingScalingReport */
+  @Override
+  public WorkerMessage createWorkerMessageFromStreamingScalingReport(
+      StreamingScalingReport report) {
+    DateTime endTime = DateTime.now();
+    logger.debug("Reporting WorkMessageResponse");
+    Map<String, String> labels =
+        ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID", 
options.getWorkerId());
+    WorkerMessage msg =
+        new WorkerMessage()
+            .setTime(toCloudTime(endTime))
+            .setStreamingScalingReport(report)
+            .setLabels(labels);
+    return msg;
+  }
+
+  /** Reports the autoscaling signals to dataflow */
+  @Override
+  public void reportWorkerMessage(WorkerMessage msg) throws IOException {
+    SendWorkerMessagesRequest request =
+        new SendWorkerMessagesRequest()
+            .setLocation(options.getRegion())
+            .setWorkerMessages(Collections.singletonList(msg));
+    SendWorkerMessagesResponse result =
+        dataflow
+            .projects()
+            .locations()
+            .workerMessages(options.getProject(), options.getRegion(), request)
+            .execute();
+    if (result == null) {
+      logger.warn("Worker Message response is null");
+      throw new IOException("Got null Worker Message response");
+    }
+    // Currently no response is expected
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f68e5ba26c7..c9a00ade6d8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -26,6 +26,7 @@ import com.google.api.services.dataflow.model.MapTask;
 import com.google.api.services.dataflow.model.Status;
 import com.google.api.services.dataflow.model.StreamingComputationConfig;
 import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -276,13 +277,12 @@ public class StreamingDataflowWorker {
   private final HotKeyLogger hotKeyLogger;
   // Periodic sender of debug information to the debug capture service.
   private final DebugCapture.@Nullable Manager debugCaptureManager;
-  private ScheduledExecutorService refreshWorkTimer;
-  private ScheduledExecutorService statusPageTimer;
-  private ScheduledExecutorService globalWorkerUpdatesTimer;
+  // Collection of ScheduledExecutorServices that are running periodic 
functions.
+  private ArrayList<ScheduledExecutorService> scheduledExecutors =
+      new ArrayList<ScheduledExecutorService>();
   private int retryLocallyDelayMs = 10000;
   // Periodically fires a global config request to dataflow service. Only used 
when windmill service
   // is enabled.
-  private ScheduledExecutorService globalConfigRefreshTimer;
   // Possibly overridden by streaming engine config.
   private int maxWorkItemCommitBytes = Integer.MAX_VALUE;
 
@@ -579,14 +579,25 @@ public class StreamingDataflowWorker {
     sampler.start();
 
     // Periodically report workers counters and other updates.
-    globalWorkerUpdatesTimer = 
executorSupplier.apply("GlobalWorkerUpdatesTimer");
-    globalWorkerUpdatesTimer.scheduleWithFixedDelay(
+    ScheduledExecutorService workerUpdateTimer = 
executorSupplier.apply("GlobalWorkerUpdates");
+    workerUpdateTimer.scheduleWithFixedDelay(
         this::reportPeriodicWorkerUpdates,
         0,
         options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
         TimeUnit.MILLISECONDS);
+    scheduledExecutors.add(workerUpdateTimer);
+
+    ScheduledExecutorService workerMessageTimer = 
executorSupplier.apply("ReportWorkerMessage");
+    if (options.getWindmillHarnessUpdateReportingPeriod().getMillis() > 0) {
+      workerMessageTimer.scheduleWithFixedDelay(
+          this::reportPeriodicWorkerMessage,
+          0,
+          options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
+          TimeUnit.MILLISECONDS);
+      scheduledExecutors.add(workerMessageTimer);
+    }
 
-    refreshWorkTimer = executorSupplier.apply("RefreshWork");
+    ScheduledExecutorService refreshWorkTimer = 
executorSupplier.apply("RefreshWork");
     if (options.getActiveWorkRefreshPeriodMillis() > 0) {
       refreshWorkTimer.scheduleWithFixedDelay(
           new Runnable() {
@@ -602,15 +613,17 @@ public class StreamingDataflowWorker {
           options.getActiveWorkRefreshPeriodMillis(),
           options.getActiveWorkRefreshPeriodMillis(),
           TimeUnit.MILLISECONDS);
+      scheduledExecutors.add(refreshWorkTimer);
     }
     if (windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0) {
       int periodMillis = Math.max(options.getStuckCommitDurationMillis() / 10, 
100);
       refreshWorkTimer.scheduleWithFixedDelay(
           this::invalidateStuckCommits, periodMillis, periodMillis, 
TimeUnit.MILLISECONDS);
+      scheduledExecutors.add(refreshWorkTimer);
     }
 
     if (options.getPeriodicStatusPageOutputDirectory() != null) {
-      statusPageTimer = executorSupplier.apply("DumpStatusPages");
+      ScheduledExecutorService statusPageTimer = 
executorSupplier.apply("DumpStatusPages");
       statusPageTimer.scheduleWithFixedDelay(
           () -> {
             Collection<Capturable> pages = statusPages.getDebugCapturePages();
@@ -645,6 +658,7 @@ public class StreamingDataflowWorker {
           60,
           60,
           TimeUnit.SECONDS);
+      scheduledExecutors.add(statusPageTimer);
     }
 
     reportHarnessStartup();
@@ -676,25 +690,15 @@ public class StreamingDataflowWorker {
 
   public void stop() {
     try {
-      if (globalConfigRefreshTimer != null) {
-        globalConfigRefreshTimer.shutdown();
-      }
-      globalWorkerUpdatesTimer.shutdown();
-      if (refreshWorkTimer != null) {
-        refreshWorkTimer.shutdown();
-      }
-      if (statusPageTimer != null) {
-        statusPageTimer.shutdown();
-      }
-      if (globalConfigRefreshTimer != null) {
-        globalConfigRefreshTimer.awaitTermination(300, TimeUnit.SECONDS);
-      }
-      globalWorkerUpdatesTimer.awaitTermination(300, TimeUnit.SECONDS);
-      if (refreshWorkTimer != null) {
-        refreshWorkTimer.awaitTermination(300, TimeUnit.SECONDS);
+      for (ScheduledExecutorService timer : scheduledExecutors) {
+        if (timer != null) {
+          timer.shutdown();
+        }
       }
-      if (statusPageTimer != null) {
-        statusPageTimer.awaitTermination(300, TimeUnit.SECONDS);
+      for (ScheduledExecutorService timer : scheduledExecutors) {
+        if (timer != null) {
+          timer.awaitTermination(300, TimeUnit.SECONDS);
+        }
       }
       statusPages.stop();
       if (debugCaptureManager != null) {
@@ -716,6 +720,7 @@ public class StreamingDataflowWorker {
 
       // one last send
       reportPeriodicWorkerUpdates();
+      reportPeriodicWorkerMessage();
     } catch (Exception e) {
       LOG.warn("Exception while shutting down: ", e);
     }
@@ -1584,12 +1589,14 @@ public class StreamingDataflowWorker {
     LOG.info("windmillServerStub is now ready");
 
     // Now start a thread that periodically refreshes the windmill service 
endpoint.
-    globalConfigRefreshTimer = 
executorSupplier.apply("GlobalConfigRefreshTimer");
-    globalConfigRefreshTimer.scheduleWithFixedDelay(
+    ScheduledExecutorService configRefreshTimer =
+        executorSupplier.apply("GlobalConfigRefreshTimer");
+    configRefreshTimer.scheduleWithFixedDelay(
         this::getGlobalConfig,
         0,
         options.getGlobalConfigRefreshPeriod().getMillis(),
         TimeUnit.MILLISECONDS);
+    scheduledExecutors.add(configRefreshTimer);
   }
 
   private void getGlobalConfig() {
@@ -1742,9 +1749,20 @@ public class StreamingDataflowWorker {
     maxOutstandingBytes.getAndReset();
     maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
     outstandingBundles.getAndReset();
-    outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
+    outstandingBundles.addValue((long) workUnitExecutor.elementsOutstanding());
     maxOutstandingBundles.getAndReset();
-    
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
+    maxOutstandingBundles.addValue((long) 
workUnitExecutor.maximumElementsOutstanding());
+  }
+
+  private void sendWorkerMessage() throws IOException {
+    StreamingScalingReport activeThreadsReport =
+        new StreamingScalingReport()
+            .setActiveThreadCount(workUnitExecutor.activeCount())
+            .setActiveBundleCount(workUnitExecutor.elementsOutstanding())
+            .setMaximumThreadCount(chooseMaximumNumberOfThreads())
+            
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding());
+    workUnitClient.reportWorkerMessage(
+        
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport));
   }
 
   @VisibleForTesting
@@ -1760,6 +1778,17 @@ public class StreamingDataflowWorker {
     }
   }
 
+  @VisibleForTesting
+  public void reportPeriodicWorkerMessage() {
+    try {
+      sendWorkerMessage();
+    } catch (IOException e) {
+      LOG.warn("Failed to send worker messages", e);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while trying to send worker messages", 
e);
+    }
+  }
+
   /**
    * Returns key for a counter update. It is a String in case of legacy 
counter and
    * CounterStructuredName in the case of a structured counter.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
index 82fbcd82c13..6f26f404807 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
+import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -49,4 +51,19 @@ interface WorkUnitClient {
    * @return a {@link WorkItemServiceState} (e.g. a new stop position)
    */
   WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) 
throws IOException;
+
+  /**
+   * Creates a {@link WorkerMessage} containing the given Streaming Scaling 
Report
+   *
+   * @param report the StreamingScalingReport containing autoscaling metrics
+   * @return a {@link WorkerMessage}
+   */
+  WorkerMessage 
createWorkerMessageFromStreamingScalingReport(StreamingScalingReport report);
+
+  /**
+   * Reports the autoscaling signals with a {@link StreamingScalingReport}.
+   *
+   * @param msg the WorkerMessage to report
+   */
+  void reportWorkerMessage(WorkerMessage msg) throws IOException;
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index dcff1f73f10..cd4c727e310 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -127,7 +127,7 @@ public class BoundedQueueExecutor {
     return bytesOutstanding;
   }
 
-  public long elementsOutstanding() {
+  public int elementsOutstanding() {
     return elementsOutstanding;
   }
 
@@ -135,7 +135,7 @@ public class BoundedQueueExecutor {
     return maximumBytesOutstanding;
   }
 
-  public long maximumElementsOutstanding() {
+  public int maximumElementsOutstanding() {
     return maximumElementsOutstanding;
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 5329fb0f601..7720de3563b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -31,8 +31,12 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
 import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
 import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
+import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
 import com.google.api.services.dataflow.model.SeqMapTask;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
+import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
 import java.util.Optional;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -227,6 +231,26 @@ public class DataflowWorkUnitClientTest {
     client.getWorkItem();
   }
 
+  @Test
+  public void testReportWorkerMessage() throws Exception {
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    SendWorkerMessagesResponse workerMessage = new 
SendWorkerMessagesResponse();
+    workerMessage.setFactory(Transport.getJsonFactory());
+    response.setContent(workerMessage.toPrettyString());
+    when(request.execute()).thenReturn(response);
+    StreamingScalingReport activeThreadsReport =
+        new StreamingScalingReport().setActiveThreadCount(1);
+    WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+    WorkerMessage msg = 
client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
+    client.reportWorkerMessage(msg);
+
+    SendWorkerMessagesRequest actualRequest =
+        Transport.getJsonFactory()
+            .fromString(request.getContentAsString(), 
SendWorkerMessagesRequest.class);
+    assertEquals(ImmutableList.of(msg), actualRequest.getWorkerMessages());
+  }
+
   private LowLevelHttpResponse generateMockResponse(WorkItem... workItems) 
throws Exception {
     MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
     response.setContentType(Json.MEDIA_TYPE);

Reply via email to