This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c391aba [BEAM-11033] Updates Dataflow metrics handling to support
portable job submission (#14158)
c391aba is described below
commit c391aba4f5edddcd20b25f89e1b987fc482ef129
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Tue Mar 9 00:27:17 2021 -0800
[BEAM-11033] Updates Dataflow metrics handling to support portable job
submission (#14158)
* Updates Dataflow user metrics handling to support portable job submission
* Addresses reviewer comments
* Addresses reviewer comments
---
.../beam/runners/dataflow/DataflowMetrics.java | 48 ++++++++++++++++++----
.../beam/runners/dataflow/DataflowPipelineJob.java | 34 ++++++++++++++-
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
3 files changed, 75 insertions(+), 10 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index 4469d2f..19449c4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -243,22 +243,56 @@ class DataflowMetrics extends MetricResults {
}
/**
+ * Returns the user step name for a given internal step name.
+ *
+ * @param internalStepName internal step name used by Dataflow
+ * @return user step name used to identify the metric
+ */
+ private @Nullable String getUserStepName(String internalStepName) {
+ if (dataflowPipelineJob.getPipelineProto() != null
+ && dataflowPipelineJob
+ .getPipelineProto()
+ .getComponents()
+ .getTransformsMap()
+ .containsKey(internalStepName)) {
+ // Dataflow Runner v2 with portable job submission uses proto transform map
+ // IDs for step names. Hence we lookup user step names based on the proto.
+ return dataflowPipelineJob
+ .getPipelineProto()
+ .getComponents()
+ .getTransformsMap()
+ .get(internalStepName)
+ .getUniqueName();
+ } else {
+ if (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName)) {
+ // If we can't translate internal step names to user step names, we just skip them
+ // altogether.
+ return null;
+ }
+ return dataflowPipelineJob.transformStepNames.inverse().get(internalStepName).getFullName();
+ }
+ }
+
+ /**
* Build an {@link MetricKey} that serves as a hash key for a metric update.
*
* @return a {@link MetricKey} that can be hashed and used to identify a metric.
*/
- private MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
- String fullStepName = metricUpdate.getName().getContext().get("step");
- if (dataflowPipelineJob.transformStepNames == null
- || !dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
+ private @Nullable MetricKey getMetricHashKey(MetricUpdate metricUpdate) {
+ String internalStepName = metricUpdate.getName().getContext().get("step");
+ String userStepName = getUserStepName(internalStepName);
+
+ if (userStepName == null
+ && (dataflowPipelineJob.transformStepNames == null
+ || !dataflowPipelineJob.transformStepNames.inverse().containsKey(internalStepName))) {
// If we can't translate internal step names to user step names, we just skip them
// altogether.
return null;
}
- fullStepName =
- dataflowPipelineJob.transformStepNames.inverse().get(fullStepName).getFullName();
+
return MetricKey.create(
- fullStepName,
+ userStepName,
MetricName.named(
metricUpdate.getName().getContext().get("namespace"),
metricUpdate.getName().getName()));
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 3f14669..b2fdc19 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
@@ -121,23 +122,43 @@ public class DataflowPipelineJob implements PipelineResult {
private @Nullable String latestStateString;
+ private final @Nullable Pipeline pipelineProto;
+
/**
* Constructs the job.
*
* @param jobId the job id
* @param dataflowOptions used to configure the client for the Dataflow Service
* @param transformStepNames a mapping from AppliedPTransforms to Step Names
+ * @param pipelineProto Runner API pipeline proto.
*/
public DataflowPipelineJob(
DataflowClient dataflowClient,
String jobId,
DataflowPipelineOptions dataflowOptions,
- Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames,
+ @Nullable Pipeline pipelineProto) {
this.dataflowClient = dataflowClient;
this.jobId = jobId;
this.dataflowOptions = dataflowOptions;
this.transformStepNames =
HashBiMap.create(firstNonNull(transformStepNames, ImmutableMap.of()));
this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
+ this.pipelineProto = pipelineProto;
+ }
+
+ /**
+ * Constructs the job.
+ *
+ * @param jobId the job id
+ * @param dataflowOptions used to configure the client for the Dataflow Service
+ * @param transformStepNames a mapping from AppliedPTransforms to Step Names
+ */
+ public DataflowPipelineJob(
+ DataflowClient dataflowClient,
+ String jobId,
+ DataflowPipelineOptions dataflowOptions,
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ this(dataflowClient, jobId, dataflowOptions, transformStepNames, null);
}
/** Get the id of this job. */
@@ -154,6 +175,11 @@ public class DataflowPipelineJob implements PipelineResult {
return dataflowOptions;
}
+ /** Get the Runner API pipeline proto if available. */
+ public @Nullable Pipeline getPipelineProto() {
+ return pipelineProto;
+ }
+
/** Get the region this job exists in. */
public String getRegion() {
return dataflowOptions.getRegion();
@@ -536,7 +562,11 @@ public class DataflowPipelineJob implements PipelineResult {
terminalState = currentState;
replacedByJob =
new DataflowPipelineJob(
- dataflowClient, job.getReplacedByJobId(), dataflowOptions, transformStepNames);
+ dataflowClient,
+ job.getReplacedByJobId(),
+ dataflowOptions,
+ transformStepNames,
+ pipelineProto);
}
return job;
} catch (IOException exn) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cc0708f..0e3602a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1172,7 +1172,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
DataflowClient.create(options),
jobResult.getId(),
options,
- jobSpecification.getStepNames());
+ jobSpecification.getStepNames(),
+ pipelineProto);
// If the service returned client request id, the SDK needs to compare it
// with the original id generated in the request, if they are not the same