This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e867ed7cdce Worker message plumbing (#29879)
e867ed7cdce is described below
commit e867ed7cdce74beda4cce630173e3c53ee28cc50
Author: Edward Cheng <[email protected]>
AuthorDate: Thu Jan 18 02:47:11 2024 -0800
Worker message plumbing (#29879)
* use StreamingScalingReport for autoscaling signals
* add unit test stub
* spotless apply
* comment test stub
* add unit test
* simplify response processing
* spotless apply
* add more reported metrics
* remove byte metrics
* fix bug
* fix DataflowWorkUnitClient test
* fix DataflowWorkUnitClient test
* formatting
* add check for scheduledtimer
* option to options
* fix timer check
* revert long to int change
* refactor timers
* var type fix
* fix timers refactoring
* use arraylist instead of map
* add timer to list
* fix comment
---------
Co-authored-by: scwhittle <[email protected]>
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
.../dataflow/worker/DataflowWorkUnitClient.java | 42 ++++++++++
.../dataflow/worker/StreamingDataflowWorker.java | 89 ++++++++++++++--------
.../runners/dataflow/worker/WorkUnitClient.java | 17 +++++
.../dataflow/worker/util/BoundedQueueExecutor.java | 4 +-
.../worker/DataflowWorkUnitClientTest.java | 24 ++++++
6 files changed, 145 insertions(+), 33 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index c93af317b1a..a26cfaa457d 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -730,7 +730,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_services_bigquery :
"com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version",
// Keep version consistent with the version in
google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom
google_api_services_cloudresourcemanager :
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version",
- google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
+ google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version",
google_api_services_healthcare :
"com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version",
google_api_services_pubsub :
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
// Keep version consistent with the version in google_cloud_nio,
managed by google_cloud_platform_libraries_bom
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index ffa377fd3f8..bb39e3bd9af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -31,14 +31,19 @@ import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
+import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
+import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -48,6 +53,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUp
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -269,4 +275,40 @@ class DataflowWorkUnitClient implements WorkUnitClient {
logger.debug("ReportWorkItemStatus result: {}", state);
return state;
}
+
+ /** Creates WorkerMessage from StreamingScalingReport */
+ @Override
+ public WorkerMessage createWorkerMessageFromStreamingScalingReport(
+ StreamingScalingReport report) {
+ DateTime endTime = DateTime.now();
+ logger.debug("Reporting WorkMessageResponse");
+ Map<String, String> labels =
+ ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID",
options.getWorkerId());
+ WorkerMessage msg =
+ new WorkerMessage()
+ .setTime(toCloudTime(endTime))
+ .setStreamingScalingReport(report)
+ .setLabels(labels);
+ return msg;
+ }
+
+ /** Reports the autoscaling signals to dataflow */
+ @Override
+ public void reportWorkerMessage(WorkerMessage msg) throws IOException {
+ SendWorkerMessagesRequest request =
+ new SendWorkerMessagesRequest()
+ .setLocation(options.getRegion())
+ .setWorkerMessages(Collections.singletonList(msg));
+ SendWorkerMessagesResponse result =
+ dataflow
+ .projects()
+ .locations()
+ .workerMessages(options.getProject(), options.getRegion(), request)
+ .execute();
+ if (result == null) {
+ logger.warn("Worker Message response is null");
+ throw new IOException("Got null Worker Message response");
+ }
+ // Currently no response is expected
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f68e5ba26c7..c9a00ade6d8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -26,6 +26,7 @@ import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.StreamingComputationConfig;
import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -276,13 +277,12 @@ public class StreamingDataflowWorker {
private final HotKeyLogger hotKeyLogger;
// Periodic sender of debug information to the debug capture service.
private final DebugCapture.@Nullable Manager debugCaptureManager;
- private ScheduledExecutorService refreshWorkTimer;
- private ScheduledExecutorService statusPageTimer;
- private ScheduledExecutorService globalWorkerUpdatesTimer;
+ // Collection of ScheduledExecutorServices that are running periodic
functions.
+ private ArrayList<ScheduledExecutorService> scheduledExecutors =
+ new ArrayList<ScheduledExecutorService>();
private int retryLocallyDelayMs = 10000;
// Periodically fires a global config request to dataflow service. Only used
when windmill service
// is enabled.
- private ScheduledExecutorService globalConfigRefreshTimer;
// Possibly overridden by streaming engine config.
private int maxWorkItemCommitBytes = Integer.MAX_VALUE;
@@ -579,14 +579,25 @@ public class StreamingDataflowWorker {
sampler.start();
// Periodically report workers counters and other updates.
- globalWorkerUpdatesTimer =
executorSupplier.apply("GlobalWorkerUpdatesTimer");
- globalWorkerUpdatesTimer.scheduleWithFixedDelay(
+ ScheduledExecutorService workerUpdateTimer =
executorSupplier.apply("GlobalWorkerUpdates");
+ workerUpdateTimer.scheduleWithFixedDelay(
this::reportPeriodicWorkerUpdates,
0,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
TimeUnit.MILLISECONDS);
+ scheduledExecutors.add(workerUpdateTimer);
+
+ ScheduledExecutorService workerMessageTimer =
executorSupplier.apply("ReportWorkerMessage");
+ if (options.getWindmillHarnessUpdateReportingPeriod().getMillis() > 0) {
+ workerMessageTimer.scheduleWithFixedDelay(
+ this::reportPeriodicWorkerMessage,
+ 0,
+ options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
+ TimeUnit.MILLISECONDS);
+ scheduledExecutors.add(workerMessageTimer);
+ }
- refreshWorkTimer = executorSupplier.apply("RefreshWork");
+ ScheduledExecutorService refreshWorkTimer =
executorSupplier.apply("RefreshWork");
if (options.getActiveWorkRefreshPeriodMillis() > 0) {
refreshWorkTimer.scheduleWithFixedDelay(
new Runnable() {
@@ -602,15 +613,17 @@ public class StreamingDataflowWorker {
options.getActiveWorkRefreshPeriodMillis(),
options.getActiveWorkRefreshPeriodMillis(),
TimeUnit.MILLISECONDS);
+ scheduledExecutors.add(refreshWorkTimer);
}
if (windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0) {
int periodMillis = Math.max(options.getStuckCommitDurationMillis() / 10,
100);
refreshWorkTimer.scheduleWithFixedDelay(
this::invalidateStuckCommits, periodMillis, periodMillis,
TimeUnit.MILLISECONDS);
+ scheduledExecutors.add(refreshWorkTimer);
}
if (options.getPeriodicStatusPageOutputDirectory() != null) {
- statusPageTimer = executorSupplier.apply("DumpStatusPages");
+ ScheduledExecutorService statusPageTimer =
executorSupplier.apply("DumpStatusPages");
statusPageTimer.scheduleWithFixedDelay(
() -> {
Collection<Capturable> pages = statusPages.getDebugCapturePages();
@@ -645,6 +658,7 @@ public class StreamingDataflowWorker {
60,
60,
TimeUnit.SECONDS);
+ scheduledExecutors.add(statusPageTimer);
}
reportHarnessStartup();
@@ -676,25 +690,15 @@ public class StreamingDataflowWorker {
public void stop() {
try {
- if (globalConfigRefreshTimer != null) {
- globalConfigRefreshTimer.shutdown();
- }
- globalWorkerUpdatesTimer.shutdown();
- if (refreshWorkTimer != null) {
- refreshWorkTimer.shutdown();
- }
- if (statusPageTimer != null) {
- statusPageTimer.shutdown();
- }
- if (globalConfigRefreshTimer != null) {
- globalConfigRefreshTimer.awaitTermination(300, TimeUnit.SECONDS);
- }
- globalWorkerUpdatesTimer.awaitTermination(300, TimeUnit.SECONDS);
- if (refreshWorkTimer != null) {
- refreshWorkTimer.awaitTermination(300, TimeUnit.SECONDS);
+ for (ScheduledExecutorService timer : scheduledExecutors) {
+ if (timer != null) {
+ timer.shutdown();
+ }
}
- if (statusPageTimer != null) {
- statusPageTimer.awaitTermination(300, TimeUnit.SECONDS);
+ for (ScheduledExecutorService timer : scheduledExecutors) {
+ if (timer != null) {
+ timer.awaitTermination(300, TimeUnit.SECONDS);
+ }
}
statusPages.stop();
if (debugCaptureManager != null) {
@@ -716,6 +720,7 @@ public class StreamingDataflowWorker {
// one last send
reportPeriodicWorkerUpdates();
+ reportPeriodicWorkerMessage();
} catch (Exception e) {
LOG.warn("Exception while shutting down: ", e);
}
@@ -1584,12 +1589,14 @@ public class StreamingDataflowWorker {
LOG.info("windmillServerStub is now ready");
// Now start a thread that periodically refreshes the windmill service
endpoint.
- globalConfigRefreshTimer =
executorSupplier.apply("GlobalConfigRefreshTimer");
- globalConfigRefreshTimer.scheduleWithFixedDelay(
+ ScheduledExecutorService configRefreshTimer =
+ executorSupplier.apply("GlobalConfigRefreshTimer");
+ configRefreshTimer.scheduleWithFixedDelay(
this::getGlobalConfig,
0,
options.getGlobalConfigRefreshPeriod().getMillis(),
TimeUnit.MILLISECONDS);
+ scheduledExecutors.add(configRefreshTimer);
}
private void getGlobalConfig() {
@@ -1742,9 +1749,20 @@ public class StreamingDataflowWorker {
maxOutstandingBytes.getAndReset();
maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
outstandingBundles.getAndReset();
- outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
+ outstandingBundles.addValue((long) workUnitExecutor.elementsOutstanding());
maxOutstandingBundles.getAndReset();
-
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
+ maxOutstandingBundles.addValue((long)
workUnitExecutor.maximumElementsOutstanding());
+ }
+
+ private void sendWorkerMessage() throws IOException {
+ StreamingScalingReport activeThreadsReport =
+ new StreamingScalingReport()
+ .setActiveThreadCount(workUnitExecutor.activeCount())
+ .setActiveBundleCount(workUnitExecutor.elementsOutstanding())
+ .setMaximumThreadCount(chooseMaximumNumberOfThreads())
+
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding());
+ workUnitClient.reportWorkerMessage(
+
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport));
}
@VisibleForTesting
@@ -1760,6 +1778,17 @@ public class StreamingDataflowWorker {
}
}
+ @VisibleForTesting
+ public void reportPeriodicWorkerMessage() {
+ try {
+ sendWorkerMessage();
+ } catch (IOException e) {
+ LOG.warn("Failed to send worker messages", e);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while trying to send worker messages",
e);
+ }
+ }
+
/**
* Returns key for a counter update. It is a String in case of legacy
counter and
* CounterStructuredName in the case of a structured counter.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
index 82fbcd82c13..6f26f404807 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.Optional;
@@ -49,4 +51,19 @@ interface WorkUnitClient {
* @return a {@link WorkItemServiceState} (e.g. a new stop position)
*/
WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
throws IOException;
+
+ /**
+ * Creates a {@link WorkerMessage} containing the given Streaming Scaling
Report
+ *
+ * @param report the StreamingScalingReport containing autoscaling metrics
+ * @return a {@link WorkerMessage}
+ */
+ WorkerMessage
createWorkerMessageFromStreamingScalingReport(StreamingScalingReport report);
+
+ /**
+ * Reports the autoscaling signals with a {@link StreamingScalingReport}.
+ *
+ * @param msg the WorkerMessage to report
+ */
+ void reportWorkerMessage(WorkerMessage msg) throws IOException;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index dcff1f73f10..cd4c727e310 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -127,7 +127,7 @@ public class BoundedQueueExecutor {
return bytesOutstanding;
}
- public long elementsOutstanding() {
+ public int elementsOutstanding() {
return elementsOutstanding;
}
@@ -135,7 +135,7 @@ public class BoundedQueueExecutor {
return maximumBytesOutstanding;
}
- public long maximumElementsOutstanding() {
+ public int maximumElementsOutstanding() {
return maximumElementsOutstanding;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 5329fb0f601..7720de3563b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -31,8 +31,12 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
+import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
import com.google.api.services.dataflow.model.SeqMapTask;
+import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
+import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -227,6 +231,26 @@ public class DataflowWorkUnitClientTest {
client.getWorkItem();
}
+ @Test
+ public void testReportWorkerMessage() throws Exception {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setContentType(Json.MEDIA_TYPE);
+ SendWorkerMessagesResponse workerMessage = new
SendWorkerMessagesResponse();
+ workerMessage.setFactory(Transport.getJsonFactory());
+ response.setContent(workerMessage.toPrettyString());
+ when(request.execute()).thenReturn(response);
+ StreamingScalingReport activeThreadsReport =
+ new StreamingScalingReport().setActiveThreadCount(1);
+ WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+ WorkerMessage msg =
client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
+ client.reportWorkerMessage(msg);
+
+ SendWorkerMessagesRequest actualRequest =
+ Transport.getJsonFactory()
+ .fromString(request.getContentAsString(),
SendWorkerMessagesRequest.class);
+ assertEquals(ImmutableList.of(msg), actualRequest.getWorkerMessages());
+ }
+
private LowLevelHttpResponse generateMockResponse(WorkItem... workItems)
throws Exception {
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContentType(Json.MEDIA_TYPE);