Inline MapAggregatorValues to remove dependencies

This class is trivial. Adding it to the public API of the SDK is not desirable, since it is just for runners. Adding it to runners-core would be OK but is really overkill for a glorified Map.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9da4bbcd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9da4bbcd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9da4bbcd Branch: refs/heads/gearpump-runner Commit: 9da4bbcdaf3c19ee5f78836b7cffaab947861a58 Parents: c867790 Author: Kenneth Knowles <[email protected]> Authored: Thu Jul 21 20:24:17 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 13:55:24 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 18 +++++-- .../runners/dataflow/DataflowPipelineJob.java | 17 ++++++- .../beam/sdk/util/MapAggregatorValues.java | 50 -------------------- 3 files changed, 30 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 743c565..a9c8ecb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.util.MapAggregatorValues; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ 
-47,6 +46,7 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; +import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -321,7 +321,7 @@ public class DirectRunner throws AggregatorRetrievalException { AggregatorContainer aggregators = evaluationContext.getAggregatorContainer(); Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator); - Map<String, T> stepValues = new HashMap<>(); + final Map<String, T> stepValues = new HashMap<>(); for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) { if (steps.contains(transform.getTransform())) { T aggregate = aggregators.getAggregate( @@ -331,7 +331,19 @@ public class DirectRunner } } } - return new MapAggregatorValues<>(stepValues); + return new AggregatorValues<T>() { + @Override + public Map<String, T> getValuesAtSteps() { + return stepValues; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("stepValues", stepValues) + .toString(); + } + }; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 3194f7c..a6baa4f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.runners.AggregatorValues; 
import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.MapAggregatorValues; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.util.BackOff; @@ -41,6 +40,7 @@ import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import org.joda.time.Duration; import org.slf4j.Logger; @@ -369,7 +369,20 @@ public class DataflowPipelineJob implements PipelineResult { public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator) throws AggregatorRetrievalException { try { - return new MapAggregatorValues<>(fromMetricUpdates(aggregator)); + final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator); + return new AggregatorValues<OutputT>() { + @Override + public Map<String, OutputT> getValuesAtSteps() { + return stepValues; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("stepValues", stepValues) + .toString(); + } + }; } catch (IOException e) { throw new AggregatorRetrievalException( "IOException when retrieving Aggregator values for Aggregator " + aggregator, e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9da4bbcd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java deleted file mode 100644 index 3d949ec..0000000 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.runners.AggregatorValues; -import org.apache.beam.sdk.transforms.Aggregator; - -import com.google.common.base.MoreObjects; - -import java.util.Map; - -/** - * An {@link AggregatorValues} implementation that is backed by an in-memory map. - * - * @param <T> the output type of the {@link Aggregator} - */ -public class MapAggregatorValues<T> extends AggregatorValues<T> { - private final Map<String, T> stepValues; - - public MapAggregatorValues(Map<String, T> stepValues) { - this.stepValues = stepValues; - } - - @Override - public Map<String, T> getValuesAtSteps() { - return stepValues; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("stepValues", stepValues) - .toString(); - } -}
