Repository: beam Updated Branches: refs/heads/master 8160924e1 -> ea0f37db7
Inline and remove POutputValueBase Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66803115 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66803115 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66803115 Branch: refs/heads/master Commit: 66803115d3aaa225881137307486a955d1e0dc7b Parents: 2866549 Author: Kenneth Knowles <[email protected]> Authored: Thu Apr 20 18:47:57 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Apr 20 19:03:19 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/values/PDone.java | 30 ++++++-- .../beam/sdk/values/POutputValueBase.java | 74 -------------------- .../java/org/apache/beam/sdk/values/PValue.java | 2 + .../org/apache/beam/sdk/values/PValueBase.java | 29 ++++++-- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 37 +++++++++- 5 files changed, 83 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/66803115/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java index 1e5c4dc..5c9800d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java @@ -21,13 +21,20 @@ import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; /** * {@link PDone} is the output of a {@link PTransform} that has a trivial result, * such as a {@link WriteFiles}. */ -public class PDone extends POutputValueBase { +public class PDone implements POutput { + + private final Pipeline pipeline; + + private PDone(Pipeline pipeline) { + this.pipeline = pipeline; + } /** * Creates a {@link PDone} in the given {@link Pipeline}. @@ -37,12 +44,25 @@ public class PDone extends POutputValueBase { } @Override + public Pipeline getPipeline() { + return pipeline; + } + + /** + * A {@link PDone} contains no {@link PValue PValues}. + */ + @Override public Map<TupleTag<?>, PValue> expand() { - // A PDone contains no PValues. return Collections.emptyMap(); } - private PDone(Pipeline pipeline) { - super(pipeline); - } + /** Does nothing; there are no concrete outputs to record. */ + @Override + public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {} + + /** + * Does nothing; there is nothing to finish specifying. + */ + @Override + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } } http://git-wip-us.apache.org/repos/asf/beam/blob/66803115/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java deleted file mode 100644 index 5bd424d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutputValueBase.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.values; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * A {@link POutputValueBase} is the abstract base class of - * {@code PTransform} outputs. - * - * <p>A {@link PValueBase} that adds tracking of its producing - * {@link AppliedPTransform}. - * - * <p>For internal use. - */ -public abstract class POutputValueBase implements POutput { - - private final Pipeline pipeline; - - protected POutputValueBase(Pipeline pipeline) { - this.pipeline = pipeline; - } - - /** - * No-arg constructor for Java serialization only. - * The resulting {@link POutputValueBase} is unlikely to be - * valid. - */ - protected POutputValueBase() { - pipeline = null; - } - - @Override - public Pipeline getPipeline() { - return pipeline; - } - - /** - * Records that this {@link POutputValueBase} is an output with the given name of the given {@link - * AppliedPTransform}. - * - * <p>By default, does nothing. - * - * <p>To be invoked only by {@link POutput#recordAsOutput} implementations. Not to be invoked - * directly by user code. - */ - @Override - public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {} - - /** - * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is - * to do nothing. Override if your {@link PValue} requires - * finalization. - */ - @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } -} http://git-wip-us.apache.org/repos/asf/beam/blob/66803115/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java index 06546aa..d9f6920 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValue.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.transforms.PTransform; /** * The interface for values that can be input to and output from {@link PTransform PTransforms}. + * + * <p>It is recommended to extend {@link PValueBase} */ public interface PValue extends POutput, PInput { http://git-wip-us.apache.org/repos/asf/beam/blob/66803115/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index 91ee392..9f151ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -37,7 +37,10 @@ import org.apache.beam.sdk.util.NameUtils; * * <p>For internal use. */ -public abstract class PValueBase extends POutputValueBase implements PValue { +public abstract class PValueBase implements PValue { + + private final Pipeline pipeline; + /** * Returns the name of this {@link PValueBase}. * @@ -56,7 +59,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue { } /** - * Sets the name of this {@link PValueBase}. Returns {@code this}. + * Sets the name of this {@link PValueBase}. Returns {@code this}. * * @throws IllegalStateException if this {@link PValueBase} has * already been finalized and may no longer be set. @@ -73,7 +76,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue { ///////////////////////////////////////////////////////////////////////////// protected PValueBase(Pipeline pipeline) { - super(pipeline); + this.pipeline = pipeline; } /** @@ -82,11 +85,11 @@ public abstract class PValueBase extends POutputValueBase implements PValue { * valid. */ protected PValueBase() { - super(); + this.pipeline = null; } /** - * The name of this {@link PValueBase}, or null if not yet set. + * The name of this {@link PValueBase}, or {@code null} if not yet set. */ private String name; @@ -107,7 +110,7 @@ public abstract class PValueBase extends POutputValueBase implements PValue { } /** - * Records that this {@link POutputValueBase} is an output with the + * Records that this {@link PValueBase} is an output with the * given name of the given {@link AppliedPTransform} in the given * {@link Pipeline}. * @@ -116,7 +119,6 @@ public abstract class PValueBase extends POutputValueBase implements PValue { */ protected void recordAsOutput(AppliedPTransform<?, ?, ?> transform, String outName) { - super.recordAsOutput(transform); if (name == null) { name = transform.getFullName() + "." + outName; } @@ -157,4 +159,17 @@ public abstract class PValueBase extends POutputValueBase implements PValue { protected String getKindString() { return NameUtils.approximateSimpleName(getClass()); } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + /** + * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is + * to do nothing. Override if your {@link PValue} requires + * finalization. + */ + @Override + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } } http://git-wip-us.apache.org/repos/asf/beam/blob/66803115/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java index 3e0f51c..d137f05 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java @@ -20,14 +20,20 @@ package org.apache.beam.sdk.io.gcp.bigquery; import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.values.POutputValueBase; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; /** * The result of a {@link BigQueryIO.Write} transform. */ -final class WriteResult extends POutputValueBase { +final class WriteResult implements POutput { + + private final Pipeline pipeline; + /** * Creates a {@link WriteResult} in the given {@link Pipeline}. */ @@ -41,6 +47,31 @@ final class WriteResult extends POutputValueBase { } private WriteResult(Pipeline pipeline) { - super(pipeline); + this.pipeline = pipeline; } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + /** + * Records that this {@link WriteResult} is an output with the given name of the given {@link + * AppliedPTransform}. + * + * <p>By default, does nothing. + * + * <p>To be invoked only by {@link POutput#recordAsOutput} implementations. Not to be invoked + * directly by user code. + */ + @Override + public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {} + + /** + * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is + * to do nothing. Override if your {@link PValue} requires + * finalization. + */ + @Override + public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { } }
