Introduces Contextful
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b908c2e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b908c2e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b908c2e Branch: refs/heads/master Commit: 4b908c2e693fe9ed44fcb6c67a2d82c7da355259 Parents: 7f5753f Author: Eugene Kirpichov <[email protected]> Authored: Mon Sep 25 13:57:04 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Oct 13 18:43:48 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/annotations/Experimental.java | 8 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 11 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 4 - .../java/org/apache/beam/sdk/io/FileIO.java | 6 +- .../apache/beam/sdk/transforms/Contextful.java | 127 +++++++++++++++++++ .../org/apache/beam/sdk/transforms/ParDo.java | 5 +- .../beam/sdk/transforms/Requirements.java | 56 ++++++++ .../org/apache/beam/sdk/transforms/Watch.java | 36 ++++-- .../apache/beam/sdk/values/TypeDescriptors.java | 36 ++++-- .../apache/beam/sdk/transforms/WatchTest.java | 46 ++++++- .../beam/sdk/values/TypeDescriptorsTest.java | 17 ++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +- .../io/gcp/bigquery/DynamicDestinations.java | 4 - 13 files changed, 305 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 80c4613..fecc407 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -94,6 +94,12 @@ public @interface Experimental { CORE_RUNNERS_ONLY, /** Experimental feature related to making the encoded element type available from a Coder. */ - CODER_TYPE_ENCODING + CODER_TYPE_ENCODING, + + /** + * Experimental APIs related to <a href="https://s.apache.org/context-fn">contextful + * closures</a>. + */ + CONTEXTFUL, } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index e2ab980..1474759 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Duration; @@ -724,14 +723,12 @@ public class AvroIO { return explicitCoder; } // If a coder was not specified explicitly, infer it from parse fn. - TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn); - String message = - "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder()."; - checkArgument(descriptor != null, message); try { - return coderRegistry.getCoder(descriptor); + return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn)); } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException(message, e); + throw new IllegalArgumentException( + "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().", + e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index ea5129f..9834e6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -319,10 +319,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> DynamicDestinations.class, new TypeVariableExtractor< DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {}); - checkArgument( - descriptor != null, - "Unable to infer a coder for DestinationT, " - + "please specify it explicitly by overriding getDestinationCoder()"); return registry.getCoder(descriptor); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 7df4bde..a244c07 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -346,12 +346,12 @@ public class FileIO { } } - private static class MatchPollFn implements Watch.Growth.PollFn<String, MatchResult.Metadata> { + private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> { @Override - public Watch.Growth.PollResult<MatchResult.Metadata> apply(String input, Instant timestamp) + public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c) throws Exception { return Watch.Growth.PollResult.incomplete( - Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata()); + Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java new file mode 100644 index 0000000..fb732cf --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.values.PCollectionView; + +/** Pair of a bit of user code (a "closure") and the {@link Requirements} needed to run it. */ +@Experimental(Kind.CONTEXTFUL) +public final class Contextful<ClosureT> implements Serializable { + private final ClosureT closure; + private final Requirements requirements; + + private Contextful(ClosureT closure, Requirements requirements) { + this.closure = closure; + this.requirements = requirements; + } + + /** Returns the closure. */ + public ClosureT getClosure() { + return closure; + } + + /** Returns the requirements needed to run the closure. */ + public Requirements getRequirements() { + return requirements; + } + + /** Constructs a pair of the given closure and its requirements. */ + public static <ClosureT> Contextful<ClosureT> of(ClosureT closure, Requirements requirements) { + return new Contextful<>(closure, requirements); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("closure", closure) + .add("requirements", requirements) + .toString(); + } + + /** + * A function from an input to an output that may additionally access {@link Context} when + * computing the result. + */ + public interface Fn<InputT, OutputT> extends Serializable { + /** + * Invokes the function on the given input with the given context. The function may use the + * context only for the capabilities declared in the {@link Contextful#getRequirements} of the + * enclosing {@link Contextful}. + */ + OutputT apply(InputT element, Context c) throws Exception; + + /** An accessor for additional capabilities available in {@link #apply}. */ + abstract class Context { + /** + * Accesses the given side input. The window in which it is accessed is unspecified, depends + * on usage by the enclosing {@link PTransform}, and must be documented by that transform. + */ + public <T> T sideInput(PCollectionView<T> view) { + throw new UnsupportedOperationException(); + } + + /** + * Convenience wrapper for creating a {@link Context} from a {@link DoFn.ProcessContext}, to + * support the common case when a {@link PTransform} is invoking the {@link + * Contextful#getClosure() closure} from inside a {@link DoFn}. + */ + public static <InputT> Context wrapProcessContext(final DoFn<InputT, ?>.ProcessContext c) { + return new ContextFromProcessContext<>(c); + } + + private static class ContextFromProcessContext<InputT> extends Context { + private final DoFn<InputT, ?>.ProcessContext c; + + ContextFromProcessContext(DoFn<InputT, ?>.ProcessContext c) { + this.c = c; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return c.sideInput(view); + } + } + } + } + + /** + * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link + * Requirements}. + */ + public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn( + final SerializableFunction<InputT, OutputT> fn) { + return new Contextful<Fn<InputT, OutputT>>( + new Fn<InputT, OutputT>() { + @Override + public OutputT apply(InputT element, Context c) throws Exception { + return fn.apply(element); + } + }, + Requirements.empty()); + } + + /** Same with {@link #of} but with better type inference behavior for the case of {@link Fn}. */ + public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn( + final Fn<InputT, OutputT> fn, Requirements requirements) { + return of(fn, requirements); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 49343c7..2ad84fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NameUtils; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -589,7 +588,7 @@ public class ParDo { DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { - this.fn = SerializableUtils.clone(fn); + this.fn = fn; this.fnDisplayData = fnDisplayData; this.sideInputs = sideInputs; } @@ -717,7 +716,7 @@ public class ParDo { this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; this.additionalOutputTags = additionalOutputTags; - this.fn = SerializableUtils.clone(fn); + this.fn = fn; this.fnDisplayData = fnDisplayData; } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java new file mode 100644 index 0000000..acc409f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.values.PCollectionView; + +/** Describes the run-time requirements of a {@link Contextful}, such as access to side inputs. */ +@Experimental(Kind.CONTEXTFUL) +public final class Requirements implements Serializable { + private final Collection<PCollectionView<?>> sideInputs; + + private Requirements(Collection<PCollectionView<?>> sideInputs) { + this.sideInputs = sideInputs; + } + + /** The side inputs that this {@link Contextful} needs access to. */ + public Collection<PCollectionView<?>> getSideInputs() { + return sideInputs; + } + + /** Describes the need for access to the given side inputs. */ + public static Requirements requiresSideInputs(Collection<PCollectionView<?>> sideInputs) { + return new Requirements(sideInputs); + } + + /** Like {@link #requiresSideInputs(Collection)}. */ + public static Requirements requiresSideInputs(PCollectionView<?>... sideInputs) { + return requiresSideInputs(Arrays.asList(sideInputs)); + } + + /** Describes an empty set of requirements. */ + public static Requirements empty() { + return new Requirements(Collections.<PCollectionView<?>>emptyList()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index 21f0641..a3c906c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.transforms.Contextful.Fn.Context.wrapProcessContext; import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; @@ -117,13 +118,25 @@ public class Watch { /** Watches the growth of the given poll function. See class documentation for more details. */ public static <InputT, OutputT> Growth<InputT, OutputT> growthOf( - Growth.PollFn<InputT, OutputT> pollFn) { + Contextful<Growth.PollFn<InputT, OutputT>> pollFn) { return new AutoValue_Watch_Growth.Builder<InputT, OutputT>() .setTerminationPerInput(Watch.Growth.<InputT>never()) .setPollFn(pollFn) .build(); } + /** Watches the growth of the given poll function. See class documentation for more details. */ + public static <InputT, OutputT> Growth<InputT, OutputT> growthOf( + Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) { + return growthOf(Contextful.of(pollFn, requirements)); + } + + /** Watches the growth of the given poll function. See class documentation for more details. */ + public static <InputT, OutputT> Growth<InputT, OutputT> growthOf( + Growth.PollFn<InputT, OutputT> pollFn) { + return growthOf(pollFn, Requirements.empty()); + } + /** Implementation of {@link #growthOf}. */ @AutoValue public abstract static class Growth<InputT, OutputT> @@ -202,12 +215,11 @@ public class Watch { } /** - * A function that computes the current set of outputs for the given input (given as a {@link - * TimestampedValue}), in the form of a {@link PollResult}. + * A function that computes the current set of outputs for the given input, in the form of a + * {@link PollResult}. */ - public interface PollFn<InputT, OutputT> extends Serializable { - PollResult<OutputT> apply(InputT input, Instant timestamp) throws Exception; - } + public abstract static class PollFn<InputT, OutputT> + implements Contextful.Fn<InputT, PollResult<OutputT>> {} /** * A strategy for determining whether it is time to stop polling the current input regardless of @@ -536,7 +548,7 @@ public class Watch { } } - abstract PollFn<InputT, OutputT> getPollFn(); + abstract Contextful<PollFn<InputT, OutputT>> getPollFn(); @Nullable abstract Duration getPollInterval(); @@ -551,7 +563,7 @@ public class Watch { @AutoValue.Builder abstract static class Builder<InputT, OutputT> { - abstract Builder<InputT, OutputT> setPollFn(PollFn<InputT, OutputT> pollFn); + abstract Builder<InputT, OutputT> setPollFn(Contextful<PollFn<InputT, OutputT>> pollFn); abstract Builder<InputT, OutputT> setTerminationPerInput( TerminationCondition<InputT, ?> terminationPerInput); @@ -599,7 +611,7 @@ public class Watch { // of the PollFn. TypeDescriptor<OutputT> outputT = TypeDescriptors.extractFromTypeParameters( - getPollFn(), + getPollFn().getClosure(), PollFn.class, new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {}); try { @@ -617,7 +629,8 @@ public class Watch { } return input - .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder))) + .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder)) + .withSideInputs(getPollFn().getRequirements().getSideInputs())) .setCoder(KvCoder.of(input.getCoder(), outputCoder)); } } @@ -638,7 +651,8 @@ public class Watch { throws Exception { if (!tracker.hasPending() && !tracker.currentRestriction().isOutputComplete) { LOG.debug("{} - polling input", c.element()); - Growth.PollResult<OutputT> res = spec.getPollFn().apply(c.element(), c.timestamp()); + Growth.PollResult<OutputT> res = + spec.getPollFn().getClosure().apply(c.element(), wrapProcessContext(c)); // TODO (https://issues.apache.org/jira/browse/BEAM-2680): // Consider truncating the pending outputs if there are too many, to avoid blowing // up the state. In that case, we'd rely on the next poll cycle to provide more outputs. http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java index 8207f06..29a2496 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java @@ -24,6 +24,7 @@ import java.math.BigInteger; import java.util.List; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.Contextful; import org.apache.beam.sdk.transforms.SerializableFunction; /** @@ -325,10 +326,9 @@ public class TypeDescriptors { * @param extractor A class for specifying the type to extract from the supertype * * @return A {@link TypeDescriptor} for the actual value of the result type of the extractor, - * or {@code null} if the type was erased. + * potentially containing unresolved type variables if the type was erased. */ @SuppressWarnings("unchecked") - @Nullable public static <T, V> TypeDescriptor<V> extractFromTypeParameters( T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { return extractFromTypeParameters( @@ -340,7 +340,6 @@ public class TypeDescriptors { * {@link TypeDescriptor} of the instance being analyzed rather than the instance itself. */ @SuppressWarnings("unchecked") - @Nullable public static <T, V> TypeDescriptor<V> extractFromTypeParameters( TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) { // Get the type signature of the extractor, e.g. @@ -363,19 +362,13 @@ public class TypeDescriptors { // Get output of the extractor. Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1]; - TypeDescriptor<?> res = TypeDescriptor.of(outputT); - if (res.hasUnresolvedParameters()) { - return null; - } else { - return (TypeDescriptor<V>) res; - } + return (TypeDescriptor<V>) TypeDescriptor.of(outputT); } /** * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to - * Java type erasure: returns {@code null} if the type was erased. + * Java type erasure: may contain unresolved type variables if the type was erased. */ - @Nullable public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( SerializableFunction<InputT, OutputT> fn) { return extractFromTypeParameters( @@ -386,9 +379,8 @@ public class TypeDescriptors { /** * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to - * Java type erasure: returns {@code null} if the type was erased. + * Java type erasure: may contain unresolved type variables if the type was erased. */ - @Nullable public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( SerializableFunction<InputT, OutputT> fn) { return extractFromTypeParameters( @@ -396,4 +388,22 @@ public class TypeDescriptors { SerializableFunction.class, new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {}); } + + /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + public static <InputT, OutputT> TypeDescriptor<InputT> inputOf( + Contextful.Fn<InputT, OutputT> fn) { + return TypeDescriptors.extractFromTypeParameters( + fn, + Contextful.Fn.class, + new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {}); + } + + /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */ + public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf( + Contextful.Fn<InputT, OutputT> fn) { + return TypeDescriptors.extractFromTypeParameters( + fn, + Contextful.Fn.class, + new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, OutputT>() {}); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java index 132a1ff..113e8fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs; import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput; import static org.apache.beam.sdk.transforms.Watch.Growth.afterTotalOf; import static org.apache.beam.sdk.transforms.Watch.Growth.allOf; @@ -57,6 +58,7 @@ import org.apache.beam.sdk.transforms.Watch.GrowthTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -81,9 +83,10 @@ public class WatchTest implements Serializable { Watch.growthOf( new PollFn<String, String>() { @Override - public PollResult<String> apply(String input, Instant time) { + public PollResult<String> apply(String element, Context c) + throws Exception { return PollResult.complete( - time, Arrays.asList(input + ".foo", input + ".bar")); + Instant.now(), Arrays.asList(element + ".foo", element + ".bar")); } }) .withPollInterval(Duration.ZERO)); @@ -99,6 +102,36 @@ public class WatchTest implements Serializable { @Test @Category({NeedsRunner.class, UsesSplittableParDo.class}) + public void testSinglePollMultipleInputsWithSideInput() { + final PCollectionView<String> moo = + p.apply("moo", Create.of("moo")).apply("moo singleton", View.<String>asSingleton()); + final PCollectionView<String> zoo = + p.apply("zoo", Create.of("zoo")).apply("zoo singleton", View.<String>asSingleton()); + PCollection<KV<String, String>> res = + p.apply("input", Create.of("a", "b")) + .apply( + Watch.growthOf( + new PollFn<String, String>() { + @Override + public PollResult<String> apply(String element, Context c) + throws Exception { + return PollResult.complete( + Instant.now(), + Arrays.asList( + element + " " + c.sideInput(moo) + " " + c.sideInput(zoo))); + } + }, + requiresSideInputs(moo, zoo)) + .withPollInterval(Duration.ZERO)); + + PAssert.that(res) + .containsInAnyOrder(Arrays.asList(KV.of("a", "a moo zoo"), KV.of("b", "b moo zoo"))); + + p.run(); + } + + @Test + @Category({NeedsRunner.class, UsesSplittableParDo.class}) public void testMultiplePollsWithTerminationBecauseOutputIsFinal() { testMultiplePolls(false); } @@ -178,13 +211,14 @@ public class WatchTest implements Serializable { Watch.growthOf( new PollFn<String, KV<String, Integer>>() { @Override - public PollResult<KV<String, Integer>> apply(String input, Instant time) { + public PollResult<KV<String, Integer>> apply(String element, Context c) + throws Exception { String pollId = UUID.randomUUID().toString(); List<KV<String, Integer>> output = Lists.newArrayList(); for (int i = 0; i < numResults; ++i) { output.add(KV.of(pollId, i)); } - return PollResult.complete(time, output); + return PollResult.complete(Instant.now(), output); } }) .withTerminationPerInput(Watch.Growth.<String>afterTotalOf(standardSeconds(1))) @@ -291,7 +325,7 @@ public class WatchTest implements Serializable { * Gradually emits all items from the given list, pairing each one with a UUID that identifies the * round of polling, so a client can check how many rounds of polling there were. */ - private static class TimedPollFn<InputT, OutputT> implements PollFn<InputT, OutputT> { + private static class TimedPollFn<InputT, OutputT> extends PollFn<InputT, OutputT> { private final Instant baseTime; private final List<OutputT> outputs; private final Duration timeToOutputEverything; @@ -311,7 +345,7 @@ public class WatchTest implements Serializable { } @Override - public PollResult<OutputT> apply(InputT input, Instant time) { + public PollResult<OutputT> apply(InputT element, Context c) throws Exception { Instant now = Instant.now(); Duration elapsed = new Duration(baseTime, Instant.now()); if (elapsed.isLongerThan(timeToFail)) { http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java index a4f58da..645da5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java @@ -25,10 +25,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import java.util.List; import java.util.Set; +import org.hamcrest.CoreMatchers; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -115,8 +117,17 @@ public class TypeDescriptorsTest { @Test public void testTypeDescriptorsTypeParameterOfErased() throws Exception { Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric(); - assertNull(extractFooT(instance)); + + TypeDescriptor<Integer> fooT = extractFooT(instance); + assertNotNull(fooT); + // Using toString() assertions because verifying the contents of a Type is very cumbersome, + // and the expected types can not be easily constructed directly. + assertEquals("ActualFooT", fooT.toString()); + assertEquals(strings(), extractBarT(instance)); - assertNull(extractKV(instance)); + + TypeDescriptor<KV<Integer, String>> kvT = extractKV(instance); + assertNotNull(kvT); + assertThat(kvT.toString(), CoreMatchers.containsString("KV<ActualFooT, java.lang.String>")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2771687..2f99643 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -95,7 +95,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; @@ -547,15 +546,12 @@ public class BigQueryIO { return getCoder(); } - TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(getParseFn()); - - String message = - "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder()."; - checkArgument(descriptor != null, message); try { - return coderRegistry.getCoder(descriptor); + return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn())); } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException(message, e); + throw new IllegalArgumentException( + "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().", + e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index ea4fc4e..ecfc990 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -164,10 +164,6 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab DynamicDestinations.class, new TypeDescriptors.TypeVariableExtractor< DynamicDestinations<T, DestinationT>, DestinationT>() {}); - checkArgument( - descriptor != null, - "Unable to infer a coder for DestinationT, " - + "please specify it explicitly by overriding getDestinationCoder()"); return registry.getCoder(descriptor); } }
