[
https://issues.apache.org/jira/browse/BEAM-3971?focusedWorklogId=118356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118356
]
ASF GitHub Bot logged work on BEAM-3971:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 19:52
Start Date: 02/Jul/18 19:52
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #5833: [BEAM-3971,
BEAM-4284] Remove fromProto for Pipeline and PTransform translation.
URL: https://github.com/apache/beam/pull/5833
This PR was merged from a forked repository. Because GitHub hides the
original diff once such a foreign pull request is merged, the diff is
reproduced below for the sake of provenance:
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 23e5e6afc94..221f8ba9a4f 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -28,11 +28,8 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
-import java.util.Optional;
-import javax.annotation.Nonnull;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -59,10 +56,6 @@
/** A {@link TransformPayloadTranslator} for {@link Combine.PerKey}. */
public static class CombinePayloadTranslator
implements
PTransformTranslation.TransformPayloadTranslator<Combine.PerKey<?, ?, ?>> {
- public static TransformPayloadTranslator create() {
- return new CombinePayloadTranslator();
- }
-
private CombinePayloadTranslator() {}
@Override
@@ -86,18 +79,6 @@ public FunctionSpec translate(
}
}
- @Override
- public PTransformTranslation.RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- checkArgument(
- protoTransform.getSpec() != null,
- "%s received transform with null spec",
- getClass().getSimpleName());
-
checkArgument(protoTransform.getSpec().getUrn().equals(COMBINE_TRANSFORM_URN));
- return new RawCombine<>(protoTransform, rehydratedComponents);
- }
-
/** Registers {@link CombinePayloadTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements
TransformPayloadTranslatorRegistrar {
@@ -106,33 +87,10 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(Combine.PerKey.class, new
CombinePayloadTranslator());
}
-
- @Override
- public Map<String, ? extends TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.singletonMap(COMBINE_TRANSFORM_URN, new
CombinePayloadTranslator());
- }
}
}
- /**
- * These methods drive to-proto translation for both Java SDK transforms and
rehydrated
- * transforms.
- */
- interface CombineLike {
- RunnerApi.SdkFunctionSpec getCombineFn();
-
- Coder<?> getAccumulatorCoder();
- }
-
- /** Produces a {@link RunnerApi.CombinePayload} from a portable {@link
CombineLike}. */
- static RunnerApi.CombinePayload payloadForCombineLike(
- CombineLike combine, SdkComponents components) throws IOException {
- return RunnerApi.CombinePayload.newBuilder()
-
.setAccumulatorCoderId(components.registerCoder(combine.getAccumulatorCoder()))
- .setCombineFn(combine.getCombineFn())
- .build();
- }
-
+ /** Produces a {@link RunnerApi.CombinePayload} from a {@link Combine}. */
static <K, InputT, OutputT> CombinePayload payloadForCombine(
final AppliedPTransform<
PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
@@ -141,96 +99,28 @@ public FunctionSpec translate(
final SdkComponents components)
throws IOException {
- return payloadForCombineLike(
- new CombineLike() {
- @Override
- public SdkFunctionSpec getCombineFn() {
- return SdkFunctionSpec.newBuilder()
- .setEnvironmentId(
-
components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
- .setPayload(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(
- combine.getTransform().getFn())))
- .build())
- .build();
- }
-
- @Override
- public Coder<?> getAccumulatorCoder() {
- GlobalCombineFn<?, ?, ?> combineFn =
combine.getTransform().getFn();
- try {
- return extractAccumulatorCoder(combineFn, (AppliedPTransform)
combine);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException(e);
- }
- }
- },
- components);
- }
-
- private static class RawCombine<K, InputT, AccumT, OutputT>
- extends PTransformTranslation.RawPTransform<
- PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
- implements CombineLike {
-
- private final RunnerApi.PTransform protoTransform;
- private final transient RehydratedComponents rehydratedComponents;
- private final FunctionSpec spec;
- private final CombinePayload payload;
- private final Coder<AccumT> accumulatorCoder;
-
- private RawCombine(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- this.protoTransform = protoTransform;
- this.rehydratedComponents = rehydratedComponents;
- this.spec = protoTransform.getSpec();
- this.payload = CombinePayload.parseFrom(spec.getPayload());
-
- // Eagerly extract the coder to throw a good exception here
- try {
- this.accumulatorCoder =
- (Coder<AccumT>)
rehydratedComponents.getCoder(payload.getAccumulatorCoderId());
- } catch (IOException exc) {
- throw new IllegalArgumentException(
- String.format(
- "Failure extracting accumulator coder with id '%s' for %s",
- payload.getAccumulatorCoderId(),
Combine.class.getSimpleName()),
- exc);
- }
- }
-
- @Override
- public String getUrn() {
- return COMBINE_TRANSFORM_URN;
- }
-
- @Nonnull
- @Override
- public FunctionSpec getSpec() {
- return spec;
- }
-
- @Override
- public RunnerApi.FunctionSpec migrate(SdkComponents sdkComponents) throws
IOException {
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(COMBINE_TRANSFORM_URN)
- .setPayload(payloadForCombineLike(this,
sdkComponents).toByteString())
+ GlobalCombineFn<?, ?, ?> combineFn = combine.getTransform().getFn();
+ try {
+ return RunnerApi.CombinePayload.newBuilder()
+ .setAccumulatorCoderId(
+ components.registerCoder(
+ extractAccumulatorCoder(combineFn, (AppliedPTransform)
combine)))
+ .setCombineFn(
+ SdkFunctionSpec.newBuilder()
+ .setEnvironmentId(
+
components.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT))
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(JAVA_SERIALIZED_COMBINE_FN_URN)
+ .setPayload(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(
+ combine.getTransform().getFn())))
+ .build())
+ .build())
.build();
- }
-
- @Override
- public SdkFunctionSpec getCombineFn() {
- return payload.getCombineFn();
- }
-
- @Override
- public Coder<?> getAccumulatorCoder() {
- return accumulatorCoder;
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(e);
}
}
@@ -249,7 +139,7 @@ static CombinePayload toProto(
.setCombineFn(toProto(combineFn, sdkComponents))
.build();
} catch (CannotProvideCoderException e) {
- throw new IllegalStateException(e);
+ throw new IllegalArgumentException(e);
}
}
@@ -283,62 +173,4 @@ public static SdkFunctionSpec toProto(
.build())
.build();
}
-
- public static Coder<?> getAccumulatorCoder(
- CombinePayload payload, RehydratedComponents components) throws
IOException {
- String id = payload.getAccumulatorCoderId();
- return components.getCoder(id);
- }
-
- public static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?>
transform)
- throws IOException {
- SdkComponents sdkComponents = SdkComponents.create();
- String id =
- getCombinePayload(transform, sdkComponents)
- .map(CombinePayload::getAccumulatorCoderId)
- .orElseThrow(() -> new IOException("Transform does not contain an
AccumulatorCoder"));
- Components components = sdkComponents.toComponents();
- return CoderTranslation.fromProto(
- components.getCodersOrThrow(id),
RehydratedComponents.forComponents(components));
- }
-
- public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload)
throws IOException {
- checkArgument(
-
payload.getCombineFn().getSpec().getUrn().equals(JAVA_SERIALIZED_COMBINE_FN_URN),
- "Payload URN was \"%s\", should have been \"%s\".",
- payload.getCombineFn().getSpec().getUrn(),
- JAVA_SERIALIZED_COMBINE_FN_URN);
- return (GlobalCombineFn<?, ?, ?>)
- SerializableUtils.deserializeFromByteArray(
- payload.getCombineFn().getSpec().getPayload().toByteArray(),
"CombineFn");
- }
-
- public static Optional<GlobalCombineFn<?, ?, ?>> getCombineFn(
- AppliedPTransform<?, ?, ?> transform) throws IOException {
- Optional<CombinePayload> payload = getCombinePayload(transform);
- if (payload.isPresent()) {
- return Optional.of(getCombineFn(payload.get()));
- } else {
- return Optional.empty();
- }
- }
-
- private static Optional<CombinePayload>
getCombinePayload(AppliedPTransform<?, ?, ?> transform)
- throws IOException {
- return getCombinePayload(transform, SdkComponents.create());
- }
-
- private static Optional<CombinePayload> getCombinePayload(
- AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws
IOException {
- RunnerApi.PTransform proto =
- PTransformTranslation.toProto(transform, Collections.emptyList(),
components);
-
- // Even if the proto has no spec, calling getSpec still returns a blank
spec, which we want to
- // avoid. It should be clear to the caller whether or not there was a spec
in the transform.
- if (proto.hasSpec()) {
- return
Optional.of(CombinePayload.parseFrom(proto.getSpec().getPayload()));
- } else {
- return Optional.empty();
- }
- }
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index eb10cf9784e..8d01b44ffd3 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -82,7 +82,7 @@
*/
@Deprecated
static class CreatePCollectionViewTranslator
- extends
TransformPayloadTranslator.WithDefaultRehydration<View.CreatePCollectionView<?,
?>> {
+ implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
@Override
public String getUrn(View.CreatePCollectionView<?, ?> transform) {
return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
@@ -116,10 +116,5 @@ public FunctionSpec translate(
return Collections.singletonMap(
View.CreatePCollectionView.class, new
CreatePCollectionViewTranslator());
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
index 972c453b4ea..c9798e6c0f6 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java
@@ -32,8 +32,7 @@
/**
* Utility methods for translating a {@link Assign} to and from {@link
RunnerApi} representations.
*/
-public class FlattenTranslator
- extends
TransformPayloadTranslator.WithDefaultRehydration<Flatten.PCollections<?>> {
+public class FlattenTranslator implements
TransformPayloadTranslator<Flatten.PCollections<?>> {
public static TransformPayloadTranslator create() {
return new FlattenTranslator();
@@ -60,10 +59,5 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(Flatten.PCollections.class, new
FlattenTranslator());
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
index 0803ad364a1..78e404171f0 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java
@@ -34,8 +34,7 @@
*/
public class GroupByKeyTranslation {
- static class GroupByKeyTranslator
- extends TransformPayloadTranslator.WithDefaultRehydration<GroupByKey<?,
?>> {
+ static class GroupByKeyTranslator implements
TransformPayloadTranslator<GroupByKey<?, ?>> {
@Override
public String getUrn(GroupByKey<?, ?> transform) {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -56,10 +55,5 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(GroupByKey.class, new
GroupByKeyTranslator());
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
index b78a5990e0c..23ef6eaeaa7 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java
@@ -33,8 +33,7 @@
* Utility methods for translating a {@link Impulse} to and from {@link
RunnerApi} representations.
*/
public class ImpulseTranslation {
- private static class ImpulseTranslator
- extends TransformPayloadTranslator.WithDefaultRehydration<Impulse> {
+ private static class ImpulseTranslator implements
TransformPayloadTranslator<Impulse> {
@Override
public String getUrn(Impulse transform) {
return PTransformTranslation.IMPULSE_TRANSFORM_URN;
@@ -55,10 +54,5 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(Impulse.class, new ImpulseTranslator());
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 53fe744491b..3da7871d6db 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -21,9 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
-import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
@@ -92,12 +90,6 @@
private static final Map<Class<? extends PTransform>,
TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
- private static final Map<String, TransformPayloadTranslator>
KNOWN_REHYDRATORS =
- loadTransformRehydrators();
-
- private static final TransformPayloadTranslator<?> DEFAULT_REHYDRATOR =
- new RawPTransformTranslator();
-
private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
loadTransformPayloadTranslators() {
HashMap<Class<? extends PTransform>, TransformPayloadTranslator>
translators = new HashMap<>();
@@ -122,28 +114,6 @@
return ImmutableMap.copyOf(translators);
}
- private static Map<String, TransformPayloadTranslator>
loadTransformRehydrators() {
- HashMap<String, TransformPayloadTranslator> rehydrators = new HashMap<>();
-
- for (TransformPayloadTranslatorRegistrar registrar :
- ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-
- Map<String, ? extends TransformPayloadTranslator> newRehydrators =
- registrar.getTransformRehydrators();
-
- Set<String> alreadyRegistered =
- Sets.intersection(rehydrators.keySet(), newRehydrators.keySet());
-
- if (!alreadyRegistered.isEmpty()) {
- throw new IllegalArgumentException(
- String.format("URNs already registered: %s", Joiner.on(",
").join(alreadyRegistered)));
- }
-
- rehydrators.putAll(newRehydrators);
- }
- return ImmutableMap.copyOf(rehydrators);
- }
-
private PTransformTranslation() {}
/**
@@ -214,26 +184,6 @@ private PTransformTranslation() {}
return transformBuilder.build();
}
- /**
- * Translates a {@link RunnerApi.PTransform} to a {@link RawPTransform}
specialized for the URN
- * and spec.
- */
- static RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
-
- @Nullable
- TransformPayloadTranslator<?> rehydrator =
- KNOWN_REHYDRATORS.get(
- protoTransform.getSpec() == null ? null :
protoTransform.getSpec().getUrn());
-
- if (rehydrator == null) {
- return DEFAULT_REHYDRATOR.rehydrate(protoTransform,
rehydratedComponents);
- } else {
- return rehydrator.rehydrate(protoTransform, rehydratedComponents);
- }
- }
-
/**
* Translates a composite {@link AppliedPTransform} into a runner API proto
with no component
* transforms.
@@ -303,24 +253,6 @@ public static String
urnForTransformOrNull(RunnerApi.PTransform transform) {
FunctionSpec translate(AppliedPTransform<?, ?, T> application,
SdkComponents components)
throws IOException;
- RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException;
-
- /**
- * A {@link TransformPayloadTranslator} for transforms that contain no
references to components,
- * so they do not need a specialized rehydration.
- */
- abstract class WithDefaultRehydration<T extends PTransform<?, ?>>
- implements TransformPayloadTranslator<T> {
- @Override
- public final RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- return UnknownRawPTransform.forSpec(protoTransform.getSpec());
- }
- }
-
/**
* A {@link TransformPayloadTranslator} for transforms that contain no
references to components,
* so they do not need a specialized rehydration.
@@ -345,16 +277,6 @@ public final FunctionSpec translate(
"%s should never be translated",
transform.getTransform().getClass().getCanonicalName()));
}
-
- @Override
- public final RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- throw new UnsupportedOperationException(
- String.format(
- "%s.rehydrate should never be called; there is no serialized
form",
- getClass().getCanonicalName()));
- }
}
}
@@ -405,64 +327,4 @@ public OutputT expand(InputT input) {
getClass().getSimpleName()));
}
}
-
- @AutoValue
- abstract static class UnknownRawPTransform extends RawPTransform<PInput,
POutput> {
-
- @Override
- public String getUrn() {
- return getSpec() == null ? null : getSpec().getUrn();
- }
-
- @Nullable
- @Override
- public abstract RunnerApi.FunctionSpec getSpec();
-
- public static UnknownRawPTransform forSpec(RunnerApi.FunctionSpec spec) {
- return new AutoValue_PTransformTranslation_UnknownRawPTransform(spec);
- }
-
- @Override
- public POutput expand(PInput input) {
- throw new IllegalStateException(
- String.format(
- "%s should never be asked to expand;"
- + " it is the result of deserializing an already-constructed
Pipeline",
- getClass().getSimpleName()));
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("urn", getUrn())
- .add("payload", getSpec())
- .toString();
- }
-
- public RunnerApi.FunctionSpec getSpecForComponents(SdkComponents
components) {
- return getSpec();
- }
- }
-
- /** A translator that uses the explicit URN and payload from a {@link
RawPTransform}. */
- public static class RawPTransformTranslator
- implements TransformPayloadTranslator<RawPTransform<?, ?>> {
- @Override
- public String getUrn(RawPTransform<?, ?> transform) {
- return transform.getUrn();
- }
-
- @Override
- public FunctionSpec translate(
- AppliedPTransform<?, ?, RawPTransform<?, ?>> transform, SdkComponents
components)
- throws IOException {
- return transform.getTransform().migrate(components);
- }
-
- @Override
- public RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents) {
- return UnknownRawPTransform.forSpec(protoTransform.getSpec());
- }
- }
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index c25822d5601..159005d735b 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -26,7 +26,6 @@
import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
@@ -75,7 +74,6 @@
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
@@ -115,13 +113,6 @@ public FunctionSpec translate(
.build();
}
- @Override
- public PTransformTranslation.RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- return new RawParDo<>(protoTransform, rehydratedComponents);
- }
-
/** Registers {@link ParDoPayloadTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements
TransformPayloadTranslatorRegistrar {
@@ -130,11 +121,6 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(ParDo.MultiOutput.class, new
ParDoPayloadTranslator());
}
-
- @Override
- public Map<String, ? extends TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.singletonMap(PAR_DO_TRANSFORM_URN, new
ParDoPayloadTranslator());
- }
}
}
@@ -570,105 +556,6 @@ public static SdkFunctionSpec translateWindowMappingFn(
.build();
}
- static class RawParDo<InputT, OutputT>
- extends PTransformTranslation.RawPTransform<PCollection<InputT>,
PCollection<OutputT>>
- implements ParDoLike {
-
- private final RunnerApi.PTransform protoTransform;
- private final transient RehydratedComponents rehydratedComponents;
-
- // Parsed from protoTransform and cached
- private final FunctionSpec spec;
- private final ParDoPayload payload;
-
- public RawParDo(RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- this.rehydratedComponents = rehydratedComponents;
- this.protoTransform = protoTransform;
- this.spec = protoTransform.getSpec();
- this.payload = ParDoPayload.parseFrom(spec.getPayload());
- }
-
- @Override
- public FunctionSpec getSpec() {
- return spec;
- }
-
- @Override
- public FunctionSpec migrate(SdkComponents components) throws IOException {
- return FunctionSpec.newBuilder()
- .setUrn(PAR_DO_TRANSFORM_URN)
- .setPayload(payloadForParDoLike(this, components).toByteString())
- .build();
- }
-
- @Override
- public Map<TupleTag<?>, PValue> getAdditionalInputs() {
- Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
- for (Map.Entry<String, SideInput> sideInputEntry :
payload.getSideInputsMap().entrySet()) {
- try {
- additionalInputs.put(
- new TupleTag<>(sideInputEntry.getKey()),
- rehydratedComponents.getPCollection(
- protoTransform.getInputsOrThrow(sideInputEntry.getKey())));
- } catch (IOException exc) {
- throw new IllegalStateException(
- String.format(
- "Could not find input with name %s for %s transform",
- sideInputEntry.getKey(), ParDo.class.getSimpleName()));
- }
- }
- return additionalInputs;
- }
-
- @Override
- public SdkFunctionSpec translateDoFn(SdkComponents newComponents) {
- SdkFunctionSpec sdkFnSpec = payload.getDoFn();
- return sdkFnSpec
- .toBuilder()
- .setEnvironmentId(
- newComponents.registerEnvironment(
-
rehydratedComponents.getEnvironment(sdkFnSpec.getEnvironmentId())))
- .build();
- }
-
- @Override
- public List<RunnerApi.Parameter> translateParameters() {
- return MoreObjects.firstNonNull(
- payload.getParametersList(),
Collections.<RunnerApi.Parameter>emptyList());
- }
-
- @Override
- public Map<String, SideInput> translateSideInputs(SdkComponents
components) {
- // TODO: re-register the PCollections and UDF environments
- return MoreObjects.firstNonNull(
- payload.getSideInputsMap(), Collections.<String,
SideInput>emptyMap());
- }
-
- @Override
- public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents
components) {
- // TODO: re-register the coders
- return MoreObjects.firstNonNull(
- payload.getStateSpecsMap(), Collections.<String,
RunnerApi.StateSpec>emptyMap());
- }
-
- @Override
- public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents
newComponents) {
- return MoreObjects.firstNonNull(
- payload.getTimerSpecsMap(), Collections.<String,
RunnerApi.TimerSpec>emptyMap());
- }
-
- @Override
- public boolean isSplittable() {
- return payload.getSplittable();
- }
-
- @Override
- public String translateRestrictionCoderId(SdkComponents newComponents) {
- return payload.getRestrictionCoderId();
- }
- }
-
/** These methods drive to-proto translation from Java and from rehydrated
ParDos. */
public interface ParDoLike {
SdkFunctionSpec translateDoFn(SdkComponents newComponents);
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index fa6ae015e54..a95420ccccd 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -18,34 +18,18 @@
package org.apache.beam.runners.core.construction;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
/** Utilities for going to/from Runner API pipelines. */
public class PipelineTranslation {
@@ -101,104 +85,4 @@ public void visitPrimitiveTransform(Node node) {
PipelineValidator.validate(res);
return res;
}
-
- private static DisplayData evaluateDisplayData(HasDisplayData component) {
- return DisplayData.from(component);
- }
-
- public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
throws IOException {
- PipelineValidator.validate(pipelineProto);
- TransformHierarchy transforms = new TransformHierarchy();
- Pipeline pipeline = Pipeline.forTransformHierarchy(transforms,
PipelineOptionsFactory.create());
-
- // Keeping the PCollections straight is a semantic necessity, but being
careful not to explode
- // the number of coders and windowing strategies is also nice, and helps
testing.
- RehydratedComponents rehydratedComponents =
-
RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
-
- for (String rootId : pipelineProto.getRootTransformIdsList()) {
- addRehydratedTransform(
- transforms,
- pipelineProto.getComponents().getTransformsOrThrow(rootId),
- pipeline,
- pipelineProto.getComponents().getTransformsMap(),
- rehydratedComponents);
- }
-
- return pipeline;
- }
-
- private static void addRehydratedTransform(
- TransformHierarchy transforms,
- RunnerApi.PTransform transformProto,
- Pipeline pipeline,
- Map<String, RunnerApi.PTransform> transformProtos,
- RehydratedComponents rehydratedComponents)
- throws IOException {
-
- Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>();
- for (Map.Entry<String, String> inputEntry :
transformProto.getInputsMap().entrySet()) {
- rehydratedInputs.put(
- new TupleTag<>(inputEntry.getKey()),
- rehydratedComponents.getPCollection(inputEntry.getValue()));
- }
-
- Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>();
- for (Map.Entry<String, String> outputEntry :
transformProto.getOutputsMap().entrySet()) {
- rehydratedOutputs.put(
- new TupleTag<>(outputEntry.getKey()),
- rehydratedComponents.getPCollection(outputEntry.getValue()));
- }
-
- RawPTransform<?, ?> transform =
- PTransformTranslation.rehydrate(transformProto, rehydratedComponents);
-
- if (isPrimitive(transformProto)) {
- transforms.addFinalizedPrimitiveNode(
- transformProto.getUniqueName(), rehydratedInputs, transform,
rehydratedOutputs);
- } else {
- transforms.pushFinalizedNode(
- transformProto.getUniqueName(), rehydratedInputs, transform,
rehydratedOutputs);
-
- for (String childTransformId : transformProto.getSubtransformsList()) {
- addRehydratedTransform(
- transforms,
- transformProtos.get(childTransformId),
- pipeline,
- transformProtos,
- rehydratedComponents);
- }
-
- transforms.popNode();
- }
- }
-
- private static Map<TupleTag<?>, PValue> sideInputMapToAdditionalInputs(
- RunnerApi.PTransform transformProto,
- RehydratedComponents rehydratedComponents,
- Map<TupleTag<?>, PValue> rehydratedInputs,
- Map<String, RunnerApi.SideInput> sideInputsMap)
- throws IOException {
- List<PCollectionView<?>> views = new ArrayList<>();
- for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
sideInputsMap.entrySet()) {
- String localName = sideInputEntry.getKey();
- RunnerApi.SideInput sideInput = sideInputEntry.getValue();
- PCollection<?> pCollection =
- (PCollection<?>) checkNotNull(rehydratedInputs.get(new
TupleTag<>(localName)));
- views.add(
- PCollectionViewTranslation.viewFromProto(
- sideInput, localName, pCollection, transformProto,
rehydratedComponents));
- }
- return PCollectionViews.toAdditionalInputs(views);
- }
-
- // A primitive transform is one with outputs that are not in its input and
also
- // not produced by a subtransform.
- private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
- return transformProto.getSubtransformsCount() == 0
- && !transformProto
- .getInputsMap()
- .values()
- .containsAll(transformProto.getOutputsMap().values());
- }
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index b5b469cf750..e12bbb67da8 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -153,8 +153,7 @@ private static SdkFunctionSpec toProto(UnboundedSource<?,
?> source, SdkComponen
/** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */
public static class UnboundedReadPayloadTranslator
- extends
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
- Read.Unbounded<?>> {
+ implements
PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
public static TransformPayloadTranslator create() {
return new UnboundedReadPayloadTranslator();
}
@@ -179,8 +178,7 @@ public FunctionSpec translate(
/** A {@link TransformPayloadTranslator} for {@link Read.Bounded}. */
public static class BoundedReadPayloadTranslator
- extends
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
- Read.Bounded<?>> {
+ implements
PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
public static TransformPayloadTranslator create() {
return new BoundedReadPayloadTranslator();
}
@@ -214,10 +212,5 @@ public FunctionSpec translate(
.put(Read.Bounded.class, new BoundedReadPayloadTranslator())
.build();
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 60ad288a368..b581eecf414 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -312,17 +311,11 @@ public PCollectionTuple expand(PCollection<KV<String,
KV<InputT, RestrictionT>>>
.put(ProcessKeyedElements.class, new
ProcessKeyedElementsTranslator())
.build();
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
/** A translator for {@link ProcessKeyedElements}. */
public static class ProcessKeyedElementsTranslator
- extends
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
- ProcessKeyedElements<?, ?, ?>> {
+ implements
PTransformTranslation.TransformPayloadTranslator<ProcessKeyedElements<?, ?, ?>>
{
public static TransformPayloadTranslator create() {
return new ProcessKeyedElementsTranslator();
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 1b1884477c9..76330bb8d9a 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -22,14 +22,12 @@
import static
org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
import com.google.auto.service.AutoService;
-import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nonnull;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
@@ -49,73 +47,6 @@
*/
public class TestStreamTranslation {
- private interface TestStreamLike {
- Coder<?> getValueCoder();
-
- List<RunnerApi.TestStreamPayload.Event> getEvents();
- }
-
- @VisibleForTesting
- static class RawTestStream<T> extends
PTransformTranslation.RawPTransform<PBegin, PCollection<T>>
- implements TestStreamLike {
-
- private final transient RehydratedComponents rehydratedComponents;
- private final RunnerApi.TestStreamPayload payload;
- private final Coder<T> valueCoder;
- private final RunnerApi.FunctionSpec spec;
-
- public RawTestStream(
- RunnerApi.TestStreamPayload payload, RehydratedComponents
rehydratedComponents) {
- this.payload = payload;
- this.spec =
- RunnerApi.FunctionSpec.newBuilder()
- .setUrn(TEST_STREAM_TRANSFORM_URN)
- .setPayload(payload.toByteString())
- .build();
- this.rehydratedComponents = rehydratedComponents;
-
- // Eagerly extract the coder to throw a good exception here
- try {
- this.valueCoder = (Coder<T>)
rehydratedComponents.getCoder(payload.getCoderId());
- } catch (IOException exc) {
- throw new IllegalArgumentException(
- String.format(
- "Failure extracting coder with id '%s' for %s",
- payload.getCoderId(), TestStream.class.getSimpleName()),
- exc);
- }
- }
-
- @Override
- public String getUrn() {
- return TEST_STREAM_TRANSFORM_URN;
- }
-
- @Nonnull
- @Override
- public RunnerApi.FunctionSpec getSpec() {
- return spec;
- }
-
- @Override
- public RunnerApi.FunctionSpec migrate(SdkComponents components) throws
IOException {
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(TEST_STREAM_TRANSFORM_URN)
- .setPayload(payloadForTestStreamLike(this,
components).toByteString())
- .build();
- }
-
- @Override
- public Coder<T> getValueCoder() {
- return valueCoder;
- }
-
- @Override
- public List<RunnerApi.TestStreamPayload.Event> getEvents() {
- return payload.getEventsList();
- }
- }
-
private static TestStream<?> testStreamFromProtoPayload(
RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents
components)
throws IOException {
@@ -241,20 +172,6 @@ public String getUrn(TestStream<?> transform) {
return translateTyped(transform.getTransform(), components);
}
- @Override
- public PTransformTranslation.RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- checkArgument(
- protoTransform.getSpec() != null,
- "%s received transform with null spec",
- getClass().getSimpleName());
-
checkArgument(protoTransform.getSpec().getUrn().equals(TEST_STREAM_TRANSFORM_URN));
- return new RawTestStream<>(
-
RunnerApi.TestStreamPayload.parseFrom(protoTransform.getSpec().getPayload()),
- rehydratedComponents);
- }
-
private <T> RunnerApi.FunctionSpec translateTyped(
final TestStream<T> testStream, SdkComponents components) throws
IOException {
return RunnerApi.FunctionSpec.newBuilder()
@@ -271,46 +188,24 @@ public String getUrn(TestStream<?> transform) {
getTransformPayloadTranslators() {
return Collections.singletonMap(TestStream.class, new
TestStreamTranslator());
}
+ }
+ }
- @Override
- public Map<String, ? extends TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.singletonMap(TEST_STREAM_TRANSFORM_URN, new
TestStreamTranslator());
+ /** Produces a {@link RunnerApi.TestStreamPayload} from a {@link
TestStream}. */
+ static <T> RunnerApi.TestStreamPayload payloadForTestStream(
+ final TestStream<T> transform, SdkComponents components) throws
IOException {
+ List<RunnerApi.TestStreamPayload.Event> protoEvents = new ArrayList<>();
+ try {
+ for (TestStream.Event<T> event : transform.getEvents()) {
+ protoEvents.add(eventToProto(event, transform.getValueCoder()));
}
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- }
- /** Produces a {@link RunnerApi.TestStreamPayload} from a portable {@link
RawTestStream}. */
- static RunnerApi.TestStreamPayload payloadForTestStreamLike(
- TestStreamLike transform, SdkComponents components) throws IOException {
return RunnerApi.TestStreamPayload.newBuilder()
.setCoderId(components.registerCoder(transform.getValueCoder()))
- .addAllEvents(transform.getEvents())
+ .addAllEvents(protoEvents)
.build();
}
-
- @VisibleForTesting
- static <T> RunnerApi.TestStreamPayload payloadForTestStream(
- final TestStream<T> testStream, SdkComponents components) throws
IOException {
- return payloadForTestStreamLike(
- new TestStreamLike() {
- @Override
- public Coder<T> getValueCoder() {
- return testStream.getValueCoder();
- }
-
- @Override
- public List<RunnerApi.TestStreamPayload.Event> getEvents() {
- try {
- List<RunnerApi.TestStreamPayload.Event> protoEvents = new
ArrayList<>();
- for (TestStream.Event<T> event : testStream.getEvents()) {
- protoEvents.add(eventToProto(event,
testStream.getValueCoder()));
- }
- return protoEvents;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- },
- components);
- }
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index 58417a89421..3b3ffa18b26 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -26,6 +26,4 @@
public interface TransformPayloadTranslatorRegistrar {
Map<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
getTransformPayloadTranslators();
-
- Map<String, ? extends TransformPayloadTranslator> getTransformRehydrators();
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 89091b8d72c..0086f8fa89b 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -42,8 +42,7 @@
*/
public class WindowIntoTranslation {
- static class WindowAssignTranslator
- extends
TransformPayloadTranslator.WithDefaultRehydration<Window.Assign<?>> {
+ static class WindowAssignTranslator implements
TransformPayloadTranslator<Window.Assign<?>> {
@Override
public String getUrn(Assign<?> transform) {
@@ -107,8 +106,7 @@ public static WindowIntoPayload
getWindowIntoPayload(AppliedPTransform<?, ?, ?>
/** A {@link TransformPayloadTranslator} for {@link Window}. */
public static class WindowIntoPayloadTranslator
- extends
PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration<
- Window.Assign<?>> {
+ implements
PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
public static TransformPayloadTranslator create() {
return new WindowIntoPayloadTranslator();
}
@@ -139,10 +137,5 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(Window.Assign.class, new
WindowIntoPayloadTranslator());
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 154893062ba..bd6454fa558 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -286,13 +286,6 @@ public FunctionSpec translate(
.setPayload(payloadForWriteFiles(transform.getTransform(),
components).toByteString())
.build();
}
-
- @Override
- public PTransformTranslation.RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- return new RawWriteFiles(protoTransform, rehydratedComponents);
- }
}
/** Registers {@link WriteFilesTranslator}. */
@@ -303,11 +296,6 @@ public FunctionSpec translate(
getTransformPayloadTranslators() {
return Collections.singletonMap(WriteFiles.CONCRETE_CLASS, new
WriteFilesTranslator());
}
-
- @Override
- public Map<String, ? extends TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.singletonMap(WRITE_FILES_TRANSFORM_URN, new
WriteFilesTranslator());
- }
}
/** These methods drive to-proto translation from Java and from rehydrated
WriteFiles. */
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
index e73b3ae90ac..363f2935445 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
@@ -22,6 +22,7 @@
import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
@@ -41,6 +42,7 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
@@ -76,7 +78,7 @@
public Combine.CombineFn<Integer, ?, ?> combineFn;
@Test
- public void testToFromProto() throws Exception {
+ public void testToProto() throws Exception {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
input.apply(Combine.globally(combineFn));
final AtomicReference<AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>>>
combine =
@@ -92,7 +94,7 @@ public void leaveCompositeTransform(Node node) {
}
});
checkState(combine.get() != null);
- assertEquals(combineFn,
CombineTranslation.getCombineFn(combine.get()).orElse(null));
+ assertEquals(combineFn, combine.get().getTransform().getFn());
SdkComponents sdkComponents = SdkComponents.create();
CombinePayload combineProto = CombineTranslation.toProto(combine.get(),
sdkComponents);
@@ -100,9 +102,11 @@ public void leaveCompositeTransform(Node node) {
assertEquals(
combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(),
input.getCoder()),
- CombineTranslation.getAccumulatorCoder(
- combineProto,
RehydratedComponents.forComponents(componentsProto)));
- assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
+ getAccumulatorCoder(combineProto,
RehydratedComponents.forComponents(componentsProto)));
+ assertEquals(
+ combineFn,
+ SerializableUtils.deserializeFromByteArray(
+
combineProto.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"));
}
}
@@ -113,7 +117,7 @@ public void leaveCompositeTransform(Node node) {
@Rule public ExpectedException exception = ExpectedException.none();
@Test
- public void testToFromProtoWithoutSideInputs() throws Exception {
+ public void testToProtoWithoutSideInputs() throws Exception {
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
CombineFnWithContext<Integer, int[], Integer> combineFn = new
TestCombineFnWithContext();
input.apply(Combine.globally(combineFn).withoutDefaults());
@@ -130,7 +134,7 @@ public void leaveCompositeTransform(Node node) {
}
});
checkState(combine.get() != null);
- assertEquals(combineFn,
CombineTranslation.getCombineFn(combine.get()).orElse(null));
+ assertEquals(combineFn, combine.get().getTransform().getFn());
SdkComponents sdkComponents = SdkComponents.create();
CombinePayload combineProto = CombineTranslation.toProto(combine.get(),
sdkComponents);
@@ -138,9 +142,11 @@ public void leaveCompositeTransform(Node node) {
assertEquals(
combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(),
input.getCoder()),
- CombineTranslation.getAccumulatorCoder(
- combineProto,
RehydratedComponents.forComponents(componentsProto)));
- assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
+ getAccumulatorCoder(combineProto,
RehydratedComponents.forComponents(componentsProto)));
+ assertEquals(
+ combineFn,
+ SerializableUtils.deserializeFromByteArray(
+
combineProto.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn"));
}
@Test
@@ -179,6 +185,12 @@ public void leaveCompositeTransform(Node node) {
}
}
+ private static Coder<?> getAccumulatorCoder(
+ CombinePayload payload, RehydratedComponents components) throws
IOException {
+ String id = payload.getAccumulatorCoderId();
+ return components.getCoder(id);
+ }
+
private static class TestCombineFn extends Combine.CombineFn<Integer, Void,
Void> {
@Override
public Void createAccumulator() {
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
index b6c785d0f16..405c60c68a9 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -24,15 +24,20 @@
import com.google.common.base.Equivalence;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
@@ -109,14 +114,6 @@ public void testProtoDirectly() {
pipeline.traverseTopologically(new
PipelineProtoVerificationVisitor(pipelineProto));
}
- @Test
- public void testProtoAgainstRehydrated() throws Exception {
- RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
- Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);
-
- rehydrated.traverseTopologically(new
PipelineProtoVerificationVisitor(pipelineProto));
- }
-
private static class PipelineProtoVerificationVisitor extends
PipelineVisitor.Defaults {
private final RunnerApi.Pipeline pipelineProto;
@@ -159,8 +156,7 @@ public void leaveCompositeTransform(Node node) {
// Combine translation introduces a coder that is not assigned to
any PCollection
// in the default expansion, and must be explicitly added here.
try {
- addCoders(
-
CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
+
addCoders(getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -193,4 +189,30 @@ private void addCoders(Coder<?> coder) {
}
}
}
+
+ private static Coder<?> getAccumulatorCoder(AppliedPTransform<?, ?, ?>
transform)
+ throws IOException {
+ SdkComponents sdkComponents = SdkComponents.create();
+ String id =
+ getCombinePayload(transform, sdkComponents)
+ .map(CombinePayload::getAccumulatorCoderId)
+ .orElseThrow(() -> new IOException("Transform does not contain an
AccumulatorCoder"));
+ Components components = sdkComponents.toComponents();
+ return CoderTranslation.fromProto(
+ components.getCodersOrThrow(id),
RehydratedComponents.forComponents(components));
+ }
+
+ private static Optional<CombinePayload> getCombinePayload(
+ AppliedPTransform<?, ?, ?> transform, SdkComponents components) throws
IOException {
+ RunnerApi.PTransform proto =
+ PTransformTranslation.toProto(transform, Collections.emptyList(),
components);
+
+ // Even if the proto has no spec, calling getSpec still returns a blank
spec, which we want to
+ // avoid. It should be clear to the caller whether or not there was a spec
in the transform.
+ if (proto.hasSpec()) {
+ return
Optional.of(CombinePayload.parseFrom(proto.getSpec().getPayload()));
+ } else {
+ return Optional.empty();
+ }
+ }
}
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 076a3fe8cb9..4d2374da87e 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -96,7 +96,7 @@ task needsRunnerTests(type: Test) {
group = "Verification"
description = "Runs tests that require a runner to validate that
piplines/transforms work correctly"
- def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner",
"--runnerDeterminedSharding=false", "--protoTranslation"])
+ def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner",
"--runnerDeterminedSharding=false"])
systemProperty "beamTestPipelineOptions", pipelineOptions
classpath = configurations.needsRunner
@@ -118,7 +118,7 @@ task validatesRunner(type: Test) {
group = "Verification"
description "Validates Direct runner"
- def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner",
"--runnerDeterminedSharding=false", "--protoTranslation"])
+ def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner",
"--runnerDeterminedSharding=false"])
systemProperty "beamTestPipelineOptions", pipelineOptions
classpath = configurations.validatesRunner
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 2c5804e49d7..1599a6ec0e7 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -78,11 +76,4 @@ public Integer create(PipelineOptions options) {
return Math.max(Runtime.getRuntime().availableProcessors(),
MIN_PARALLELISM);
}
}
-
- @Experimental(Kind.CORE_RUNNERS_ONLY)
- @Default.Boolean(false)
- @Description("Control whether toProto/fromProto translations are applied to
original Pipeline")
- boolean isProtoTranslation();
-
- void setProtoTranslation(boolean b);
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dd28ab84c03..8a633ae9f31 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -36,7 +35,6 @@
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -152,17 +150,7 @@ void setClockSupplier(Supplier<Clock> supplier) {
}
@Override
- public DirectPipelineResult run(Pipeline originalPipeline) {
- Pipeline pipeline;
- if (options.isProtoTranslation()) {
- try {
- pipeline =
PipelineTranslation.fromProto(PipelineTranslation.toProto(originalPipeline));
- } catch (IOException exception) {
- throw new RuntimeException("Error preparing pipeline for direct
execution.", exception);
- }
- } else {
- pipeline = originalPipeline;
- }
+ public DirectPipelineResult run(Pipeline pipeline) {
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
try {
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index 746d2384457..9e356f48233 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -23,16 +23,13 @@
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Iterables;
-import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.CombineTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -72,14 +69,8 @@ public static PTransformMatcher matcher() {
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
- try {
- Optional<GlobalCombineFn<?, ?, ?>> fn =
CombineTranslation.getCombineFn(application);
- if (fn.isPresent()) {
- return isApplicable(application.getInputs(), fn.get());
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ GlobalCombineFn<?, ?, ?> fn = ((Combine.PerKey)
application.getTransform()).getFn();
+ return isApplicable(application.getInputs(), fn);
}
return false;
}
@@ -143,37 +134,23 @@ private Factory() {}
PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K,
OutputT>>>>
transform) {
- try {
- GlobalCombineFn<?, ?, ?> globalFn =
- CombineTranslation.getCombineFn(transform)
- .orElseThrow(
- () ->
- new IOException(
- String.format(
- "%s.matcher() should only match %s instances
using %s, but %s was missing",
- MultiStepCombine.class.getSimpleName(),
- PerKey.class.getSimpleName(),
- CombineFn.class.getSimpleName(),
- CombineFn.class.getSimpleName())));
- checkState(
- globalFn instanceof CombineFn,
- "%s.matcher() should only match %s instances using %s, got %s",
- MultiStepCombine.class.getSimpleName(),
- PerKey.class.getSimpleName(),
- CombineFn.class.getSimpleName(),
- globalFn.getClass().getName());
- @SuppressWarnings("unchecked")
- CombineFn<InputT, AccumT, OutputT> fn = (CombineFn<InputT, AccumT,
OutputT>) globalFn;
- @SuppressWarnings("unchecked")
- PCollection<KV<K, InputT>> input =
- (PCollection<KV<K, InputT>>)
Iterables.getOnlyElement(transform.getInputs().values());
- @SuppressWarnings("unchecked")
- PCollection<KV<K, OutputT>> output =
- (PCollection<KV<K, OutputT>>)
Iterables.getOnlyElement(transform.getOutputs().values());
- return PTransformReplacement.of(input, new MultiStepCombine<>(fn,
output.getCoder()));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey)
transform.getTransform()).getFn();
+ checkState(
+ globalFn instanceof CombineFn,
+ "%s.matcher() should only match %s instances using %s, got %s",
+ MultiStepCombine.class.getSimpleName(),
+ PerKey.class.getSimpleName(),
+ CombineFn.class.getSimpleName(),
+ globalFn.getClass().getName());
+ @SuppressWarnings("unchecked")
+ CombineFn<InputT, AccumT, OutputT> fn = (CombineFn<InputT, AccumT,
OutputT>) globalFn;
+ @SuppressWarnings("unchecked")
+ PCollection<KV<K, InputT>> input =
+ (PCollection<KV<K, InputT>>)
Iterables.getOnlyElement(transform.getInputs().values());
+ @SuppressWarnings("unchecked")
+ PCollection<KV<K, OutputT>> output =
+ (PCollection<KV<K, OutputT>>)
Iterables.getOnlyElement(transform.getOutputs().values());
+ return PTransformReplacement.of(input, new MultiStepCombine<>(fn,
output.getCoder()));
}
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index e61fe7277ff..6d27ab98c09 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -36,7 +36,6 @@
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
@@ -130,11 +129,6 @@ public static TransformEvaluatorRegistry
javaSdkNativeRegistry(
TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN))
.build();
}
-
- @Override
- public Map<String, TransformPayloadTranslator> getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
/**
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 0c94d46c1b7..11f36cbe839 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -30,7 +30,6 @@
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.CombineTranslation;
import
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
@@ -374,15 +373,8 @@ public void translateNode(
DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
- CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;
- try {
- combineFn =
- (CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT>)
- CombineTranslation.getCombineFn(context.getCurrentTransform())
- .orElseThrow(() -> new IOException("CombineFn not found in
node."));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn =
+ ((Combine.PerKey) transform).getFn();
KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>)
context.getInput(transform).getCoder();
@@ -407,6 +399,14 @@ public void translateNode(
Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
inputDataSet.groupBy(new KvKeySelector<>(inputCoder.getKeyCoder()));
+ // construct a map from side input to WindowingStrategy so that
+ // the DoFn runner can map main-input windows to side input windows
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies =
new HashMap<>();
+ for (PCollectionView<?> sideInput :
+ (List<PCollectionView<?>>) ((Combine.PerKey)
transform).getSideInputs()) {
+ sideInputStrategies.put(sideInput,
sideInput.getWindowingStrategyInternal());
+ }
+
WindowingStrategy<Object, BoundedWindow> boundedStrategy =
(WindowingStrategy<Object, BoundedWindow>) windowingStrategy;
@@ -415,11 +415,11 @@ public void translateNode(
FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction
=
new FlinkPartialReduceFunction<>(
- combineFn, boundedStrategy, new HashMap<>(),
context.getPipelineOptions());
+ combineFn, boundedStrategy, sideInputStrategies,
context.getPipelineOptions());
FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
new FlinkReduceFunction<>(
- combineFn, boundedStrategy, new HashMap<>(),
context.getPipelineOptions());
+ combineFn, boundedStrategy, sideInputStrategies,
context.getPipelineOptions());
// Partially GroupReduce the values into the intermediate format
AccumT (combine)
GroupCombineOperator<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K,
AccumT>>>
@@ -430,6 +430,8 @@ public void translateNode(
partialReduceFunction,
"GroupCombine: " + fullName);
+ transformSideInputs(((Combine.PerKey) transform).getSideInputs(),
groupCombine, context);
+
TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
context.getTypeInfo(context.getOutput(transform));
@@ -442,6 +444,8 @@ public void translateNode(
new GroupReduceOperator<>(
intermediateGrouping, reduceTypeInfo, reduceFunction,
fullName);
+ transformSideInputs(((Combine.PerKey) transform).getSideInputs(),
outputDataSet, context);
+
context.setOutputDataSet(context.getOutput(transform), outputDataSet);
} else {
@@ -452,7 +456,7 @@ public void translateNode(
RichGroupReduceFunction<WindowedValue<KV<K, InputT>>,
WindowedValue<KV<K, OutputT>>>
reduceFunction =
new FlinkMergingNonShuffleReduceFunction<>(
- combineFn, boundedStrategy, new HashMap<>(),
context.getPipelineOptions());
+ combineFn, boundedStrategy, sideInputStrategies,
context.getPipelineOptions());
TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
context.getTypeInfo(context.getOutput(transform));
@@ -465,6 +469,8 @@ public void translateNode(
outputDataSet =
new GroupReduceOperator<>(grouping, reduceTypeInfo,
reduceFunction, fullName);
+ transformSideInputs(((Combine.PerKey) transform).getSideInputs(),
outputDataSet, context);
+
context.setOutputDataSet(context.getOutput(transform), outputDataSet);
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index af5b431a4e3..84876604cd0 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -19,8 +19,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import java.io.IOException;
-import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -75,13 +73,6 @@ public void translate(FlinkRunner flinkRunner, Pipeline
pipeline) {
this.flinkBatchEnv = null;
this.flinkStreamEnv = null;
- // Serialize and rehydrate pipeline to make sure we only depend serialized
transforms.
- try {
- pipeline =
PipelineTranslation.fromProto(PipelineTranslation.toProto(pipeline));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
PipelineTranslationOptimizer optimizer =
new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 669843fdf55..e22a02a6e3a 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -36,7 +36,6 @@
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.construction.CombineTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
@@ -61,6 +60,7 @@
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -104,6 +104,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
@@ -837,7 +838,7 @@ public void translateNode(
inputKvCoder.getKeyCoder(),
keySelector);
- // our operator excepts WindowedValue<KeyedWorkItem> while our input
stream
+ // our operator expects WindowedValue<KeyedWorkItem> while our input
stream
// is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java
doesn't like it ...
@SuppressWarnings("unchecked")
SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>>
outDataStream =
@@ -853,6 +854,23 @@ public void translateNode(
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>
{
+ @Override
+ boolean canTranslate(
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
transform,
+ FlinkStreamingTranslationContext context) {
+ // if we have a merging window strategy and side inputs we cannot
+ // translate as a proper combine. We have to group and then run the
combine
+ // over the final grouped values.
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+ return windowingStrategy.getWindowFn().isNonMerging()
+ || ((Combine.PerKey) transform).getSideInputs().isEmpty();
+ }
+
@Override
public void translateNode(
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
transform,
@@ -892,15 +910,7 @@ public void translateNode(
KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
keyedWorkItemStream = workItemStream.keyBy(keySelector);
- GlobalCombineFn<? super InputT, ?, OutputT> combineFn;
- try {
- combineFn =
- (GlobalCombineFn<? super InputT, ?, OutputT>)
- CombineTranslation.getCombineFn(context.getCurrentTransform())
- .orElseThrow(() -> new IOException("CombineFn not found in
node."));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ GlobalCombineFn<? super InputT, ?, OutputT> combineFn =
((Combine.PerKey) transform).getFn();
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn =
SystemReduceFn.combining(
inputKvCoder.getKeyCoder(),
@@ -912,30 +922,81 @@ public void translateNode(
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
- WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
- new WindowDoFnOperator<>(
- reduceFn,
- fullName,
- (Coder) windowedWorkItemCoder,
- mainTag,
- Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
- windowingStrategy,
- new HashMap<>(), /* side-input mapping */
- Collections.emptyList(), /* side inputs */
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder(),
- keySelector);
+ List<PCollectionView<?>> sideInputs = ((Combine.PerKey)
transform).getSideInputs();
- // our operator excepts WindowedValue<KeyedWorkItem> while our input
stream
- // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java
doesn't like it ...
- @SuppressWarnings("unchecked")
- SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
- keyedWorkItemStream
- .transform(fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator)
- .uid(fullName);
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ if (sideInputs.isEmpty()) {
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ fullName,
+ (Coder) windowedWorkItemCoder,
+ mainTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
+ windowingStrategy,
+ new HashMap<>(), /* side-input mapping */
+ Collections.emptyList(), /* side inputs */
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder(),
+ keySelector);
+
+ // our operator expects WindowedValue<KeyedWorkItem> while our input
stream
+ // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java
doesn't like it ...
+ @SuppressWarnings("unchecked")
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>>
outDataStream =
+ keyedWorkItemStream
+ .transform(fullName, outputTypeInfo, (OneInputStreamOperator)
doFnOperator)
+ .uid(fullName);
+ context.setOutputDataStream(context.getOutput(transform),
outDataStream);
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
transformSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ fullName,
+ (Coder) windowedWorkItemCoder,
+ mainTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag,
outputCoder),
+ windowingStrategy,
+ transformSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder(),
+ keySelector);
+
+ // we have to manually construct the two-input transform because we're
not
+ // allowed to have only one input keyed, normally.
+
+ TwoInputTransformation<
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
RawUnionValue,
+ WindowedValue<KV<K, OutputT>>>
+ rawFlinkTransform =
+ new TwoInputTransformation<>(
+ keyedWorkItemStream.getTransformation(),
+ transformSideInputs.f1.broadcast().getTransformation(),
+ transform.getName(),
+ (TwoInputStreamOperator) doFnOperator,
+ outputTypeInfo,
+ keyedWorkItemStream.getParallelism());
+
+ rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(),
null);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>>
outDataStream =
+ new SingleOutputStreamOperator(
+ keyedWorkItemStream.getExecutionEnvironment(),
+ rawFlinkTransform) {}; // we have to cheat around the ctor
being protected
+
+
keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+ context.setOutputDataStream(context.getOutput(transform),
outDataStream);
+ }
}
}
@@ -1143,11 +1204,6 @@ public String
getUrn(SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?
new SplittableParDoGbkIntoKeyedWorkItemsPayloadTranslator())
.build();
}
-
- @Override
- public Map<String, PTransformTranslation.TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
/**
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 5351510c13c..717585c4d7a 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -37,7 +37,6 @@
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -149,16 +148,6 @@ public String getUrn(ParDoSingle<?, ?> transform) {
.build();
}
- @Override
- public PTransformTranslation.RawPTransform<?, ?> rehydrate(
- RunnerApi.PTransform protoTransform, RehydratedComponents
rehydratedComponents)
- throws IOException {
- throw new UnsupportedOperationException(
- String.format(
- "%s.rehydrate should never be called; the serialized form is
that of a ParDo",
- getClass().getCanonicalName()));
- }
-
private static RunnerApi.ParDoPayload payloadForParDoSingle(
final ParDoSingle<?, ?> parDo, SdkComponents components) throws
IOException {
final DoFn<?, ?> doFn = parDo.getFn();
@@ -239,11 +228,5 @@ public String translateRestrictionCoderId(SdkComponents
newComponents) {
getTransformPayloadTranslators() {
return Collections.singletonMap(ParDoSingle.class, new
PayloadTranslator());
}
-
- @Override
- public Map<String, ? extends
PTransformTranslation.TransformPayloadTranslator>
- getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 3cc7de0240a..4edd2359257 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -18,11 +18,9 @@
package org.apache.beam.runners.samza.translation;
-import java.io.IOException;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.construction.CombineTranslation;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
@@ -128,16 +126,8 @@ public void translate(
return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
SystemReduceFn.buffering(kvInputCoder.getValueCoder());
} else if (transform instanceof Combine.PerKey) {
- final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT>
combineFn;
- try {
- combineFn =
- (CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT>)
- CombineTranslation.getCombineFn(appliedPTransform)
- .orElseThrow(() -> new IOException("CombineFn not found in
node."));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
+ final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT>
combineFn =
+ ((Combine.PerKey) transform).getFn();
return SystemReduceFn.combining(
kvInputCoder.getKeyCoder(),
AppliedCombineFn.withInputCoder(combineFn,
pipeline.getCoderRegistry(), kvInputCoder));
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 542fede0139..d444d05a0a8 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -22,7 +22,6 @@
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -135,10 +134,5 @@ private static String getUrnForTransform(PTransform<?, ?>
transform) {
return ImmutableMap.of(
SamzaPublishView.class, new
SamzaPublishView.SamzaPublishViewPayloadTranslator());
}
-
- @Override
- public Map<String, PTransformTranslation.TransformPayloadTranslator>
getTransformRehydrators() {
- return Collections.emptyMap();
- }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 118356)
Time Spent: 7.5h (was: 7h 20m)
> Pipeline translation utilities should not use SDK construction classes
> ----------------------------------------------------------------------
>
> Key: BEAM-3971
> URL: https://issues.apache.org/jira/browse/BEAM-3971
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 7.5h
> Remaining Estimate: 0h
>
> In general, portable runners will require access to pipeline information not
> available in rehydrated pipelines while constructing physical plans.
> Translation utilities should operate directly on protos or on thin,
> information-preserving wrappers.
> The pipeline fusion utilities already operate on protos directly and can be
> used as an example of how this could be done.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)