http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java deleted file mode 100644 index d057d81..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.transforms; - -/** Useful {@link SerializableFunction} overrides. */ -public class SerializableFunctions { - private static class Identity<T> implements SerializableFunction<T, T> { - @Override - public T apply(T input) { - return input; - } - } - - private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> { - OutT value; - - Constant(OutT value) { - this.value = value; - } - - @Override - public OutT apply(InT input) { - return value; - } - } - - public static <T> SerializableFunction<T, T> identity() { - return new Identity<>(); - } - - public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) { - return new Constant<>(value); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 331b143..073c750 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -257,10 +257,8 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<List<T>> view = - PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<T, List<T>>of(view)); - return view; + return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView( + input, input.getWindowingStrategy(), input.getCoder()))); } } @@ -284,10 +282,8 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<T, Iterable<T>>of(view)); - return view; + return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView( + input, input.getWindowingStrategy(), input.getCoder()))); } } @@ -427,10 +423,11 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - return view; + return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of( + PCollectionViews.multimapView( + input, + input.getWindowingStrategy(), + input.getCoder()))); } } @@ -462,10 +459,11 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view)); - return view; + return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of( + PCollectionViews.mapView( + input, + input.getWindowingStrategy(), + input.getCoder()))); } } @@ -482,7 +480,7 @@ public class View { */ @Internal public static class CreatePCollectionView<ElemT, ViewT> - extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { + extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { private PCollectionView<ViewT> view; private CreatePCollectionView(PCollectionView<ViewT> view) { @@ -508,10 +506,8 @@ public class View { } @Override - public PCollection<ElemT> expand(PCollection<ElemT> input) { - return PCollection.<ElemT>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(input.getCoder()); + public PCollectionView<ViewT> expand(PCollection<ElemT> input) { + return view; } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index cf96c9b..5d5887a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -89,7 +89,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext"; public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext"; public static final String WINDOW_PARAMETER_METHOD = "window"; - public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions"; public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker"; public static final String STATE_PARAMETER_METHOD = "state"; public static final String TIMER_PARAMETER_METHOD = "timer"; @@ -627,11 +626,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)), TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class))); } - - @Override - public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) { - return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD); - } }); } @@ -640,17 +634,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { * {@link ProcessElement} method. */ private static final class ProcessElementDelegation extends DoFnMethodDelegation { - private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD; - - static { - try { - PROCESS_CONTINUATION_STOP_METHOD = - new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop")); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to locate ProcessContinuation.stop()"); - } - } - private final DoFnSignature.ProcessElementMethod signature; /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */ @@ -684,16 +667,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { } return new StackManipulation.Compound(pushParameters); } - - @Override - protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) { - if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) { - return new StackManipulation.Compound( - MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE); - } else { - return MethodReturn.of(targetMethod.getReturnType().asErasure()); - } - } } private static class UserCodeMethodInvocation implements StackManipulation { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java index 5e31f2e..e031337 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms.reflect; - import com.google.common.base.CharMatcher; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -62,14 +61,13 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { @SuppressWarnings("unchecked") Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass(); - try { - OnTimerMethodSpecifier onTimerMethodSpecifier = - OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId); - Constructor<?> constructor = constructorCache.get(onTimerMethodSpecifier); - OnTimerInvoker<InputT, OutputT> invoker = + try { + Constructor<?> constructor = constructorCache.get(fnClass).get(timerId); + @SuppressWarnings("unchecked") + OnTimerInvoker<InputT, OutputT> invoker = (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn); - return invoker; + return invoker; } catch (InstantiationException | IllegalAccessException | IllegalArgumentException @@ -99,31 +97,50 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { private static final String FN_DELEGATE_FIELD_NAME = "delegate"; /** - * A cache of constructors of generated {@link OnTimerInvoker} classes, - * keyed by {@link OnTimerMethodSpecifier}. + * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn} + * class and then by {@link TimerId}. * * <p>Needed because generating an invoker class is expensive, and to avoid generating an * excessive number of classes consuming PermGen memory in Java's that still have PermGen. */ - private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache = - CacheBuilder.newBuilder().build( - new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() { - @Override - public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier) - throws Exception { - DoFnSignature signature = - DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass()); - Class<? extends OnTimerInvoker<?, ?>> invokerClass = - generateOnTimerInvokerClass(signature, onTimerMethodSpecifier.timerId()); - try { - return invokerClass.getConstructor(signature.fnClass()); - } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { - throw new RuntimeException(e); - } - - } - }); - /** + private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>> + constructorCache = + CacheBuilder.newBuilder() + .build( + new CacheLoader< + Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() { + @Override + public LoadingCache<String, Constructor<?>> load( + final Class<? extends DoFn<?, ?>> fnClass) throws Exception { + return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass)); + } + }); + + /** + * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the + * invokers for its {@link OnTimer @OnTimer} methods. + */ + private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> { + + private final DoFnSignature signature; + + public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) { + this.signature = DoFnSignatures.getSignature(clazz); + } + + @Override + public Constructor<?> load(String timerId) throws Exception { + Class<? extends OnTimerInvoker<?, ?>> invokerClass = + generateOnTimerInvokerClass(signature, timerId); + try { + return invokerClass.getConstructor(signature.fnClass()); + } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + } + } + + /** * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link * TimerId}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 8b41fee..6fd4052 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; @@ -54,10 +53,8 @@ public interface DoFnInvoker<InputT, OutputT> { * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}. * * @param extra Factory for producing extra parameter objects (such as window), if necessary. - * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link - * DoFn.ProcessContinuation#stop()} if it returns {@code void}. */ - DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra); + void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra); /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */ void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments); @@ -103,12 +100,7 @@ public interface DoFnInvoker<InputT, OutputT> { */ BoundedWindow window(); - /** Provide {@link PipelineOptions}. */ - PipelineOptions pipelineOptions(); - - /** - * Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. - */ + /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */ DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn); /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */ @@ -146,11 +138,6 @@ public interface DoFnInvoker<InputT, OutputT> { } @Override - public PipelineOptions pipelineOptions() { - return null; - } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index bfad69e..0b4bf90 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -27,13 +27,11 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; @@ -195,8 +193,6 @@ public abstract class DoFnSignature { return cases.dispatch((StateParameter) this); } else if (this instanceof TimerParameter) { return cases.dispatch((TimerParameter) this); - } else if (this instanceof PipelineOptionsParameter) { - return cases.dispatch((PipelineOptionsParameter) this); } else { throw new IllegalStateException( String.format("Attempt to case match on unknown %s subclass %s", @@ -216,7 +212,6 @@ public abstract class DoFnSignature { ResultT dispatch(RestrictionTrackerParameter p); ResultT dispatch(StateParameter p); ResultT dispatch(TimerParameter p); - ResultT dispatch(PipelineOptionsParameter p); /** * A base class for a visitor with a default method for cases it is not interested in. @@ -264,11 +259,6 @@ public abstract class DoFnSignature { public ResultT dispatch(TimerParameter p) { return dispatchDefault(p); } - - @Override - public ResultT dispatch(PipelineOptionsParameter p) { - return dispatchDefault(p); - } } } @@ -297,11 +287,6 @@ public abstract class DoFnSignature { return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT); } - /** Returns a {@link PipelineOptionsParameter}. */ - public static PipelineOptionsParameter pipelineOptions() { - return new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter(); - } - /** * Returns a {@link RestrictionTrackerParameter}. */ @@ -321,14 +306,6 @@ public abstract class DoFnSignature { } /** - * Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}. - */ - @AutoValue - public abstract static class PipelineOptionsParameter extends Parameter { - PipelineOptionsParameter() {} - } - - /** * Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}. * * <p>All such descriptors are equal. @@ -337,7 +314,6 @@ public abstract class DoFnSignature { public abstract static class StartBundleContextParameter extends Parameter { StartBundleContextParameter() {} } - /** * Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}. * @@ -434,21 +410,16 @@ public abstract class DoFnSignature { @Nullable public abstract TypeDescriptor<? extends BoundedWindow> windowT(); - /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */ - public abstract boolean hasReturnValue(); - static ProcessElementMethod create( Method targetMethod, List<Parameter> extraParameters, TypeDescriptor<?> trackerT, - @Nullable TypeDescriptor<? extends BoundedWindow> windowT, - boolean hasReturnValue) { + @Nullable TypeDescriptor<? extends BoundedWindow> windowT) { return new AutoValue_DoFnSignature_ProcessElementMethod( targetMethod, Collections.unmodifiableList(extraParameters), trackerT, - windowT, - hasReturnValue); + windowT); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index de57c3b..bb191b1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static com.google.common.base.Preconditions.checkState; - import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; @@ -44,7 +42,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.Timer; @@ -81,23 +78,19 @@ public class DoFnSignatures { ImmutableList.of( Parameter.ProcessContextParameter.class, Parameter.WindowParameter.class, - Parameter.PipelineOptionsParameter.class, Parameter.TimerParameter.class, Parameter.StateParameter.class); private static final Collection<Class<? extends Parameter>> ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS = ImmutableList.of( - Parameter.PipelineOptionsParameter.class, - Parameter.ProcessContextParameter.class, - Parameter.RestrictionTrackerParameter.class); + Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class); private static final Collection<Class<? extends Parameter>> ALLOWED_ON_TIMER_PARAMETERS = ImmutableList.of( Parameter.OnTimerContextParameter.class, Parameter.WindowParameter.class, - Parameter.PipelineOptionsParameter.class, Parameter.TimerParameter.class, Parameter.StateParameter.class); @@ -194,15 +187,6 @@ public class DoFnSignatures { extraParameters, Predicates.instanceOf(WindowParameter.class)); } - /** - * Indicates whether a {@link Parameter.PipelineOptionsParameter} is - * known in this context. - */ - public boolean hasPipelineOptionsParamter() { - return Iterables.any( - extraParameters, Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)); - } - /** The window type, if any, used by this method. */ @Nullable public TypeDescriptor<? extends BoundedWindow> getWindowType() { @@ -442,8 +426,6 @@ public class DoFnSignatures { * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link * DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of * these must be specified. - * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is - * unbounded. Otherwise (if it returns {@code void}), assume it is bounded. * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated * {@link DoFn.UnboundedPerElement}, this is an error. * </ol> @@ -469,10 +451,7 @@ public class DoFnSignatures { } if (processElement.isSplittable()) { if (isBounded == null) { - isBounded = - processElement.hasReturnValue() - ? PCollection.IsBounded.UNBOUNDED - : PCollection.IsBounded.BOUNDED; + isBounded = PCollection.IsBounded.BOUNDED; } } else { errors.checkArgument( @@ -481,7 +460,6 @@ public class DoFnSignatures { + ((isBounded == PCollection.IsBounded.BOUNDED) ? DoFn.BoundedPerElement.class.getSimpleName() : DoFn.UnboundedPerElement.class.getSimpleName())); - checkState(!processElement.hasReturnValue(), "Should have been inferred splittable"); isBounded = PCollection.IsBounded.BOUNDED; } return isBounded; @@ -718,10 +696,8 @@ public class DoFnSignatures { TypeDescriptor<?> outputT, FnAnalysisContext fnContext) { errors.checkArgument( - void.class.equals(m.getReturnType()) - || DoFn.ProcessContinuation.class.equals(m.getReturnType()), - "Must return void or %s", - DoFn.ProcessContinuation.class.getSimpleName()); + void.class.equals(m.getReturnType()), + "Must return void"); MethodAnalysisContext methodContext = MethodAnalysisContext.create(); @@ -761,11 +737,7 @@ public class DoFnSignatures { } return DoFnSignature.ProcessElementMethod.create( - m, - methodContext.getExtraParameters(), - trackerT, - windowT, - DoFn.ProcessContinuation.class.equals(m.getReturnType())); + m, methodContext.getExtraParameters(), trackerT, windowT); } private static void checkParameterOneOf( @@ -817,12 +789,6 @@ public class DoFnSignatures { "Multiple %s parameters", BoundedWindow.class.getSimpleName()); return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT); - } else if (PipelineOptions.class.equals(rawType)) { - methodErrors.checkArgument( - !methodContext.hasPipelineOptionsParamter(), - "Multiple %s parameters", - PipelineOptions.class.getSimpleName()); - return Parameter.pipelineOptions(); } else if (RestrictionTracker.class.isAssignableFrom(rawType)) { methodErrors.checkArgument( !methodContext.hasRestrictionTrackerParameter(), http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java deleted file mode 100644 index edf7e3c..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.reflect; - -import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * Used by {@link ByteBuddyOnTimerInvokerFactory} to Dynamically generate - * {@link OnTimerInvoker} instances for invoking a particular - * {@link DoFn.TimerId} on a particular {@link DoFn}. - */ - -@AutoValue -abstract class OnTimerMethodSpecifier { - public abstract Class<? extends DoFn<?, ?>> fnClass(); - public abstract String timerId(); - public static OnTimerMethodSpecifier - forClassAndTimerId(Class<? extends DoFn<?, ?>> fnClass, String timerId){ - return new AutoValue_OnTimerMethodSpecifier(fnClass, timerId); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java new file mode 100644 index 0000000..104f5f2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.splittabledofn; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; + +/** A restriction represented by a range of integers [from, to). */ +public class OffsetRange + implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> { + private final long from; + private final long to; + + public OffsetRange(long from, long to) { + checkArgument(from <= to, "Malformed range [%s, %s)", from, to); + this.from = from; + this.to = to; + } + + public long getFrom() { + return from; + } + + public long getTo() { + return to; + } + + @Override + public OffsetRangeTracker newTracker() { + return new OffsetRangeTracker(this); + } + + @Override + public String toString() { + return "[" + from + ", " + to + ')'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + OffsetRange that = (OffsetRange) o; + + if (from != that.from) { + return false; + } + return to == that.to; + } + + @Override + public int hashCode() { + int result = (int) (from ^ (from >>> 32)); + result = 31 * result + (int) (to ^ (to >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java index 4987409..0271a0d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java @@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.MoreObjects; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; /** @@ -101,13 +99,4 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> { lastAttemptedOffset + 1, range.getTo()); } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("range", range) - .add("lastClaimedOffset", lastClaimedOffset) - .add("lastAttemptedOffset", lastAttemptedOffset) - .toString(); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java index 8cb0a6b..27ef68f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java @@ -31,13 +31,10 @@ public interface RestrictionTracker<RestrictionT> { RestrictionT currentRestriction(); /** - * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible: - * after this method returns, the tracker MUST refuse all future claim calls, and {@link - * #checkDone} MUST succeed. - * - * <p>Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the - * work: the old value of {@link #currentRestriction} is equivalent to the new value and the - * return value of this method combined. Must be called at most once on a given object. + * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible. + * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work: + * the old value of {@link #currentRestriction} is equivalent to the new value and the return + * value of this method combined. Must be called at most once on a given object. */ RestrictionT checkpoint(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index c68c497..d48d26b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -79,11 +79,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { } @Override - public boolean assignsToOneWindow() { - return true; - } - - @Override public boolean equals(Object other) { return other instanceof GlobalWindows; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java index 341ba27..40ee68a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java @@ -58,9 +58,4 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow> public Instant getOutputTime(Instant inputTimestamp, W window) { return inputTimestamp; } - - @Override - public final boolean assignsToOneWindow() { - return true; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index 150b956..f657884 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -148,11 +148,6 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> { } @Override - public boolean assignsToOneWindow() { - return !this.period.isShorterThan(this.size); - } - - @Override public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { if (!this.isCompatible(other)) { throw new IncompatibleWindowException( http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index a12be6d..105ebfb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -163,24 +163,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** - * Specifies the conditions under which an on-time pane will be created when a window is closed. - */ - public enum OnTimeBehavior { - /** - * Always fire the on-time pane. Even if there is no new data since the previous firing, - * an element will be produced. - * - * <p>This is the default behavior. - */ - FIRE_ALWAYS, - /** - * Only fire the on-time pane if there is new data since the previous firing. - */ - FIRE_IF_NON_EMPTY - - } - - /** * Creates a {@code Window} {@code PTransform} that uses the given * {@link WindowFn} to window the data. * @@ -213,7 +195,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T @Nullable abstract AccumulationMode getAccumulationMode(); @Nullable abstract Duration getAllowedLateness(); @Nullable abstract ClosingBehavior getClosingBehavior(); - @Nullable abstract OnTimeBehavior getOnTimeBehavior(); @Nullable abstract TimestampCombiner getTimestampCombiner(); abstract Builder<T> toBuilder(); @@ -225,7 +206,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T abstract Builder<T> setAccumulationMode(AccumulationMode mode); abstract Builder<T> setAllowedLateness(Duration allowedLateness); abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); - abstract Builder<T> setOnTimeBehavior(OnTimeBehavior onTimeBehavior); abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner); abstract Window<T> build(); @@ -319,15 +299,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T } /** - * <b><i>(Experimental)</i></b> Override the default {@link OnTimeBehavior}, to control - * whether to output an empty on-time pane. - */ - @Experimental(Kind.TRIGGER) - public Window<T> withOnTimeBehavior(OnTimeBehavior behavior) { - return toBuilder().setOnTimeBehavior(behavior).build(); - } - - /** * Get the output strategy of this {@link Window Window PTransform}. For internal use * only. */ @@ -350,9 +321,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T if (getClosingBehavior() != null) { result = result.withClosingBehavior(getClosingBehavior()); } - if (getOnTimeBehavior() != null) { - result = result.withOnTimeBehavior(getOnTimeBehavior()); - } if (getTimestampCombiner() != null) { result = result.withTimestampCombiner(getTimestampCombiner()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index ffe85f3..001d630 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -180,17 +180,6 @@ public abstract class WindowFn<T, W extends BoundedWindow> } /** - * Returns true if this {@link WindowFn} always assigns an element to exactly one window. - * - * <p>If this varies per-element, or cannot be determined, conservatively return false. - * - * <p>By default, returns false. - */ - public boolean assignsToOneWindow() { - return false; - } - - /** * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of * this {@link WindowFn} instance's most-derived class. * http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index ef6d833..a4bfdda 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -116,9 +116,4 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { return inputTimestamp; } - - @Override - public boolean assignsToOneWindow() { - return true; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index 4063d11..f210fd8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -20,8 +20,6 @@ package org.apache.beam.sdk.values; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import java.util.Collections; -import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Internal; @@ -228,11 +226,6 @@ public class PCollection<T> extends PValueBase implements PValue { return super.getName(); } - @Override - public final Map<TupleTag<?>, PValue> expand() { - return Collections.<TupleTag<?>, PValue>singletonMap(tag, this); - } - /** * Sets the name of this {@link PCollection}. Returns {@code this}. * @@ -321,11 +314,6 @@ public class PCollection<T> extends PValueBase implements PValue { private IsBounded isBounded; - /** - * A local {@link TupleTag} used in the expansion of this {@link PValueBase}. - */ - private final TupleTag<?> tag = new TupleTag<>(); - private PCollection(Pipeline p) { super(p); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java index e17e146..74887c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import java.io.IOException; @@ -39,7 +38,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; @@ -141,18 +139,6 @@ public class PCollectionViews { } /** - * Expands a list of {@link PCollectionView} into the form needed for - * {@link PTransform#getAdditionalInputs()}. - */ - public static Map<TupleTag<?>, PValue> toAdditionalInputs(Iterable<PCollectionView<?>> views) { - ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder(); - for (PCollectionView<?> view : views) { - additionalInputs.put(view.getTagInternal(), view.getPCollection()); - } - return additionalInputs.build(); - } - - /** * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}. * * <p>For internal use only. @@ -184,15 +170,6 @@ public class PCollectionViews { } /** - * Returns if a default value was specified. - */ - @Deprecated - @Internal - public boolean hasDefault() { - return hasDefault; - } - - /** * Returns the default value that was specified. * * <p>For internal use only. @@ -296,16 +273,6 @@ public class PCollectionViews { } })); } - - @Override - public boolean equals(Object other) { - return other instanceof ListViewFn; - } - - @Override - public int hashCode() { - return ListViewFn.class.hashCode(); - } } /** @@ -524,10 +491,5 @@ public class PCollectionViews { public String toString() { return MoreObjects.toStringHelper(this).add("tag", tag).toString(); } - - @Override - public Map<TupleTag<?>, PValue> expand() { - return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection); - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index f312eac..6f638d7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.values; import static com.google.common.base.Preconditions.checkState; +import java.util.Collections; +import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.PTransform; @@ -85,6 +87,11 @@ public abstract class PValueBase implements PValue { private String name; /** + * A local {@link TupleTag} used in the expansion of this {@link PValueBase}. + */ + private TupleTag<?> tag = new TupleTag<>(); + + /** * Whether this {@link PValueBase} has been finalized, and its core * properties, e.g., name, can no longer be changed. */ @@ -101,6 +108,11 @@ public abstract class PValueBase implements PValue { } @Override + public final Map<TupleTag<?>, PValue> expand() { + return Collections.<TupleTag<?>, PValue>singletonMap(tag, this); + } + + @Override public void finishSpecifying(PInput input, PTransform<?, ?> transform) { finishedSpecifying = true; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java deleted file mode 100644 index e56af13..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.values; - -import java.io.Serializable; -import java.util.Objects; - -/** A key and a shard number. */ -public class ShardedKey<K> implements Serializable { - private static final long serialVersionUID = 1L; - private final K key; - private final int shardNumber; - - public static <K> ShardedKey<K> of(K key, int shardNumber) { - return new ShardedKey<>(key, shardNumber); - } - - private ShardedKey(K key, int shardNumber) { - this.key = key; - this.shardNumber = shardNumber; - } - - public K getKey() { - return key; - } - - public int getShardNumber() { - return shardNumber; - } - - @Override - public String toString() { - return "key: " + key + " shard: " + shardNumber; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ShardedKey)) { - return false; - } - ShardedKey<K> other = (ShardedKey<K>) o; - return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber); - } - - @Override - public int hashCode() { - return Objects.hash(key, shardNumber); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java index 3b74e69..8a773e2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Duration; @@ -60,7 +59,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab private final AccumulationMode mode; private final Duration allowedLateness; private final ClosingBehavior closingBehavior; - private final OnTimeBehavior onTimeBehavior; private final TimestampCombiner timestampCombiner; private final boolean triggerSpecified; private final boolean modeSpecified; @@ -73,8 +71,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab AccumulationMode mode, boolean modeSpecified, Duration allowedLateness, boolean allowedLatenessSpecified, TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, - ClosingBehavior closingBehavior, - OnTimeBehavior onTimeBehavior) { + ClosingBehavior closingBehavior) { this.windowFn = windowFn; this.trigger = trigger; this.triggerSpecified = triggerSpecified; @@ -83,7 +80,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab this.allowedLateness = allowedLateness; this.allowedLatenessSpecified = allowedLatenessSpecified; this.closingBehavior = closingBehavior; - this.onTimeBehavior = onTimeBehavior; this.timestampCombiner = timestampCombiner; this.timestampCombinerSpecified = timestampCombinerSpecified; } @@ -102,8 +98,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, TimestampCombiner.END_OF_WINDOW, false, - ClosingBehavior.FIRE_IF_NON_EMPTY, - OnTimeBehavior.FIRE_ALWAYS); + ClosingBehavior.FIRE_IF_NON_EMPTY); } public WindowFn<T, W> getWindowFn() { @@ -138,10 +133,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return closingBehavior; } - public OnTimeBehavior getOnTimeBehavior() { - return onTimeBehavior; - } - public TimestampCombiner getTimestampCombiner() { return timestampCombiner; } @@ -161,8 +152,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); + closingBehavior); } /** @@ -176,8 +166,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, true, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); + closingBehavior); } /** @@ -194,8 +183,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); + closingBehavior); } /** @@ -209,8 +197,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, true, timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); + closingBehavior); } public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) { @@ -220,19 +207,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); - } - - public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) { - return new WindowingStrategy<T, W>( - windowFn, - trigger, triggerSpecified, - mode, modeSpecified, - allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, - closingBehavior, - onTimeBehavior); + closingBehavior); } @Experimental(Experimental.Kind.OUTPUT_TIME) @@ -244,8 +219,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, modeSpecified, allowedLateness, allowedLatenessSpecified, timestampCombiner, true, - closingBehavior, - onTimeBehavior); + closingBehavior); } @Override @@ -272,7 +246,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab && getMode().equals(other.getMode()) && getAllowedLateness().equals(other.getAllowedLateness()) && getClosingBehavior().equals(other.getClosingBehavior()) - && getOnTimeBehavior().equals(other.getOnTimeBehavior()) && getTrigger().equals(other.getTrigger()) && getTimestampCombiner().equals(other.getTimestampCombiner()) && getWindowFn().equals(other.getWindowFn()); @@ -305,7 +278,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab mode, true, allowedLateness, true, timestampCombiner, true, - closingBehavior, - onTimeBehavior); + closingBehavior); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 260e47a..6d01d32 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -54,11 +54,10 @@ import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -277,42 +276,37 @@ public class AvroIOTest { } private static class WindowedFilenamePolicy extends FilenamePolicy { - final ResourceId outputFilePrefix; + final String outputFilePrefix; - WindowedFilenamePolicy(ResourceId outputFilePrefix) { + WindowedFilenamePolicy(String outputFilePrefix) { this.outputFilePrefix = outputFilePrefix; } @Override - public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) { - String filenamePrefix = - outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), ""); - - String filename = - String.format( - "%s-%s-%s-of-%s-pane-%s%s%s", - filenamePrefix, - input.getWindow(), - input.getShardNumber(), - input.getNumShards() - 1, - input.getPaneInfo().getIndex(), - input.getPaneInfo().isLast() ? "-final" : "", - outputFileHints.getSuggestedFilenameSuffix()); - return outputFilePrefix - .getCurrentDirectory() - .resolve(filename, StandardResolveOptions.RESOLVE_FILE); + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext input, String extension) { + String filename = String.format( + "%s-%s-%s-of-%s-pane-%s%s%s", + outputFilePrefix, + input.getWindow(), + input.getShardNumber(), + input.getNumShards() - 1, + input.getPaneInfo().getIndex(), + input.getPaneInfo().isLast() ? "-final" : "", + extension); + return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); } @Override - public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename( + ResourceId outputDirectory, Context input, String extension) { throw new UnsupportedOperationException("Expecting windowed outputs only"); } @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.add( - DisplayData.item("fileNamePrefix", outputFilePrefix.toString()) - .withLabel("File Name Prefix")); + builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix) + .withLabel("File Name Prefix")); } } @@ -365,18 +359,15 @@ public class AvroIOTest { Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length)) .advanceWatermarkToInfinity(); - FilenamePolicy policy = - new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename)); + FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename); windowedAvroWritePipeline .apply(values) .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply( - AvroIO.write(GenericClass.class) - .to(policy) - .withTempDirectory( - StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true))) - .withWindowedWrites() - .withNumShards(2)); + .apply(AvroIO.write(GenericClass.class) + .to(baseFilename) + .withFilenamePolicy(policy) + .withWindowedWrites() + .withNumShards(2)); windowedAvroWritePipeline.run(); // Validate that the data written matches the expected elements in the expected order @@ -503,14 +494,13 @@ public class AvroIOTest { expectedFiles.add( new File( DefaultFilenamePolicy.constructName( - FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix), - shardNameTemplate, - "" /* no suffix */, - i, - numShards, - null, - null) - .toString())); + outputFilePrefix, + shardNameTemplate, + "" /* no suffix */, + i, + numShards, + null, + null))); } List<String> actualElements = new ArrayList<>(); @@ -582,4 +572,15 @@ public class AvroIOTest { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString())); } + + @Test + public void testWindowedWriteRequiresFilenamePolicy() { + PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of())); + AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage( + "When using windowed writes, a filename policy must be set via withFilenamePolicy()"); + emptyInput.apply(write); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java index 9dc6d33..217420c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName; import static org.junit.Assert.assertEquals; -import org.apache.beam.sdk.io.fs.ResourceId; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -30,108 +30,69 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class DefaultFilenamePolicyTest { - private static String constructName( - String baseFilename, - String shardTemplate, - String suffix, - int shardNum, - int numShards, - String paneStr, - String windowStr) { - ResourceId constructed = - DefaultFilenamePolicy.constructName( - FileSystems.matchNewResource(baseFilename, false), - shardTemplate, - suffix, - shardNum, - numShards, - paneStr, - windowStr); - return constructed.toString(); - } - @Test public void testConstructName() { - assertEquals( - "/path/to/output-001-of-123.txt", - constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); - assertEquals( - "/path/to/out.txt/part-00042", - constructName("/path/to/out.txt", "/part-SSSSS", "", 42, 100, null, null)); + assertEquals("out.txt/part-00042", + constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null)); - assertEquals("/path/to/out.txt", constructName("/path/to/ou", "t.t", "xt", 1, 1, null, null)); + assertEquals("out.txt", + constructName("ou", "t.t", "xt", 1, 1, null, null)); - assertEquals( - "/path/to/out0102shard.txt", - constructName("/path/to/out", "SSNNshard", ".txt", 1, 2, null, null)); + assertEquals("out0102shard.txt", + constructName("out", "SSNNshard", ".txt", 1, 2, null, null)); - assertEquals( - "/path/to/out-2/1.part-1-of-2.txt", - constructName("/path/to/out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null)); + assertEquals("out-2/1.part-1-of-2.txt", + constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null)); } @Test public void testConstructNameWithLargeShardCount() { - assertEquals( - "/out-100-of-5000.txt", constructName("/out", "-SS-of-NN", ".txt", 100, 5000, null, null)); + assertEquals("out-100-of-5000.txt", + constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null)); } @Test public void testConstructWindowedName() { - assertEquals( - "/path/to/output-001-of-123.txt", - constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); - - assertEquals( - "/path/to/output-001-of-123-PPP-W.txt", - constructName("/path/to/output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null)); - - assertEquals( - "/path/to/out" + ".txt/part-00042-myPaneStr-myWindowStr", - constructName( - "/path/to/out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr")); - - assertEquals( - "/path/to/out.txt", - constructName("/path/to/ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr")); - - assertEquals( - "/path/to/out0102shard-oneMoreWindowStr-anotherPaneStr.txt", - constructName( - "/path/to/out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr")); - - assertEquals( - "/out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-" - + "panemyPaneStr3.txt", - constructName( - "/out", - "-N/S.part-S-of-N-W-P-windowW-paneP", - ".txt", - 1, - 2, - "myPaneStr3", - "slidingWindow1")); + assertEquals("output-001-of-123.txt", + constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null)); + + assertEquals("output-001-of-123-PPP-W.txt", + constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null)); + + assertEquals("out.txt/part-00042-myPaneStr-myWindowStr", + constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", + "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2", + "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt", + constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", + "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-" + + "panemyPaneStr3.txt", + constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3", + "slidingWindow1")); // test first/last pane - assertEquals( - "/out.txt/part-00042-myWindowStr-pane-11-true-false", - constructName( - "/out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", "myWindowStr")); - - assertEquals( - "/path/to/out.txt", - constructName("/path/to/ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr")); - - assertEquals( - "/out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt", - constructName( - "/out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr")); - - assertEquals( - "/path/to/out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt", - constructName( - "/path/to/out", "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1")); + assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false", + constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", + "myWindowStr")); + + assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane", + "anotherWindowStr")); + + assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt", + constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", + "oneMoreWindowStr")); + + assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt", + constructName("out", + "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java index a7644b6..6615a2e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java @@ -39,7 +39,7 @@ public class DrunkWritableByteChannelFactory implements WritableByteChannelFacto } @Override - public String getSuggestedFilenameSuffix() { + public String getFilenameSuffix() { return ".drunk"; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java index 755bb59..caad759 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java @@ -103,7 +103,7 @@ public class FileBasedSinkTest { SimpleSink.SimpleWriter writer = buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter(); - writer.openUnwindowed(testUid, -1, null); + writer.openUnwindowed(testUid, -1); for (String value : values) { writer.write(value); } @@ -198,27 +198,23 @@ public class FileBasedSinkTest { throws Exception { int numFiles = temporaryFiles.size(); - List<FileResult<Void>> fileResults = new ArrayList<>(); + List<FileResult> fileResults = new ArrayList<>(); // Create temporary output bundles and output File objects. for (int i = 0; i < numFiles; i++) { fileResults.add( - new FileResult<Void>( + new FileResult( LocalResources.fromFile(temporaryFiles.get(i), false), WriteFiles.UNKNOWN_SHARDNUM, null, - null, null)); } writeOp.finalize(fileResults); + ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get(); for (int i = 0; i < numFiles; i++) { - ResourceId outputFilename = - writeOp - .getSink() - .getDynamicDestinations() - .getFilenamePolicy(null) - .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED); + ResourceId outputFilename = writeOp.getSink().getFilenamePolicy() + .unwindowedFilename(outputDirectory, new Context(i, numFiles), ""); assertTrue(new File(outputFilename.toString()).exists()); assertFalse(temporaryFiles.get(i).exists()); } @@ -235,12 +231,11 @@ public class FileBasedSinkTest { private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory) throws Exception { String prefix = "file"; - SimpleSink<Void> sink = - SimpleSink.makeSimpleSink( - getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED); + SimpleSink sink = + new SimpleSink(getBaseOutputDirectory(), prefix, "", ""); - WriteOperation<String, Void> writeOp = - new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory); + WriteOperation<String> writeOp = + new SimpleSink.SimpleWriteOperation(sink, tempDirectory); List<File> temporaryFiles = new ArrayList<>(); List<File> outputFiles = new ArrayList<>(); @@ -277,6 +272,8 @@ public class FileBasedSinkTest { @Test public void testCopyToOutputFiles() throws Exception { SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(); + ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get(); + List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3"); List<String> inputContents = Arrays.asList("1", "2", "3"); List<String> expectedOutputFilenames = Arrays.asList( @@ -295,14 +292,9 @@ public class FileBasedSinkTest { File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i)); List<String> lines = Collections.singletonList(inputContents.get(i)); writeFile(lines, inputTmpFile); - inputFilePaths.put( - LocalResources.fromFile(inputTmpFile, false), - writeOp - .getSink() - .getDynamicDestinations() - .getFilenamePolicy(null) - .unwindowedFilename( - new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED)); + inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false), + writeOp.getSink().getFilenamePolicy() + .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), "")); } // Copy input files to output files. @@ -319,8 +311,7 @@ public class FileBasedSinkTest { ResourceId outputDirectory, FilenamePolicy policy, int numFiles) { List<ResourceId> filenames = new ArrayList<>(); for (int i = 0; i < numFiles; i++) { - filenames.add( - policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED)); + filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), "")); } return filenames; } @@ -335,10 +326,8 @@ public class FileBasedSinkTest { List<ResourceId> actual; ResourceId root = getBaseOutputDirectory(); - SimpleSink<Void> sink = - SimpleSink.makeSimpleSink( - root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED); - FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); + SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test"); + FilenamePolicy policy = sink.getFilenamePolicy(); expected = Arrays.asList( root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE), @@ -363,9 +352,8 @@ public class FileBasedSinkTest { @Test public void testCollidingOutputFilenames() throws IOException { ResourceId root = getBaseOutputDirectory(); - SimpleSink<Void> sink = - SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED); - SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink); + SimpleSink sink = new SimpleSink(root, "file", "-NN", "test"); + SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink); ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE); ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE); @@ -373,11 +361,11 @@ public class FileBasedSinkTest { ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE); // More than one shard does. try { - Iterable<FileResult<Void>> results = + Iterable<FileResult> results = Lists.newArrayList( - new FileResult<Void>(temp1, 1, null, null, null), - new FileResult<Void>(temp2, 1, null, null, null), - new FileResult<Void>(temp3, 1, null, null, null)); + new FileResult(temp1, 1, null, null), + new FileResult(temp2, 1, null, null), + new FileResult(temp3, 1, null, null)); writeOp.buildOutputFilenames(results); fail("Should have failed."); } catch (IllegalStateException exn) { @@ -391,10 +379,8 @@ public class FileBasedSinkTest { List<ResourceId> expected; List<ResourceId> actual; ResourceId root = getBaseOutputDirectory(); - SimpleSink<Void> sink = - SimpleSink.makeSimpleSink( - root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED); - FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null); + SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", ""); + FilenamePolicy policy = sink.getFilenamePolicy(); expected = Arrays.asList( root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE), @@ -500,11 +486,10 @@ public class FileBasedSinkTest { public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception { final String testUid = "testId"; ResourceId root = getBaseOutputDirectory(); - WriteOperation<String, Void> writeOp = - SimpleSink.makeSimpleSink( - root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()) + WriteOperation<String> writeOp = + new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory()) .createWriteOperation(); - final Writer<String, Void> writer = writeOp.createWriter(); + final Writer<String> writer = writeOp.createWriter(); final ResourceId expectedFile = writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE); @@ -518,7 +503,7 @@ public class FileBasedSinkTest { expected.add("footer"); expected.add("footer"); - writer.openUnwindowed(testUid, -1, null); + writer.openUnwindowed(testUid, -1); writer.write("a"); writer.write("b"); final FileResult result = writer.close(); @@ -528,20 +513,20 @@ public class FileBasedSinkTest { } /** Build a SimpleSink with default options. */ - private SimpleSink<Void> buildSink() { - return SimpleSink.makeSimpleSink( - getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED); + private SimpleSink buildSink() { + return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test"); } - /** Build a SimpleWriteOperation with default options and the given temporary directory. */ - private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir( - ResourceId tempDirectory) { - SimpleSink<Void> sink = buildSink(); - return new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory); + /** + * Build a SimpleWriteOperation with default options and the given temporary directory. + */ + private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) { + SimpleSink sink = buildSink(); + return new SimpleSink.SimpleWriteOperation(sink, tempDirectory); } /** Build a write operation with the default options for it and its parent sink. */ - private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() { + private SimpleSink.SimpleWriteOperation buildWriteOperation() { return buildSink().createWriteOperation(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java index 9196178..c97313d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java @@ -19,55 +19,33 @@ package org.apache.beam.sdk.io; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.util.MimeTypes; /** - * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer. + * A simple {@link FileBasedSink} that writes {@link String} values as lines with + * header and footer. */ -class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { - public SimpleSink( - ResourceId tempDirectory, - DynamicDestinations<String, DestinationT> dynamicDestinations, - WritableByteChannelFactory writableByteChannelFactory) { - super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory); +class SimpleSink extends FileBasedSink<String> { + public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) { + this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED); } - public static SimpleSink<Void> makeSimpleSink( - ResourceId tempDirectory, FilenamePolicy filenamePolicy) { - return new SimpleSink<>( - tempDirectory, - DynamicFileDestinations.<String>constant(filenamePolicy), - CompressionType.UNCOMPRESSED); - } - - public static SimpleSink<Void> makeSimpleSink( - ResourceId baseDirectory, - String prefix, - String shardTemplate, - String suffix, - WritableByteChannelFactory writableByteChannelFactory) { - DynamicDestinations<String, Void> dynamicDestinations = - DynamicFileDestinations.constant( - DefaultFilenamePolicy.fromParams( - new Params() - .withBaseFilename( - baseDirectory.resolve(prefix, StandardResolveOptions.RESOLVE_FILE)) - .withShardTemplate(shardTemplate) - .withSuffix(suffix))); - return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory); + public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix, + WritableByteChannelFactory writableByteChannelFactory) { + super( + StaticValueProvider.of(baseOutputDirectory), + new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix), + writableByteChannelFactory); } @Override - public SimpleWriteOperation<DestinationT> createWriteOperation() { - return new SimpleWriteOperation<>(this); + public SimpleWriteOperation createWriteOperation() { + return new SimpleWriteOperation(this); } - static final class SimpleWriteOperation<DestinationT> - extends WriteOperation<String, DestinationT> { + static final class SimpleWriteOperation extends WriteOperation<String> { public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) { super(sink, tempOutputDirectory); } @@ -77,12 +55,12 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> { } @Override - public SimpleWriter<DestinationT> createWriter() throws Exception { - return new SimpleWriter<>(this); + public SimpleWriter createWriter() throws Exception { + return new SimpleWriter(this); } } - static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> { + static final class SimpleWriter extends Writer<String> { static final String HEADER = "header"; static final String FOOTER = "footer";
