This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ef74412 Enable SideInput metrics for DF worker. These have been
launched on Dataflow
new ebafa0e Merge pull request #14018 from pabloem/enablesim
ef74412 is described below
commit ef7441257d655cd8bdc5ab502acfdaced9ad6e39
Author: Pablo Estrada <[email protected]>
AuthorDate: Thu Feb 18 13:31:08 2021 -0800
Enable SideInput metrics for DF worker. These have been launched on Dataflow
---
.../beam/runners/dataflow/worker/ExperimentContext.java | 3 +--
.../beam/runners/dataflow/worker/IsmSideInputReader.java | 11 ++---------
.../beam/runners/dataflow/worker/IsmSideInputReaderTest.java | 6 +-----
3 files changed, 4 insertions(+), 16 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
index 0d9e0ef..d215799 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
@@ -44,8 +44,7 @@ public class ExperimentContext {
* operations for some IO connectors.
*/
EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
- IntertransformIO("intertransform_io"), // Intertransform metrics for
Shuffle IO (insights)
- SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for
Side Input IO
+ IntertransformIO("intertransform_io"); // Intertransform metrics for
Shuffle IO (insights)
private final String name;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 26adc74..37fa6fc 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -56,7 +56,6 @@ import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
@@ -218,14 +217,8 @@ public class IsmSideInputReader implements SideInputReader
{
throw new Exception("unexpected kind of side input: " + sideInputKind);
}
- SideInputReadCounter sideInputReadCounter;
- ExperimentContext ec = ExperimentContext.parseFrom(options);
- if (ec.isEnabled(Experiment.SideInputIOMetrics)) {
- sideInputReadCounter =
- new DataflowSideInputReadCounter(executionContext, operationContext,
sideInputIndex);
- } else {
- sideInputReadCounter = new NoopSideInputReadCounter();
- }
+ SideInputReadCounter sideInputReadCounter =
+ new DataflowSideInputReadCounter(executionContext, operationContext,
sideInputIndex);
ImmutableList.Builder<IsmReader<?>> builder = ImmutableList.builder();
for (Source source : sideInputInfo.getSources()) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index 4083ee1..2367eac 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
@@ -114,7 +113,6 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
@@ -169,9 +167,7 @@ public class IsmSideInputReaderTest {
@Before
public void setUp() {
- pipelineOptions
- .as(DataflowPipelineDebugOptions.class)
-
.setExperiments(Lists.newArrayList(Experiment.SideInputIOMetrics.getName()));
+ pipelineOptions.as(DataflowPipelineDebugOptions.class);
setupCloser = Closer.create();
setupCloser.register(executionContext.getExecutionStateTracker().activate());
setupCloser.register(operationContext.enterProcess());