Repository: beam Updated Branches: refs/heads/master 6a95e5eec -> e3a0e26e2
Remove POutput#recordAsOutput. Add PValue#setDefaultName, for composite POutputs to use when they are being finalized. Expand POutput#finishSpecifyingOutput to take the name of the transform that is being finished. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab8f92fc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab8f92fc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab8f92fc Branch: refs/heads/master Commit: ab8f92fcc272e7eca70720fb91df584769376ecc Parents: 6a95e5e Author: Thomas Groh <[email protected]> Authored: Tue May 2 17:53:57 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu May 4 10:16:19 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/runners/TransformHierarchy.java | 8 ++-- .../org/apache/beam/sdk/values/PCollection.java | 4 +- .../apache/beam/sdk/values/PCollectionList.java | 14 +++---- .../beam/sdk/values/PCollectionTuple.java | 17 ++++---- .../java/org/apache/beam/sdk/values/PDone.java | 12 ++---- .../org/apache/beam/sdk/values/POutput.java | 25 ++--------- .../org/apache/beam/sdk/values/PValueBase.java | 44 ++++++-------------- .../sdk/runners/TransformHierarchyTest.java | 3 +- .../apache/beam/sdk/transforms/ParDoTest.java | 4 +- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 21 +--------- 10 files changed, 47 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 9236194..703aeb6 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -171,14 +171,14 @@ public class TransformHierarchy { for (PValue value : output.expand().values()) { if (!producers.containsKey(value)) { producers.put(value, current); + value.finishSpecifyingOutput( + current.getFullName(), unexpandedInputs.get(current), current.transform); } - value.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform); producerInput.put(value, unexpandedInputs.get(current)); } - output.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform); + output.finishSpecifyingOutput( + current.getFullName(), unexpandedInputs.get(current), current.transform); current.setOutput(output); - // TODO: Replace with a "generateDefaultNames" method. - output.recordAsOutput(current.toAppliedPTransform()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index 20e5d68..1622322 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -83,9 +83,9 @@ public class PCollection<T> extends PValueBase implements PValue { @Override public void finishSpecifyingOutput( - PInput input, PTransform<?, ?> transform) { + String transformName, PInput input, PTransform<?, ?> transform) { this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); - super.finishSpecifyingOutput(input, transform); + super.finishSpecifyingOutput(transformName, input, transform); } /** 
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java index 7b45deb..48c3649 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Partition; @@ -230,22 +229,21 @@ public class PCollectionList<T> implements PInput, POutput { } @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) { + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform<?, ?> transform) { + // All component PCollections will have already been finished. int i = 0; for (TaggedPValue tpv : pcollections) { @SuppressWarnings("unchecked") PCollection<T> pc = (PCollection<T>) tpv.getValue(); - pc.recordAsOutput(transform, "out" + i); + if (pc.getName().equals(PValueBase.defaultName(transformName))) { + pc.setName(String.format("%s.%s%s", transformName, "out", i)); + } i++; } } @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { - // All component PCollections will have already been finished. 
- } - - @Override public boolean equals(Object other) { if (!(other instanceof PCollectionList)) { return false; http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java index ce67e94..5027df6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java @@ -23,7 +23,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; @@ -236,23 +235,23 @@ public class PCollectionTuple implements PInput, POutput { } @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) { + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform<?, ?> transform) { + // All component PCollections will already have been finished. Update their names if + // appropriate. 
int i = 0; for (Map.Entry<TupleTag<?>, PCollection<?>> entry - : pcollectionMap.entrySet()) { + : pcollectionMap.entrySet()) { TupleTag<?> tag = entry.getKey(); PCollection<?> pc = entry.getValue(); - pc.recordAsOutput(transform, tag.getOutName(i)); + if (pc.getName().equals(PValueBase.defaultName(transformName))) { + pc.setName(String.format("%s.%s", transformName, tag.getOutName(i))); + } i++; } } @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { - // All component PCollections will already have been finished - } - - @Override public boolean equals(Object other) { if (!(other instanceof PCollectionTuple)) { return false; http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java index 5c9800d..92473b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.WriteFiles; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; /** @@ -56,13 +55,8 @@ public class PDone implements POutput { return Collections.emptyMap(); } - /** Does nothing; there are no concrete outputs to record. */ + /** Does nothing; there is nothing to finish specifying. */ @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {} - - /** - * Does nothing; there is nothing to finish specifying. 
- */ - @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform<?, ?> transform) {} } http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java index bb01beb..c6d15e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.values; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; /** @@ -48,31 +47,15 @@ public interface POutput { Map<TupleTag<?>, PValue> expand(); /** - * Records that this {@code POutput} is an output of the given - * {@code PTransform}. - * - * <p>For a compound {@code POutput}, it is advised to call - * this method on each component {@code POutput}. - * - * <p>This is not intended to be invoked by user code, but - * is automatically invoked as part of applying the - * producing {@link PTransform}. - */ - void recordAsOutput(AppliedPTransform<?, ?, ?> transform); - - /** * As part of applying the producing {@link PTransform}, finalizes this output to make it ready * for being used as an input and for running. * * <p>This includes ensuring that all {@link PCollection PCollections} have {@link * org.apache.beam.sdk.coders.Coder Coders} specified or defaulted. * - * <p>Automatically invoked whenever this {@link POutput} is output, after {@link - * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link - * PValue} returned by {@link #expand()}. 
- * - * @deprecated see BEAM-1199 + * <p>Automatically invoked whenever this {@link POutput} is output, after + * {@link PValue#finishSpecifyingOutput(String, PInput, PTransform)} has been called on each + * component {@link PValue} returned by {@link #expand()}. */ - @Deprecated - void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform); + void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform); } http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index 7ab5808..4de0589 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.values; +import static com.google.common.base.Preconditions.checkState; + import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.NameUtils; @@ -65,10 +66,7 @@ public abstract class PValueBase implements PValue { * already been finalized and may no longer be set. 
*/ public PValueBase setName(String name) { - if (finishedSpecifying) { - throw new IllegalStateException( - "cannot change the name of " + this + " once it's been used"); - } + checkState(!finishedSpecifying, "cannot change the name of %s once it's been used", this); this.name = name; return this; } @@ -104,26 +102,6 @@ public abstract class PValueBase implements PValue { */ private boolean finishedSpecifying = false; - @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) { - recordAsOutput(transform, "out"); - } - - /** - * Records that this {@link PValueBase} is an output with the - * given name of the given {@link AppliedPTransform} in the given - * {@link Pipeline}. - * - * <p>To be invoked only by {@link POutput#recordAsOutput} - * implementations. Not to be invoked directly by user code. - */ - protected void recordAsOutput(AppliedPTransform<?, ?, ?> transform, - String outName) { - if (name == null) { - name = transform.getFullName() + "." + outName; - } - } - /** * Returns whether this {@link PValueBase} has been finalized, and * its core properties, e.g., name, can no longer be changed. @@ -165,11 +143,15 @@ public abstract class PValueBase implements PValue { return pipeline; } - /** - * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is - * to do nothing. Override if your {@link PValue} requires - * finalization. 
- */ @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform<?, ?> transform) { + if (name == null) { + setName(defaultName(transformName)); + } + } + + static String defaultName(String transformName) { + return String.format("%s.%s", transformName, "out"); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 1b884e2..e495758 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -145,7 +145,8 @@ public class TransformHierarchyTest implements Serializable { final PCollectionList<Long> appended = pcList.and( PCollection.<Long>createPrimitiveOutputInternal( - pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)); + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) + .setName("prim")); hierarchy.pushNode( "AddPc", pcList, http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 26904aa..d4475c9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -1244,7 +1244,9 @@ public class ParDoTest implements 
Serializable { outputTuple.get(additionalOutputTag).apply(View.<TestDummy>asSingleton()); assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder()); - outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes + outputTuple + .get(additionalOutputTag) + .finishSpecifyingOutput("ParDo", input, pardo); // Check for crashes assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption } http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java index d137f05..bc18e8e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -55,23 +54,7 @@ final class WriteResult implements POutput { return pipeline; } - /** - * Records that this {@link WriteResult} is an output with the given name of the given {@link - * AppliedPTransform}. - * - * <p>By default, does nothing. - * - * <p>To be invoked only by {@link POutput#recordAsOutput} implementations. Not to be invoked - * directly by user code. 
- */ - @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {} - - /** - * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is - * to do nothing. Override if your {@link PValue} requires - * finalization. - */ @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform<?, ?> transform) {} }
