Repository: incubator-beam Updated Branches: refs/heads/master 65db44ce6 -> 5535fc3fd
Remove inheritance from Create.TimestampedValues Previously, Create.TimestampedValues extended Create.Values. This resulted in confusing behavior in one runner, which overrode Create.Values using `instanceof` checks; because of the inheritance, those checks also matched (and accidentally overrode) Create.TimestampedValues. Now Create.TimestampedValues is a simple composite transform. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0e12587 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0e12587 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0e12587 Branch: refs/heads/master Commit: f0e125871d9ca6da9c8597a2216c3b44b9e85345 Parents: dc98211 Author: Kenneth Knowles <[email protected]> Authored: Thu May 19 20:28:36 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 19 20:28:36 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Create.java | 144 +++++++++++-------- 1 file changed, 81 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0e12587/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 89e9985..0752113 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -263,53 +263,9 @@ public class Create<T> { public Coder<T> getDefaultOutputCoder(PInput input) throws CannotProvideCoderException { if (coder.isPresent()) { return coder.get(); + } else { + return getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), elems);
} - // First try to deduce a coder using the types of the elements. - Class<?> elementClazz = Void.class; - for (T elem : elems) { - if (elem == null) { - continue; - } - Class<?> clazz = elem.getClass(); - if (elementClazz.equals(Void.class)) { - elementClazz = clazz; - } else if (!elementClazz.equals(clazz)) { - // Elements are not the same type, require a user-specified coder. - throw new CannotProvideCoderException( - "Cannot provide coder for Create: The elements are not all of the same class."); - } - } - - if (elementClazz.getTypeParameters().length == 0) { - try { - @SuppressWarnings("unchecked") // elementClazz is a wildcard type - Coder<T> coder = (Coder<T>) input.getPipeline().getCoderRegistry() - .getDefaultCoder(TypeDescriptor.of(elementClazz)); - return coder; - } catch (CannotProvideCoderException exc) { - // let the next stage try - } - } - - // If that fails, try to deduce a coder using the elements themselves - Optional<Coder<T>> coder = Optional.absent(); - for (T elem : elems) { - Coder<T> c = input.getPipeline().getCoderRegistry().getDefaultCoder(elem); - if (!coder.isPresent()) { - coder = Optional.of(c); - } else if (!Objects.equals(c, coder.get())) { - throw new CannotProvideCoderException( - "Cannot provide coder for elements of " + Create.class.getSimpleName() + ":" - + " For their common class, no coder could be provided." - + " Based on their values, they do not all default to the same Coder."); - } - } - - if (!coder.isPresent()) { - throw new CannotProvideCoderException("Unable to infer a coder. Please register " - + "a coder for "); - } - return coder.get(); } ///////////////////////////////////////////////////////////////////////////// @@ -468,7 +424,7 @@ public class Create<T> { * A {@code PTransform} that creates a {@code PCollection} whose elements have * associated timestamps. 
*/ - public static class TimestampedValues<T> extends Values<T> { + public static class TimestampedValues<T> extends PTransform<PInput, PCollection<T>>{ /** * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given * {@code Coder<T>} to decode each of the objects into a @@ -482,17 +438,30 @@ public class Create<T> { * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} * is used. */ - @Override public TimestampedValues<T> withCoder(Coder<T> coder) { - return new TimestampedValues<>(elems, Optional.<Coder<T>>of(coder)); + return new TimestampedValues<>(timestampedElements, Optional.<Coder<T>>of(coder)); } @Override public PCollection<T> apply(PInput input) { try { - Coder<T> coder = getDefaultOutputCoder(input); + Iterable<T> rawElements = + Iterables.transform( + timestampedElements, + new Function<TimestampedValue<T>, T>() { + @Override + public T apply(TimestampedValue<T> input) { + return input.getValue(); + } + }); + Coder<T> coder; + if (elementCoder.isPresent()) { + coder = elementCoder.get(); + } else { + coder = getDefaultCreateCoder(input.getPipeline().getCoderRegistry(), rawElements); + } PCollection<TimestampedValue<T>> intermediate = Pipeline.applyTransform(input, - Create.of(elems).withCoder(TimestampedValueCoder.of(coder))); + Create.of(timestampedElements).withCoder(TimestampedValueCoder.of(coder))); PCollection<T> output = intermediate.apply(ParDo.of(new ConvertTimestamps<T>())); output.setCoder(coder); @@ -506,18 +475,14 @@ public class Create<T> { ///////////////////////////////////////////////////////////////////////////// /** The timestamped elements of the resulting PCollection. 
*/ - private final transient Iterable<TimestampedValue<T>> elems; - - private TimestampedValues(Iterable<TimestampedValue<T>> elems, - Optional<Coder<T>> coder) { - super( - Iterables.transform(elems, new Function<TimestampedValue<T>, T>() { - @Override - public T apply(TimestampedValue<T> input) { - return input.getValue(); - } - }), coder); - this.elems = elems; + private final transient Iterable<TimestampedValue<T>> timestampedElements; + + private final transient Optional<Coder<T>> elementCoder; + + private TimestampedValues( + Iterable<TimestampedValue<T>> timestampedElements, Optional<Coder<T>> elementCoder) { + this.timestampedElements = timestampedElements; + this.elementCoder = elementCoder; } private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> { @@ -527,4 +492,57 @@ public class Create<T> { } } } + + private static <T> Coder<T> getDefaultCreateCoder(CoderRegistry registry, Iterable<T> elems) + throws CannotProvideCoderException { + // First try to deduce a coder using the types of the elements. + Class<?> elementClazz = Void.class; + for (T elem : elems) { + if (elem == null) { + continue; + } + Class<?> clazz = elem.getClass(); + if (elementClazz.equals(Void.class)) { + elementClazz = clazz; + } else if (!elementClazz.equals(clazz)) { + // Elements are not the same type, require a user-specified coder. 
+ throw new CannotProvideCoderException( + String.format( + "Cannot provide coder for %s: The elements are not all of the same class.", + Create.class.getSimpleName())); + } + } + + if (elementClazz.getTypeParameters().length == 0) { + try { + @SuppressWarnings("unchecked") // elementClazz is a wildcard type + Coder<T> coder = (Coder<T>) registry.getDefaultCoder(TypeDescriptor.of(elementClazz)); + return coder; + } catch (CannotProvideCoderException exc) { + // let the next stage try + } + } + + // If that fails, try to deduce a coder using the elements themselves + Optional<Coder<T>> coder = Optional.absent(); + for (T elem : elems) { + Coder<T> c = registry.getDefaultCoder(elem); + if (!coder.isPresent()) { + coder = Optional.of(c); + } else if (!Objects.equals(c, coder.get())) { + throw new CannotProvideCoderException( + "Cannot provide coder for elements of " + + Create.class.getSimpleName() + + ":" + + " For their common class, no coder could be provided." + + " Based on their values, they do not all default to the same Coder."); + } + } + + if (!coder.isPresent()) { + throw new CannotProvideCoderException( + "Unable to infer a coder. Please register " + "a coder for "); + } + return coder.get(); + } }
