This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch release-2.59.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.59.0 by this push:
new b552e0fd026 Check experiment for hotkey logging (#32285) (#32290)
b552e0fd026 is described below
commit b552e0fd026113b2270b1cafc2e39b2978fc009d
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Aug 22 20:47:01 2024 +0100
Check experiment for hotkey logging (#32285) (#32290)
* Check experiment for hotkey logging
* Spotless
---
.../org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java | 2 +-
.../beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java | 3 ++-
.../worker/windmill/work/processing/StreamingWorkScheduler.java | 5 ++++-
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1fedcd8f3a2..c01096716c9 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -423,7 +423,7 @@ public class DataflowPipelineTranslator {
if (options.getDataflowKmsKey() != null) {
environment.setServiceKmsKeyName(options.getDataflowKmsKey());
}
- if (options.isHotKeyLoggingEnabled()) {
+ if (options.isHotKeyLoggingEnabled() || hasExperiment(options,
"enable_hot_key_logging")) {
DebugOptions debugOptions = new DebugOptions();
debugOptions.setEnableHotKeyLogging(true);
environment.setDebugOptions(debugOptions);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
index f901b01ed56..b94759c239a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration;
import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
@@ -132,7 +133,7 @@ public class DataflowWorkProgressUpdater extends
WorkProgressUpdater {
// The key set the in BatchModeExecutionContext is only set in the
GroupingShuffleReader
// which is the correct key. The key is also translated into a Java
object in the reader.
- if (options.isHotKeyLoggingEnabled()) {
+ if (options.isHotKeyLoggingEnabled() || hasExperiment(options,
"enable_hot_key_logging")) {
hotKeyLogger.logHotKeyDetection(
hotKeyDetection.getUserStepName(),
TimeUtil.fromCloudDuration(hotKeyDetection.getHotKeyAge()),
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index b0b6377dd8b..86f2cffe604 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
import java.util.Collection;
@@ -368,7 +370,8 @@ public final class StreamingWorkScheduler {
Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() /
1000);
String stepName =
getShuffleTaskStepName(computationState.getMapTask());
- if (options.isHotKeyLoggingEnabled() && keyCoder.isPresent()) {
+ if ((options.isHotKeyLoggingEnabled() || hasExperiment(options,
"enable_hot_key_logging"))
+ && keyCoder.isPresent()) {
hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
} else {
hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);