This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 1f1660f [BEAM-6865] share non-Flink-specific pipeline helper utils new 85df199 Merge pull request #8085 from ibzib/shared-utils 1f1660f is described below commit 1f1660f0c97e65bb91e2c5aa7694b6d7b9215bbd Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Tue Mar 19 15:34:48 2019 -0700 [BEAM-6865] share non-Flink-specific pipeline helper utils --- .../flink/FlinkBatchPortablePipelineTranslator.java | 6 +++--- .../FlinkStreamingPortablePipelineTranslator.java | 9 ++++----- .../translation/PipelineTranslatorUtils.java} | 6 +++--- .../fnexecution/translation/package-info.java | 20 ++++++++++++++++++++ .../translation/PipelineTranslatorUtilsTest.java} | 11 ++++------- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 2f49e3d..29ac2b0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.flink; import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload; -import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap; -import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy; -import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; import com.google.auto.service.AutoService; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index 1c57824..df1c875 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -19,8 +19,9 @@ package org.apache.beam.runners.flink; import static java.lang.String.format; import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload; -import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy; -import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy; +import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -51,7 +52,6 @@ import org.apache.beam.runners.core.construction.graph.QueryablePipeline; import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext; import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; @@ -675,8 +675,7 @@ public class FlinkStreamingPortablePipelineTranslator outputs.isEmpty() ? null : new TupleTag(outputs.keySet().iterator().next()); // associate output tags with ids, output manager uses these Integer ids to serialize state - BiMap<String, Integer> outputIndexMap = - FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet()); + BiMap<String, Integer> outputIndexMap = createOutputMap(outputs.keySet()); Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap(); Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap(); Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java similarity index 95% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java index b1943a9..c39b8b7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.translation.utils; +package org.apache.beam.runners.fnexecution.translation; import java.io.IOException; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -32,8 +32,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiM import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; /** Utilities for pipeline translation. */ -public final class FlinkPipelineTranslatorUtils { - private FlinkPipelineTranslatorUtils() {} +public final class PipelineTranslatorUtils { + private PipelineTranslatorUtils() {} /** Creates a mapping from PCollection id to output tag integer. */ public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) { diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java new file mode 100644 index 0000000..7d000dd --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Shared utilities for a Beam runner to translate portable pipelines. */ +package org.apache.beam.runners.fnexecution.translation; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java similarity index 78% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java rename to runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java index 1751982..a37b12b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.translation; +package org.apache.beam.runners.fnexecution.translation; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -23,20 +23,17 @@ import static org.junit.Assert.assertThat; import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.junit.Test; -/** - * Tests for {@link org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils}. - */ -public class FlinkPipelineTranslatorUtilsTest { +/** Tests for {@link PipelineTranslatorUtils}. */ +public class PipelineTranslatorUtilsTest { @Test public void testOutputMapCreation() { List<String> outputs = Arrays.asList("output1", "output2", "output3"); - BiMap<String, Integer> outputMap = FlinkPipelineTranslatorUtils.createOutputMap(outputs); + BiMap<String, Integer> outputMap = PipelineTranslatorUtils.createOutputMap(outputs); Map<Object, Object> expected = ImmutableMap.builder().put("output1", 0).put("output2", 1).put("output3", 2).build(); assertThat(outputMap, is(expected));