This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/hot_key_cp
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b106c72cb683e1689eec70a5056d1402a868c33d
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Aug 22 20:04:18 2024 +0100

    Check experiment for hotkey logging (#32285)

    * Check experiment for hotkey logging

    * Spotless
---
 .../org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java | 2 +-
 .../beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java | 3 ++-
 .../worker/windmill/work/processing/StreamingWorkScheduler.java | 5 ++++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1fedcd8f3a2..c01096716c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -423,7 +423,7 @@ public class DataflowPipelineTranslator {
       if (options.getDataflowKmsKey() != null) {
         environment.setServiceKmsKeyName(options.getDataflowKmsKey());
       }
-      if (options.isHotKeyLoggingEnabled()) {
+      if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) {
         DebugOptions debugOptions = new DebugOptions();
         debugOptions.setEnableHotKeyLogging(true);
         environment.setDebugOptions(debugOptions);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
index f901b01ed56..b94759c239a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration;
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 
 import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.ApproximateSplitRequest;
@@ -132,7 +133,7 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
 
       // The key set the in BatchModeExecutionContext is only set in the GroupingShuffleReader
       // which is the correct key. The key is also translated into a Java object in the reader.
-      if (options.isHotKeyLoggingEnabled()) {
+      if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) {
         hotKeyLogger.logHotKeyDetection(
             hotKeyDetection.getUserStepName(),
             TimeUtil.fromCloudDuration(hotKeyDetection.getHotKeyAge()),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index b0b6377dd8b..86f2cffe604 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.auto.value.AutoValue;
 import java.util.Collection;
@@ -368,7 +370,8 @@ public final class StreamingWorkScheduler {
       Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000);
 
       String stepName = getShuffleTaskStepName(computationState.getMapTask());
-      if (options.isHotKeyLoggingEnabled() && keyCoder.isPresent()) {
+      if ((options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging"))
+          && keyCoder.isPresent()) {
         hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
       } else {
         hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
