Add Display Data 'path' metadata. Display Data supports the notion of "sub-components": components within a transform class that can contribute their own display data. We add a namespace to display data items based on the originating component, which keeps the display data items unique within the step.
There are instances where a component is included multiple times within a step. We handle the case of the same instance being shared by simply ignoring it the second time. However, we don't handle the case where a separate instance of the same class is added. Currently, the separate instances will add display data with the same namespace and key, causing a failure. This can come up, for example, when infrastructure at different levels wraps and re-wraps a component. We saw this with a bounded source being adapted multiple times: Bounded -> Unbounded -> Bounded -> Unbounded. The BoundedToUnboundedSourceAdapter was included multiple times with separate instances and caused a failure while populating display data. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad03d07a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad03d07a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad03d07a Branch: refs/heads/master Commit: ad03d07ae783f054a31e8b2e14100afff8cdf747 Parents: ff6301b Author: Scott Wegner <[email protected]> Authored: Wed Oct 12 14:49:41 2016 -0700 Committer: bchambers <[email protected]> Committed: Thu Oct 20 11:10:13 2016 -0700 ---------------------------------------------------------------------- .../core/UnboundedReadFromBoundedSource.java | 2 +- runners/direct-java/pom.xml | 5 - .../runners/direct/ForwardingPTransform.java | 2 +- .../beam/runners/direct/DirectRunnerTest.java | 32 -- .../direct/ForwardingPTransformTest.java | 7 +- .../beam/runners/dataflow/DataflowRunner.java | 5 +- .../DataflowUnboundedReadFromBoundedSource.java | 4 +- .../sdk/io/BoundedReadFromUnboundedSource.java | 5 +- .../apache/beam/sdk/io/CompressedSource.java | 2 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 5 +- .../main/java/org/apache/beam/sdk/io/Read.java | 4 +- .../main/java/org/apache/beam/sdk/io/Write.java | 6 +- .../sdk/options/ProxyInvocationHandler.java | 
149 +++--- .../org/apache/beam/sdk/transforms/Combine.java | 60 +-- .../apache/beam/sdk/transforms/CombineFns.java | 33 +- .../beam/sdk/transforms/CombineWithContext.java | 3 +- .../beam/sdk/transforms/DoFnAdapters.java | 2 +- .../beam/sdk/transforms/FlatMapElements.java | 6 +- .../apache/beam/sdk/transforms/MapElements.java | 8 +- .../org/apache/beam/sdk/transforms/ParDo.java | 60 ++- .../apache/beam/sdk/transforms/Partition.java | 2 +- .../sdk/transforms/display/DisplayData.java | 518 +++++++++++++------ .../beam/sdk/transforms/windowing/Window.java | 2 +- .../io/BoundedReadFromUnboundedSourceTest.java | 4 +- .../beam/sdk/io/CompressedSourceTest.java | 4 +- .../java/org/apache/beam/sdk/io/ReadTest.java | 10 +- .../java/org/apache/beam/sdk/io/WriteTest.java | 6 +- .../sdk/options/ProxyInvocationHandlerTest.java | 40 ++ .../beam/sdk/transforms/CombineFnsTest.java | 7 +- .../apache/beam/sdk/transforms/CombineTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 8 +- .../transforms/display/DisplayDataMatchers.java | 141 +++-- .../display/DisplayDataMatchersTest.java | 67 ++- .../sdk/transforms/display/DisplayDataTest.java | 367 +++++++++---- .../sdk/transforms/windowing/WindowTest.java | 4 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 - .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- 38 files changed, 988 insertions(+), 605 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 91a1715..2afdcf2 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -108,7 +108,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle // We explicitly do not register base-class data, instead we use the delegate inner source. builder .add(DisplayData.item("source", source.getClass())) - .include(source); + .include("source", source); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 354c8c7..6cb1838 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -286,11 +286,6 @@ </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java index 3160b58..77311c2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ForwardingPTransform.java @@ -57,6 +57,6 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend @Override public void populateDisplayData(DisplayData.Builder builder) { - delegate().populateDisplayData(builder); + builder.delegate(delegate()); 
} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 37af90c..4027d25 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.io.InputStream; @@ -242,37 +241,6 @@ public class DirectRunnerTest implements Serializable { p.run(); } - @Test - public void pipelineOptionsDisplayDataExceptionShouldFail() { - Object brokenValueType = new Object() { - @JsonValue - public int getValue () { - return 42; - } - - @Override - public String toString() { - throw new RuntimeException("oh noes!!"); - } - }; - - Pipeline p = getPipeline(); - p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType); - - p.apply(Create.of(1, 2, 3)); - - thrown.expectMessage(PipelineOptions.class.getName()); - thrown.expectCause(ThrowableMessageMatcher.hasMessage(is("oh noes!!"))); - p.run(); - } - - /** {@link PipelineOptions} to inject bad object implementations. */ - public interface ObjectPipelineOptions extends PipelineOptions { - Object getValue(); - void setValue(Object value); - } - - /** * Tests that a {@link OldDoFn} that mutates an output with a good equals() fails in the * {@link DirectRunner}. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java index 6abaf92..c75adaa 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ForwardingPTransformTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -102,10 +103,10 @@ public class ForwardingPTransformTest { @Test public void populateDisplayDataDelegates() { - DisplayData.Builder builder = mock(DisplayData.Builder.class); - doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder); + doThrow(RuntimeException.class) + .when(delegate).populateDisplayData(any(DisplayData.Builder.class)); thrown.expect(RuntimeException.class); - forwarding.populateDisplayData(builder); + DisplayData.from(forwarding); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5f83788..7bf270d 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2249,8 +2249,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("source", source.getClass())); - builder.include(source); + builder + .add(DisplayData.item("source", source.getClass())) + .include("source", source); } public UnboundedSource<T, ?> getSource() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java index e4257d1..96a35bc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java @@ -120,7 +120,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin // We explicitly do not register base-class data, instead we use the delegate inner source. 
builder .add(DisplayData.item("source", source.getClass())) - .include(source); + .include("source", source); } /** @@ -195,7 +195,7 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("source", boundedSource.getClass())); - builder.include(boundedSource); + builder.include("source", boundedSource); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 630a8a3..40c52a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -119,7 +119,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle .withLabel("Maximum Read Records"), Long.MAX_VALUE) .addIfNotNull(DisplayData.item("maxReadTime", maxReadTime) .withLabel("Maximum Read Time")) - .include(source); + .include("source", source); } private static class UnboundedToBoundedSourceAdapter<T> @@ -204,8 +204,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("source", source.getClass())); - builder.include(source); + builder.delegate(source); } private class Reader extends BoundedReader<ValueWithRecordId<T>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 680dc2c..bf871b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -390,7 +390,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { public void populateDisplayData(DisplayData.Builder builder) { // We explicitly do not register base-class data, instead we use the delegate inner source. builder - .include(sourceDelegate) + .include("source", sourceDelegate) .add(DisplayData.item("source", sourceDelegate.getClass()) .withLabel("Read Source")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 6091156..72a6399 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -792,8 +792,7 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - Bound.this.populateDisplayData(builder); + builder.delegate(Bound.this); } } } @@ -1043,7 +1042,7 @@ public class PubsubIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - Bound.this.populateDisplayData(builder); + builder.delegate(Bound.this); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 29c4e47..f04fbaf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -127,7 +127,7 @@ public class Read { builder .add(DisplayData.item("source", source.getClass()) .withLabel("Read Source")) - .include(source); + .include("source", source); } } @@ -194,7 +194,7 @@ public class Read { builder .add(DisplayData.item("source", source.getClass()) .withLabel("Read Source")) - .include(source); + .include("source", source); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index e8b19d9..7559fca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -118,7 +118,7 @@ public class Write { super.populateDisplayData(builder); builder .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) - .include(sink) + .include("sink", sink) .addIfNotDefault( DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"), 0); @@ -209,7 +209,7 @@ public class Write { @Override public void populateDisplayData(DisplayData.Builder builder) { - Write.Bound.this.populateDisplayData(builder); + builder.delegate(Write.Bound.this); } } @@ -261,7 +261,7 @@ public class Write { @Override public void populateDisplayData(DisplayData.Builder builder) { - Write.Bound.this.populateDisplayData(builder); + builder.delegate(Write.Bound.this); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index a77dcc6..3e74916 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -86,7 +86,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers; * {@link PipelineOptions#as(Class)}. */ @ThreadSafe -class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { +class ProxyInvocationHandler implements InvocationHandler { private static final ObjectMapper MAPPER = new ObjectMapper(); /** * No two instances of this class are considered equivalent hence we generate a random hash code. @@ -138,8 +138,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { && args[0] instanceof DisplayData.Builder) { @SuppressWarnings("unchecked") DisplayData.Builder builder = (DisplayData.Builder) args[0]; - // Explicitly set display data namespace so thrown exceptions will have sensible type. - builder.include(this, PipelineOptions.class); + builder.delegate(new PipelineOptionsDisplayData()); return Void.TYPE; } String methodName = method.getName(); @@ -243,88 +242,116 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { } /** - * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set - * pipeline options will be added as display data. + * Nested class to handle display data in order to set the display data namespace to something + * sensible. 
*/ - public void populateDisplayData(DisplayData.Builder builder) { - Set<PipelineOptionSpec> optionSpecs = PipelineOptionsReflector.getOptionSpecs(knownInterfaces); - Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs); - - for (Map.Entry<String, BoundValue> option : options.entrySet()) { - BoundValue boundValue = option.getValue(); - if (boundValue.isDefault()) { - continue; - } + class PipelineOptionsDisplayData implements HasDisplayData { + /** + * Populate display data. See {@link HasDisplayData#populateDisplayData}. All explicitly set + * pipeline options will be added as display data. + */ + public void populateDisplayData(DisplayData.Builder builder) { + Set<PipelineOptionSpec> optionSpecs = + PipelineOptionsReflector.getOptionSpecs(knownInterfaces); - Object value = boundValue.getValue() == null ? "" : boundValue.getValue(); - DisplayData.Type type = DisplayData.inferType(value); - HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey())); + Multimap<String, PipelineOptionSpec> optionsMap = buildOptionNameToSpecMap(optionSpecs); - for (PipelineOptionSpec optionSpec : specs) { - if (!optionSpec.shouldSerialize()) { - // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also - // excluded from display data. These options are generally not useful for display. 
+ for (Map.Entry<String, BoundValue> option : options.entrySet()) { + BoundValue boundValue = option.getValue(); + if (boundValue.isDefault()) { continue; } - Class<?> pipelineInterface = optionSpec.getDefiningInterface(); - if (type != null) { - builder.add(DisplayData.item(option.getKey(), type, value) - .withNamespace(pipelineInterface)); - } else { - builder.add(DisplayData.item(option.getKey(), displayDataString(value)) - .withNamespace(pipelineInterface)); + DisplayDataValue resolved = DisplayDataValue.resolve(boundValue.getValue()); + HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(option.getKey())); + + for (PipelineOptionSpec optionSpec : specs) { + if (!optionSpec.shouldSerialize()) { + // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also + // excluded from display data. These options are generally not useful for display. + continue; + } + + builder.add(DisplayData.item(option.getKey(), resolved.getType(), resolved.getValue()) + .withNamespace(optionSpec.getDefiningInterface())); } } - } - for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) { - if (options.containsKey(jsonOption.getKey())) { - // Option overwritten since deserialization; don't re-write - continue; - } + for (Map.Entry<String, JsonNode> jsonOption : jsonOptions.entrySet()) { + if (options.containsKey(jsonOption.getKey())) { + // Option overwritten since deserialization; don't re-write + continue; + } + + HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey())); + if (specs.isEmpty()) { + // No PipelineOptions interface for this key not currently loaded + builder.add(DisplayData.item(jsonOption.getKey(), jsonOption.getValue().toString()) + .withNamespace(UnknownPipelineOptions.class)); + continue; + } - HashSet<PipelineOptionSpec> specs = new HashSet<>(optionsMap.get(jsonOption.getKey())); - if (specs.isEmpty()) { - builder.add(DisplayData.item(jsonOption.getKey(), 
jsonOption.getValue().toString()) - .withNamespace(UnknownPipelineOptions.class)); - } else { for (PipelineOptionSpec spec : specs) { if (!spec.shouldSerialize()) { continue; } Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod()); - value = value == null ? "" : value; - DisplayData.Type type = DisplayData.inferType(value); - if (type != null) { - builder.add(DisplayData.item(jsonOption.getKey(), type, value) - .withNamespace(spec.getDefiningInterface())); - } else { - builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value)) - .withNamespace(spec.getDefiningInterface())); - } + DisplayDataValue resolved = DisplayDataValue.resolve(value); + builder.add(DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue()) + .withNamespace(spec.getDefiningInterface())); } } } } /** - * {@link Object#toString()} wrapper to extract display data values for various types. + * Helper class to resolve a {@link DisplayData} type and value from {@link PipelineOptions}. */ - private String displayDataString(Object value) { - checkNotNull(value, "value cannot be null"); - if (!value.getClass().isArray()) { - return value.toString(); - } - if (!value.getClass().getComponentType().isPrimitive()) { - return Arrays.deepToString((Object[]) value); + @AutoValue + abstract static class DisplayDataValue { + /** + * The resolved display data value. May differ from the input to {@link #resolve(Object)} + */ + abstract Object getValue(); + + /** The resolved display data type. */ + abstract DisplayData.Type getType(); + + /** + * Infer the value and {@link DisplayData.Type type} for the given + * {@link PipelineOptions} value. 
+ */ + static DisplayDataValue resolve(@Nullable Object value) { + DisplayData.Type type = DisplayData.inferType(value); + + if (type == null) { + value = displayDataString(value); + type = DisplayData.Type.STRING; + } + + return new AutoValue_ProxyInvocationHandler_DisplayDataValue(value, type); } - // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an - // Object array, but will unwrap nested primitive arrays. - String wrapped = Arrays.deepToString(new Object[] {value}); - return wrapped.substring(1, wrapped.length() - 1); + /** + * Safe {@link Object#toString()} wrapper to extract display data values for various types. + */ + private static String displayDataString(@Nullable Object value) { + if (value == null) { + return ""; + } + if (!value.getClass().isArray()) { + return value.toString(); + } + if (!value.getClass().getComponentType().isPrimitive()) { + return Arrays.deepToString((Object[]) value); + } + + // At this point, we have some type of primitive array. Arrays.deepToString(..) requires an + // Object array, but will unwrap nested primitive arrays. 
+ String wrapped = Arrays.deepToString(new Object[]{value}); + return wrapped.substring(1, wrapped.length() - 1); + } } /** @@ -587,7 +614,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { List<Map<String, Object>> serializedDisplayData = Lists.newArrayList(); DisplayData displayData = DisplayData.from(value); - for (DisplayData.Item<?> item : displayData.items()) { + for (DisplayData.Item item : displayData.items()) { @SuppressWarnings("unchecked") Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class); serializedDisplayData.add(serializedItem); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index df9a306..7719c73 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -124,14 +124,14 @@ public class Combine { return globally(fn, displayDataForFn(fn)); } - private static <T> DisplayData.Item<? extends Class<?>> displayDataForFn(T fn) { + private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) { return DisplayData.item("combineFn", fn.getClass()) .withLabel("Combiner"); } private static <InputT, OutputT> Globally<InputT, OutputT> globally( GlobalCombineFn<? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new Globally<>(fn, fnDisplayData, true, 0); } @@ -200,7 +200,7 @@ public class Combine { private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? 
extends Class<?>> fnDisplayData) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/); } @@ -210,7 +210,7 @@ public class Combine { */ private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/); } @@ -294,7 +294,7 @@ public class Combine { private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new GroupedValues<>(fn, fnDisplayData); } @@ -521,7 +521,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(CombineFn.this); + builder.delegate(CombineFn.this); } }; } @@ -1258,7 +1258,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(KeyedCombineFn.this); + builder.delegate(KeyedCombineFn.this); } }; } @@ -1325,13 +1325,13 @@ public class Combine { extends PTransform<PCollection<InputT>, PCollection<OutputT>> { private final GlobalCombineFn<? super InputT, ?, OutputT> fn; - private final DisplayData.Item<? extends Class<?>> fnDisplayData; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final boolean insertDefault; private final int fanout; private final List<PCollectionView<?>> sideInputs; private Globally(GlobalCombineFn<? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.insertDefault = insertDefault; @@ -1340,7 +1340,7 @@ public class Combine { } private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { super(name); this.fn = fn; this.fnDisplayData = fnDisplayData; @@ -1350,7 +1350,7 @@ public class Combine { } private Globally(String name, GlobalCombineFn<? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout, + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout, List<PCollectionView<?>> sideInputs) { super(name); this.fn = fn; @@ -1498,9 +1498,9 @@ public class Combine { private static void populateDisplayData( DisplayData.Builder builder, HasDisplayData fn, - DisplayData.Item<? extends Class<?>> fnDisplayItem) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayItem) { builder - .include(fn) + .include("combineFn", fn) .add(fnDisplayItem); } @@ -1556,13 +1556,13 @@ public class Combine { extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { private final GlobalCombineFn<? super InputT, ?, OutputT> fn; - private final DisplayData.Item<? extends Class<?>> fnDisplayData; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final boolean insertDefault; private final int fanout; private GloballyAsSingletonView( GlobalCombineFn<? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { + DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.insertDefault = insertDefault; @@ -1762,13 +1762,13 @@ public class Combine { extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> { private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; - private final DisplayData.Item<? extends Class<?>> fnDisplayData; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final boolean fewKeys; private final List<PCollectionView<?>> sideInputs; private PerKey( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) { this.fn = fn; this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; @@ -1777,7 +1777,7 @@ public class Combine { private PerKey(String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys, List<PCollectionView<?>> sideInputs) { super(name); this.fn = fn; @@ -1788,7 +1788,7 @@ public class Combine { private PerKey( String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, boolean fewKeys) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) { super(name); this.fn = fn; this.fnDisplayData = fnDisplayData; @@ -1888,12 +1888,12 @@ public class Combine { extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> { private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; - private final DisplayData.Item<? extends Class<?>> fnDisplayData; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final SerializableFunction<? 
super K, Integer> hotKeyFanout; private PerKeyWithHotKeyFanout(String name, PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, SerializableFunction<? super K, Integer> hotKeyFanout) { super(name); this.fn = fn; @@ -1976,7 +1976,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -2024,7 +2024,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; } else { @@ -2068,7 +2068,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; postCombine = @@ -2117,7 +2117,7 @@ public class Combine { } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(PerKeyWithHotKeyFanout.this); + builder.delegate(PerKeyWithHotKeyFanout.this); } }; } @@ -2202,7 +2202,7 @@ public class Combine { Combine.populateDisplayData(builder, fn, fnDisplayData); if (hotKeyFanout instanceof HasDisplayData) { - builder.include((HasDisplayData) hotKeyFanout); + builder.include("hotKeyFanout", (HasDisplayData) hotKeyFanout); } builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) .withLabel("Fanout Function")); @@ -2349,12 +2349,12 @@ public class Combine { PCollection<KV<K, OutputT>>> { private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; - private final DisplayData.Item<? extends Class<?>> fnDisplayData; + private final DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData; private final List<PCollectionView<?>> sideInputs; private GroupedValues( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; this.sideInputs = ImmutableList.of(); @@ -2362,7 +2362,7 @@ public class Combine { private GroupedValues( PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, - DisplayData.Item<? extends Class<?>> fnDisplayData, + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, List<PCollectionView<?>> sideInputs) { this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; @@ -2402,7 +2402,7 @@ public class Combine { @Override public void populateDisplayData(DisplayData.Builder builder) { - Combine.GroupedValues.this.populateDisplayData(builder); + builder.delegate(Combine.GroupedValues.this); } }).withSideInputs(sideInputs)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index 229b1d2..1b3e525 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -21,18 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1044,35 +1040,12 @@ public class CombineFns { */ private static void populateDisplayData( DisplayData.Builder builder, List<? extends HasDisplayData> combineFns) { - - // NB: ArrayListMultimap necessary to maintain ordering of combineFns of the same type. - Multimap<Class<?>, HasDisplayData> combineFnMap = ArrayListMultimap.create(); - for (int i = 0; i < combineFns.size(); i++) { HasDisplayData combineFn = combineFns.get(i); - builder.add(DisplayData.item("combineFn" + (i + 1), combineFn.getClass()) + String token = "combineFn" + (i + 1); + builder.add(DisplayData.item(token, combineFn.getClass()) .withLabel("Combine Function")); - combineFnMap.put(combineFn.getClass(), combineFn); - } - - for (Map.Entry<Class<?>, Collection<HasDisplayData>> combineFnEntries : - combineFnMap.asMap().entrySet()) { - - Collection<HasDisplayData> classCombineFns = combineFnEntries.getValue(); - if (classCombineFns.size() == 1) { - // Only one combineFn of this type, include it directly. - builder.include(Iterables.getOnlyElement(classCombineFns)); - - } else { - // Multiple combineFns of same type, add a namespace suffix so display data is - // unique and ordered. 
- String baseNamespace = combineFnEntries.getKey().getName(); - for (int i = 0; i < combineFns.size(); i++) { - HasDisplayData combineFn = combineFns.get(i); - String namespace = String.format("%s#%d", baseNamespace, i + 1); - builder.include(combineFn, namespace); - } - } + builder.include(token, combineFn); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index 3dd4fe2..7ac952c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -171,8 +171,7 @@ public class CombineWithContext { @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - CombineFnWithContext.this.populateDisplayData(builder); + builder.delegate(CombineFnWithContext.this); } }; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 12d4824..18d9333 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -246,7 +246,7 @@ public class DoFnAdapters { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.include(fn); + builder.delegate(fn); } private void 
readObject(java.io.ObjectInputStream in) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index b590d45..4ef809f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -119,7 +119,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { ////////////////////////////////////////////////////////////////////////////////////////////////// private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn; - private final DisplayData.Item<?> fnClassDisplayData; + private final DisplayData.ItemSpec<?> fnClassDisplayData; private FlatMapElements( SimpleFunction<InputT, ? extends Iterable<OutputT>> fn, @@ -166,7 +166,9 @@ extends PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(fn).add(fnClassDisplayData); + builder + .include("flatMapFn", fn) + .add(fnClassDisplayData); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 73e4359..c109034 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -103,7 +103,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { /////////////////////////////////////////////////////////////////// private final SimpleFunction<InputT, OutputT> fn; - private final DisplayData.Item<?> fnClassDisplayData; + private final DisplayData.ItemSpec<?> fnClassDisplayData; private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) { this.fn = fn; @@ -123,7 +123,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { @Override public void populateDisplayData(DisplayData.Builder builder) { - MapElements.this.populateDisplayData(builder); + builder.delegate(MapElements.this); } @Override @@ -141,6 +141,8 @@ extends PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(fn).add(fnClassDisplayData); + builder + .include("mapFn", fn) + .add(fnClassDisplayData); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 8aa87e4..93eb1ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -521,7 +521,7 @@ public class ParDo { */ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -538,12 +538,17 @@ public class ParDo { */ @Deprecated public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } private static <InputT, OutputT> Bound<InputT, OutputT> of( - OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { - return new Unbound().of(fn, fnClass); + OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { + return new Unbound().of(fn, fnDisplayData); + } + + private static <T> DisplayData.ItemSpec<? 
extends Class<?>> displayDataForFn(T fn) { + return DisplayData.item("fn", fn.getClass()) + .withLabel("Transform Function"); } /** @@ -666,7 +671,7 @@ public class ParDo { */ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -681,12 +686,12 @@ public class ParDo { */ @Deprecated public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } private <InputT, OutputT> Bound<InputT, OutputT> of( - OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { - return new Bound<>(name, sideInputs, fn, fnClass); + OldDoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { + return new Bound<>(name, sideInputs, fn, fnDisplayData); } } @@ -707,16 +712,16 @@ public class ParDo { // Inherits name. private final List<PCollectionView<?>> sideInputs; private final OldDoFn<InputT, OutputT> fn; - private final Class<?> fnClass; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; Bound(String name, List<PCollectionView<?>> sideInputs, OldDoFn<InputT, OutputT> fn, - Class<?> fnClass) { + DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData) { super(name); this.sideInputs = sideInputs; this.fn = SerializableUtils.clone(fn); - this.fnClass = fnClass; + this.fnDisplayData = fnDisplayData; } /** @@ -744,7 +749,7 @@ public class ParDo { ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder(); builder.addAll(this.sideInputs); builder.addAll(sideInputs); - return new Bound<>(name, builder.build(), fn, fnClass); + return new Bound<>(name, builder.build(), fn, fnDisplayData); } /** @@ -758,7 +763,7 @@ public class ParDo { public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData); } @Override @@ -802,7 +807,7 @@ public class ParDo { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - ParDo.populateDisplayData(builder, fn, fnClass); + ParDo.populateDisplayData(builder, fn, fnDisplayData); } public OldDoFn<InputT, OutputT> getFn() { @@ -883,7 +888,7 @@ public class ParDo { */ public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); - return of(adapt(fn), fn.getClass()); + return of(adapt(fn), displayDataForFn(fn)); } /** @@ -898,12 +903,13 @@ public class ParDo { */ @Deprecated public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { - return of(fn, fn.getClass()); + return of(fn, displayDataForFn(fn)); } - private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) { + private <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, + DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnDisplayData); } } @@ -925,20 +931,20 @@ public class ParDo { private final TupleTag<OutputT> mainOutputTag; private final TupleTagList sideOutputTags; private final OldDoFn<InputT, OutputT> fn; - private final Class<?> fnClass; + private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; BoundMulti(String name, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, OldDoFn<InputT, OutputT> fn, - Class<?> fnClass) { + DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { super(name); this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.fn = SerializableUtils.clone(fn); - this.fnClass = fnClass; + this.fnDisplayData = fnDisplayData; } /** @@ -969,7 +975,7 @@ public class ParDo { builder.addAll(sideInputs); return new BoundMulti<>( name, builder.build(), - mainOutputTag, sideOutputTags, fn, fnClass); + mainOutputTag, sideOutputTags, fn, fnDisplayData); } @@ -1023,7 +1029,7 @@ public class ParDo { @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - ParDo.populateDisplayData(builder, fn, fnClass); + ParDo.populateDisplayData(builder, fn, fnDisplayData); } public OldDoFn<InputT, OutputT> getFn() { @@ -1044,11 +1050,11 @@ public class ParDo { } private static void populateDisplayData( - DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) { + DisplayData.Builder builder, OldDoFn<?, ?> fn, + DisplayData.ItemSpec<? 
extends Class<?>> fnDisplayData) { builder - .include(fn) - .add(DisplayData.item("fn", fnClass) - .withLabel("Transform Function")); + .include("fn", fn) + .add(fnDisplayData); } private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 9247942..5b4eead 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -124,7 +124,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.include(partitionDoFn); + builder.include("partitionFn", partitionDoFn); } private final transient PartitionDoFn<T> partitionDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 394666b..5ab6342 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -20,15 +20,19 @@ package org.apache.beam.sdk.transforms.display; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import 
autovalue.shaded.com.google.common.common.base.Joiner; import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonValue; import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -48,12 +52,12 @@ import org.joda.time.format.ISODateTimeFormat; * interface. */ public class DisplayData implements Serializable { - private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item<?>>newHashMap()); + private static final DisplayData EMPTY = new DisplayData(Maps.<Identifier, Item>newHashMap()); private static final DateTimeFormatter TIMESTAMP_FORMATTER = ISODateTimeFormat.dateTime(); - private final ImmutableMap<Identifier, Item<?>> entries; + private final ImmutableMap<Identifier, Item> entries; - private DisplayData(Map<Identifier, Item<?>> entries) { + private DisplayData(Map<Identifier, Item> entries) { this.entries = ImmutableMap.copyOf(entries); } @@ -71,7 +75,11 @@ public class DisplayData implements Serializable { */ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); - return InternalBuilder.forRoot(component).build(); + + InternalBuilder builder = new InternalBuilder(); + builder.include(Path.root(), component); + + return builder.build(); } /** @@ -99,11 +107,11 @@ public class DisplayData implements Serializable { } @JsonValue - public Collection<Item<?>> items() { + public Collection<Item> items() { return entries.values(); } - public Map<Identifier, Item<?>> asMap() { + public Map<Identifier, Item> asMap() { return entries; } @@ 
-126,7 +134,7 @@ public class DisplayData implements Serializable { public String toString() { StringBuilder builder = new StringBuilder(); boolean isFirstLine = true; - for (Item<?> entry : entries.values()) { + for (Item entry : entries.values()) { if (isFirstLine) { isFirstLine = false; } else { @@ -149,70 +157,81 @@ public class DisplayData implements Serializable { */ public interface Builder { /** - * Register display data from the specified subcomponent. For example, a {@link PTransform} - * which delegates to a user-provided function can implement {@link HasDisplayData} on the - * function and include it from the {@link PTransform}: + * Register display data from the specified subcomponent at the given path. For example, a + * {@link PTransform} which delegates to a user-provided function can implement + * {@link HasDisplayData} on the function and include it from the {@link PTransform}: * * <pre><code>{@literal @Override} * public void populateDisplayData(DisplayData.Builder builder) { * super.populateDisplayData(builder); * * builder - * .add(DisplayData.item("userFn", userFn)) // To register the class name of the userFn - * .include(userFn); // To allow the userFn to register additional display data + * // To register the class name of the userFn + * .add(DisplayData.item("userFn", userFn.getClass())) + * // To allow the userFn to register additional display data + * .include("userFn", userFn); * } * </code></pre> * - * <p>Using {@code include(subcomponent)} will associate each of the registered items with the - * namespace of the {@code subcomponent} being registered. To register display data in the - * current namespace, such as from a base class implementation, use + * <p>Using {@code include(path, subcomponent)} will associate each of the registered items with + * the namespace of the {@code subcomponent} being registered, with the specified path element + * relative to the current path. 
To register display data in the current path and namespace, + * such as from a base class implementation, use * {@code subcomponent.populateDisplayData(builder)} instead. * * @see HasDisplayData#populateDisplayData(DisplayData.Builder) */ - Builder include(HasDisplayData subComponent); + Builder include(String path, HasDisplayData subComponent); /** - * Register display data from the specified subcomponent, overriding the namespace of - * subcomponent display items with the specified namespace. + * Register display data from the specified component on behalf of the current component. + * Display data items will be added with the subcomponent namespace but the current component + * path. * - * @see #include(HasDisplayData) - */ - Builder include(HasDisplayData subComponent, Class<?> namespace); - - /** - * Register display data from the specified subcomponent, overriding the namespace of - * subcomponent display items with the specified namespace. + * <p>This is useful for components which simply wrap other components and wish to retain the + * display data from the wrapped component. Such components should implement + * {@code populateDisplayData} as: * - * @see #include(HasDisplayData) + * <pre><code>{@literal @Override} + * public void populateDisplayData(DisplayData.Builder builder) { + * builder.delegate(wrapped); + * } + * </code></pre> */ - Builder include(HasDisplayData subComponent, String namespace); + Builder delegate(HasDisplayData component); /** * Register the given display item. */ - Builder add(Item<?> item); + Builder add(ItemSpec<?> item); /** * Register the given display item if the value is not null. */ - Builder addIfNotNull(Item<?> item); + Builder addIfNotNull(ItemSpec<?> item); /** * Register the given display item if the value is different than the specified default. 
*/ - <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue); + <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue); } /** - * {@link Item Items} are the unit of display data. Each item is identified by a given key + * {@link Item Items} are the unit of display data. Each item is identified by a given path, key, * and namespace from the component the display item belongs to. * * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add} * within {@link HasDisplayData#populateDisplayData} implementations. */ @AutoValue - public abstract static class Item<T> implements Serializable { + public abstract static class Item { + + /** + * The path for the display item within a component hierarchy. + */ + @Nullable + @JsonIgnore + public abstract Path getPath(); /** * The namespace for the display item. The namespace defaults to the component which @@ -220,7 +239,7 @@ public class DisplayData implements Serializable { */ @Nullable @JsonGetter("namespace") - public abstract String getNamespace(); + public abstract Class<?> getNamespace(); /** * The key for the display item. Each display item is created with a key and value @@ -240,11 +259,8 @@ public class DisplayData implements Serializable { * Retrieve the value of the display item. The value is translated from the input to * {@link DisplayData#item} into a format suitable for display. Translation is based on the * item's {@link #getType() type}. - * - * <p>The value will only be {@literal null} if the input value during creation was null. 
*/ @JsonGetter("value") - @Nullable public abstract Object getValue(); /** @@ -285,27 +301,104 @@ public class DisplayData implements Serializable { @Nullable public abstract String getLinkUrl(); - private static <T> Item<T> create(String key, Type type, @Nullable T value) { - FormattedItemValue formatted = type.safeFormat(value); - return of(null, key, type, formatted.getLongValue(), formatted.getShortValue(), null, null); + private static Item create(ItemSpec<?> spec, Path path) { + checkNotNull(spec, "spec cannot be null"); + checkNotNull(path, "path cannot be null"); + Class<?> ns = checkNotNull(spec.getNamespace(), "namespace must be set"); + + return new AutoValue_DisplayData_Item(path, ns, spec.getKey(), spec.getType(), + spec.getValue(), spec.getShortValue(), spec.getLabel(), spec.getLinkUrl()); } + @Override + public String toString() { + return String.format("%s%s:%s=%s", getPath(), getNamespace().getName(), getKey(), getValue()); + } + } + + /** + * Specifies an {@link Item} to register as display data. Each item is identified by a given + * path, key, and namespace from the component the display item belongs to. + * + * <p>{@link Item Items} are registered via {@link DisplayData.Builder#add} + * within {@link HasDisplayData#populateDisplayData} implementations. + */ + @AutoValue + public abstract static class ItemSpec<T> implements Serializable { + /** + * The namespace for the display item. If unset, defaults to the component which + * the display item is registered to. + */ + @Nullable + public abstract Class<?> getNamespace(); + + /** + * The key for the display item. Each display item is created with a key and value + * via {@link DisplayData#item}. + */ + public abstract String getKey(); + + /** + * The {@link DisplayData.Type} of display data. All display data conforms to a predefined set + * of allowed types. + */ + public abstract Type getType(); + + /** + * The value of the display item. 
The value is translated from the input to + * {@link DisplayData#item} into a format suitable for display. Translation is based on the + * item's {@link #getType() type}. + */ + @Nullable + public abstract Object getValue(); + /** - * Set the item {@link Item#getNamespace() namespace} from the given {@link Class}. + * The optional short value for an item, or {@code null} if none is provided. * - * <p>This method does not alter the current instance, but instead returns a new {@link Item} - * with the namespace set. + * <p>The short value is an alternative display representation for items having a long display + * value. For example, the {@link #getValue() value} for {@link Type#JAVA_CLASS} items contains + * the full class name with package, while the short value contains just the class name. + * + * <p>A {@link #getValue() value} will be provided for each display item, and some types may + * also provide a short-value. If a short value is provided, display data consumers may + * choose to display it instead of or in addition to the {@link #getValue() value}. */ - public Item<T> withNamespace(Class<?> namespace) { - checkNotNull(namespace, "namespace argument cannot be null"); - return withNamespace(namespaceOf(namespace)); + @Nullable + public abstract Object getShortValue(); + + /** + * The optional label for an item. The label is a human-readable description of what + * the metadata represents. UIs may choose to display the label instead of the item key. + */ + @Nullable + public abstract String getLabel(); + + /** + * The optional link URL for an item. The URL points to an address where the reader + * can find additional context for the display data. 
+ */ + @Nullable + public abstract String getLinkUrl(); + + private static <T> ItemSpec<T> create(String key, Type type, @Nullable T value) { + return ItemSpec.<T>builder() + .setKey(key) + .setType(type) + .setRawValue(value) + .build(); } - /** @see #withNamespace(Class) */ - public Item<T> withNamespace(String namespace) { + /** + * Set the item {@link ItemSpec#getNamespace() namespace} from the given {@link Class}. + * + * <p>This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the namespace set. + */ + public ItemSpec<T> withNamespace(Class<?> namespace) { checkNotNull(namespace, "namespace argument cannot be null"); - return of( - namespace, getKey(), getType(), getValue(), getShortValue(), getLabel(), getLinkUrl()); + return toBuilder() + .setNamespace(namespace) + .build(); } /** @@ -313,12 +406,13 @@ public class DisplayData implements Serializable { * * <p>Specifying a null value will clear the label if it was previously defined. * - * <p>This method does not alter the current instance, but instead returns a new {@link Item} - * with the label set. + * <p>This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the label set. */ - public Item<T> withLabel(String label) { - return of( - getNamespace(), getKey(), getType(), getValue(), getShortValue(), label, getLinkUrl()); + public ItemSpec<T> withLabel(@Nullable String label) { + return toBuilder() + .setLabel(label) + .build(); } /** @@ -326,11 +420,13 @@ public class DisplayData implements Serializable { * * <p>Specifying a null value will clear the link url if it was previously defined. * - * <p>This method does not alter the current instance, but instead returns a new {@link Item} - * with the link url set. + * <p>This method does not alter the current instance, but instead returns a new + * {@link ItemSpec} with the link url set. 
*/ - public Item<T> withLinkUrl(String url) { - return of(getNamespace(), getKey(), getType(), getValue(), getShortValue(), getLabel(), url); + public ItemSpec<T> withLinkUrl(@Nullable String url) { + return toBuilder() + .setLinkUrl(url) + .build(); } /** @@ -339,84 +435,166 @@ public class DisplayData implements Serializable { * <p>This should only be used internally. It is useful to compare the value of a * {@link DisplayData.Item} to the value derived from a specified input. */ - private Item<T> withValue(Object value) { - FormattedItemValue formatted = getType().safeFormat(value); - return of(getNamespace(), getKey(), getType(), formatted.getLongValue(), - formatted.getShortValue(), getLabel(), getLinkUrl()); - } - - private static <T> Item<T> of( - @Nullable String namespace, - String key, - Type type, - @Nullable Object value, - @Nullable Object shortValue, - @Nullable String label, - @Nullable String linkUrl) { - return new AutoValue_DisplayData_Item<>( - namespace, key, type, value, shortValue, label, linkUrl); + private ItemSpec<T> withValue(T value) { + return toBuilder() + .setRawValue(value) + .build(); } @Override public String toString() { return String.format("%s:%s=%s", getNamespace(), getKey(), getValue()); } + + static <T> ItemSpec.Builder<T> builder() { + return new AutoValue_DisplayData_ItemSpec.Builder<>(); + } + + abstract ItemSpec.Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + public abstract ItemSpec.Builder<T> setKey(String key); + public abstract ItemSpec.Builder<T> setNamespace(@Nullable Class<?> namespace); + public abstract ItemSpec.Builder<T> setType(Type type); + public abstract ItemSpec.Builder<T> setValue(@Nullable Object longValue); + public abstract ItemSpec.Builder<T> setShortValue(@Nullable Object shortValue); + public abstract ItemSpec.Builder<T> setLabel(@Nullable String label); + public abstract ItemSpec.Builder<T> setLinkUrl(@Nullable String url); + public abstract ItemSpec<T> build(); 
+ + + abstract Type getType(); + + ItemSpec.Builder<T> setRawValue(@Nullable T value) { + FormattedItemValue formatted = getType().safeFormat(value); + return this + .setValue(formatted.getLongValue()) + .setShortValue(formatted.getShortValue()); + } + } } /** * Unique identifier for a display data item within a component. - * Identifiers are composed of the key they are registered with and a namespace generated from - * the class of the component which registered the item. + * + * <p>Identifiers are composed of: + * + * <ul> + * <li>A {@link #getPath() path} based on the component hierarchy</li> + * <li>The {@link #getKey() key} it is registered with</li> + * <li>A {@link #getNamespace() namespace} generated from the class of the component which + * registered the item.</li> + * </ul> * * <p>Display data registered with the same key from different components will have different * namespaces and thus will both be represented in the composed {@link DisplayData}. If a * single component registers multiple metadata items with the same key, only the most recent * item will be retained; previous versions are discarded. 
*/ - public static class Identifier { - private final String ns; - private final String key; + @AutoValue + public abstract static class Identifier { + public abstract Path getPath(); + public abstract Class<?> getNamespace(); + public abstract String getKey(); - public static Identifier of(Class<?> namespace, String key) { - return of(namespaceOf(namespace), key); + public static Identifier of(Path path, Class<?> namespace, String key) { + return new AutoValue_DisplayData_Identifier(path, namespace, key); } - public static Identifier of(String namespace, String key) { - return new Identifier(namespace, key); + @Override + public String toString() { + return String.format("%s%s:%s", getPath(), getNamespace(), getKey()); } + } - private Identifier(String ns, String key) { - this.ns = ns; - this.key = key; + /** + * Structured path of registered display data within a component hierarchy. + * + * <p>Display data items registered directly by a component will have the {@link Path#root() root} + * path. If the component {@link Builder#include includes} a sub-component, its display data will + * be registered at the path specified. Each sub-component path is created by appending a child + * element to the path of its parent component, forming a hierarchy. + */ + public static class Path { + private final ImmutableList<String> components; + private Path(ImmutableList<String> components) { + this.components = components; } - public String getNamespace() { - return ns; + /** + * Path for display data registered by a top-level component. + */ + public static Path root() { + return new Path(ImmutableList.<String>of()); } - public String getKey() { - return key; + /** + * Construct a path from an absolute component path hierarchy. + * + * <p>For the root path, use {@link Path#root()}. + * + * @param firstPath Path of the first sub-component. + * @param paths Additional path components. + */ + public static Path absolute(String firstPath, String... 
paths) { + ImmutableList.Builder<String> builder = ImmutableList.builder(); + + validatePathElement(firstPath); + builder.add(firstPath); + for (String path : paths) { + validatePathElement(path); + builder.add(path); + } + + return new Path(builder.build()); } - @Override - public boolean equals(Object obj) { - if (obj instanceof Identifier) { - Identifier that = (Identifier) obj; - return Objects.equals(this.ns, that.ns) - && Objects.equals(this.key, that.key); - } + /** + * Hierarchy list of component paths making up the full path, starting with the top-level child + * component path. For the {@link #root root} path, returns the empty list. + */ + public List<String> getComponents() { + return components; + } - return false; + /** + * Extend the path by appending a sub-component path. The new path element is added to the end + * of the path hierarchy. + * + * <p>Returns a new {@link Path} instance; the originating {@link Path} is not modified. + */ + public Path extend(String path) { + validatePathElement(path); + return new Path(ImmutableList.<String>builder() + .addAll(components.iterator()) + .add(path) + .build()); } - @Override - public int hashCode() { - return Objects.hash(ns, key); + private static void validatePathElement(String path) { + checkNotNull(path); + checkArgument(!"".equals(path), "path cannot be empty"); } @Override public String toString() { - return String.format("%s:%s", ns, key); + StringBuilder b = new StringBuilder().append("["); + Joiner.on("/").appendTo(b, components); + b.append("]"); + return b.toString(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Path + && Objects.equals(components, ((Path) obj).components); + + } + + @Override + public int hashCode() { + return components.hashCode(); } } @@ -551,65 +729,79 @@ public class DisplayData implements Serializable { Object getLongValue() { return this.longValue; } - Object getShortValue() { return this.shortValue; } } private static class 
InternalBuilder implements Builder { - private final Map<Identifier, Item<?>> entries; - private final Set<Object> visited; + private final Map<Identifier, Item> entries; + private final Set<HasDisplayData> visitedComponents; + private final Map<Path, HasDisplayData> visitedPathMap; - private String latestNs; + private Path latestPath; + private Class<?> latestNs; private InternalBuilder() { this.entries = Maps.newHashMap(); - this.visited = Sets.newIdentityHashSet(); - } - - private static InternalBuilder forRoot(HasDisplayData instance) { - InternalBuilder builder = new InternalBuilder(); - builder.include(instance); - return builder; + this.visitedComponents = Sets.newIdentityHashSet(); + this.visitedPathMap = Maps.newHashMap(); } @Override - public Builder include(HasDisplayData subComponent) { + public Builder include(String path, HasDisplayData subComponent) { checkNotNull(subComponent, "subComponent argument cannot be null"); - return include(subComponent, subComponent.getClass()); + checkNotNull(path, "path argument cannot be null"); + + Path absolutePath = latestPath.extend(path); + + HasDisplayData existingComponent = visitedPathMap.get(absolutePath); + if (existingComponent != null) { + throw new IllegalArgumentException(String.format("Specified path '%s' already used for " + + "subcomponent %s. 
Subcomponents must be included using unique paths.", + path, existingComponent)); + } + + return include(absolutePath, subComponent); } @Override - public Builder include(HasDisplayData subComponent, Class<?> namespace) { - checkNotNull(namespace, "Input namespace override cannot be null"); - return include(subComponent, namespaceOf(namespace)); + public Builder delegate(HasDisplayData component) { + checkNotNull(component); + + return include(latestPath, component); } - @Override - public Builder include(HasDisplayData subComponent, String namespace) { - checkNotNull(subComponent, "subComponent argument cannot be null"); - checkNotNull(namespace, "Input namespace override cannot be null"); - - boolean newComponent = visited.add(subComponent); - if (newComponent) { - String prevNs = this.latestNs; - this.latestNs = namespace; - - try { - subComponent.populateDisplayData(this); - } catch (PopulateDisplayDataException e) { - // Don't re-wrap exceptions recursively. - throw e; - } catch (Throwable e) { - String msg = String.format("Error while populating display data for component: %s", - namespace); - throw new PopulateDisplayDataException(msg, e); - } + private Builder include(Path path, HasDisplayData subComponent) { + if (visitedComponents.contains(subComponent)) { + // Component previously registered; ignore in order to break cyclic dependencies + return this; + } - this.latestNs = prevNs; + // New component; add it. + visitedComponents.add(subComponent); + visitedPathMap.put(path, subComponent); + Class<?> namespace = subComponent.getClass(); + + Path prevPath = latestPath; + Class<?> prevNs = latestNs; + latestPath = path; + latestNs = namespace; + + try { + subComponent.populateDisplayData(this); + } catch (PopulateDisplayDataException e) { + // Don't re-wrap exceptions recursively. 
+ throw e; + } catch (Throwable e) { + String msg = String.format("Error while populating display data for component: %s", + namespace.getName()); + throw new PopulateDisplayDataException(msg, e); } + latestPath = prevPath; + latestNs = prevNs; + return this; } @@ -623,39 +815,41 @@ public class DisplayData implements Serializable { } @Override - public Builder add(Item<?> item) { + public Builder add(ItemSpec<?> item) { checkNotNull(item, "Input display item cannot be null"); return addItemIf(true, item); } @Override - public Builder addIfNotNull(Item<?> item) { + public Builder addIfNotNull(ItemSpec<?> item) { checkNotNull(item, "Input display item cannot be null"); return addItemIf(item.getValue() != null, item); } @Override - public <T> Builder addIfNotDefault(Item<T> item, @Nullable T defaultValue) { + public <T> Builder addIfNotDefault(ItemSpec<T> item, @Nullable T defaultValue) { checkNotNull(item, "Input display item cannot be null"); - Item<T> defaultItem = item.withValue(defaultValue); + ItemSpec<T> defaultItem = item.withValue(defaultValue); return addItemIf(!Objects.equals(item, defaultItem), item); } - private Builder addItemIf(boolean condition, Item<?> item) { + private Builder addItemIf(boolean condition, ItemSpec<?> spec) { if (!condition) { return this; } - checkNotNull(item, "Input display item cannot be null"); - checkNotNull(item.getValue(), "Input display value cannot be null"); - if (item.getNamespace() == null) { - item = item.withNamespace(latestNs); + checkNotNull(spec, "Input display item cannot be null"); + checkNotNull(spec.getValue(), "Input display value cannot be null"); + + if (spec.getNamespace() == null) { + spec = spec.withNamespace(latestNs); } + Item item = Item.create(spec, latestPath); - Identifier id = Identifier.of(item.getNamespace(), item.getKey()); + Identifier id = Identifier.of(item.getPath(), item.getNamespace(), item.getKey()); checkArgument(!entries.containsKey(id), - "Display data key (%s) is not unique within the 
specified namespace (%s).", - item.getKey(), item.getNamespace()); + "Display data key (%s) is not unique within the specified path and namespace: %s%s.", + item.getKey(), item.getPath(), item.getNamespace()); entries.put(id, item); return this; @@ -669,63 +863,63 @@ public class DisplayData implements Serializable { /** * Create a display item for the specified key and string value. */ - public static Item<String> item(String key, @Nullable String value) { + public static ItemSpec<String> item(String key, @Nullable String value) { return item(key, Type.STRING, value); } /** * Create a display item for the specified key and integer value. */ - public static Item<Integer> item(String key, @Nullable Integer value) { + public static ItemSpec<Integer> item(String key, @Nullable Integer value) { return item(key, Type.INTEGER, value); } /** * Create a display item for the specified key and integer value. */ - public static Item<Long> item(String key, @Nullable Long value) { + public static ItemSpec<Long> item(String key, @Nullable Long value) { return item(key, Type.INTEGER, value); } /** * Create a display item for the specified key and floating point value. */ - public static Item<Float> item(String key, @Nullable Float value) { + public static ItemSpec<Float> item(String key, @Nullable Float value) { return item(key, Type.FLOAT, value); } /** * Create a display item for the specified key and floating point value. */ - public static Item<Double> item(String key, @Nullable Double value) { + public static ItemSpec<Double> item(String key, @Nullable Double value) { return item(key, Type.FLOAT, value); } /** * Create a display item for the specified key and boolean value. */ - public static Item<Boolean> item(String key, @Nullable Boolean value) { + public static ItemSpec<Boolean> item(String key, @Nullable Boolean value) { return item(key, Type.BOOLEAN, value); } /** * Create a display item for the specified key and timestamp value. 
*/ - public static Item<Instant> item(String key, @Nullable Instant value) { + public static ItemSpec<Instant> item(String key, @Nullable Instant value) { return item(key, Type.TIMESTAMP, value); } /** * Create a display item for the specified key and duration value. */ - public static Item<Duration> item(String key, @Nullable Duration value) { + public static ItemSpec<Duration> item(String key, @Nullable Duration value) { return item(key, Type.DURATION, value); } /** * Create a display item for the specified key and class value. */ - public static <T> Item<Class<T>> item(String key, @Nullable Class<T> value) { + public static <T> ItemSpec<Class<T>> item(String key, @Nullable Class<T> value) { return item(key, Type.JAVA_CLASS, value); } @@ -739,10 +933,10 @@ public class DisplayData implements Serializable { * * @see Type#inferType(Object) */ - public static <T> Item<T> item(String key, Type type, @Nullable T value) { + public static <T> ItemSpec<T> item(String key, Type type, @Nullable T value) { checkNotNull(key, "key argument cannot be null"); checkNotNull(type, "type argument cannot be null"); - return Item.create(key, type, value); + return ItemSpec.create(key, type, value); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad03d07a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 57f7716..684a776 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -578,7 +578,7 @@ public class Window { builder .add(DisplayData.item("windowFn", windowFn.getClass()) .withLabel("Windowing Function")) - .include(windowFn); + .include("windowFn", 
windowFn); } if (allowedLateness != null) {
