post-merge fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f013cb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f013cb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f013cb7 Branch: refs/heads/gearpump-runner Commit: 8f013cb76fb85421da00eb8df0074dac0a8233fa Parents: 9dc9be9 Author: manuzhang <[email protected]> Authored: Wed Oct 26 11:22:56 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Oct 26 14:02:54 2016 +0800 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 586 +++++++++++++++++++ .../beam/runners/direct/DirectRunner.java | 22 - .../beam/runners/direct/KeyedResourcePool.java | 47 -- .../runners/direct/LockedKeyedResourcePool.java | 95 --- .../direct/ParDoSingleEvaluatorFactory.java | 2 +- .../direct/TransformEvaluatorRegistry.java | 37 -- .../direct/LockedKeyedResourcePoolTest.java | 163 ------ .../flink/examples/streaming/package-info.java | 22 + .../apache/beam/runners/flink/package-info.java | 22 - runners/gearpump/pom.xml | 2 +- .../gearpump/GearpumpPipelineResult.java | 6 + .../gearpump/GearpumpPipelineRunner.java | 2 - .../translators/TransformTranslator.java | 30 + .../translators/io/UnboundedSourceWrapper.java | 45 ++ .../translators/utils/GearpumpDoFnRunner.java | 16 +- .../beam/runners/dataflow/util/DoFnInfo.java | 1 - .../runners/spark/translation/DoFnFunction.java | 2 - .../main/java/org/apache/beam/sdk/Pipeline.java | 2 +- .../java/org/apache/beam/sdk/io/TextIO.java | 15 +- .../apache/beam/sdk/runners/PipelineRunner.java | 1 - .../apache/beam/sdk/testing/TestPipeline.java | 2 - .../org/apache/beam/sdk/transforms/DoFn.java | 14 - .../org/apache/beam/sdk/transforms/ParDo.java | 12 - .../beam/sdk/transforms/LatestFnTests.java | 233 -------- .../beam/sdk/io/kinesis/package-info.java | 22 - .../beam/sdk/io/mongodb/package-info.java | 22 - 26 files 
changed, 703 insertions(+), 720 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java new file mode 100644 index 0000000..dec9905 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.InputProvider; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.ExecutionContext.StepContext; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import 
org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; + +/** + * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in. + * + * @param <InputT> the type of the {@link DoFn} (main) input elements + * @param <OutputT> the type of the {@link DoFn} (main) output elements + */ +public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + + /** The {@link DoFn} being run. */ + private final DoFn<InputT, OutputT> fn; + + /** The {@link DoFnInvoker} being run. */ + private final DoFnInvoker<InputT, OutputT> invoker; + + /** The context used for running the {@link DoFn}. */ + private final DoFnContext<InputT, OutputT> context; + + private final OutputManager outputManager; + + private final TupleTag<OutputT> mainOutputTag; + + private final boolean observesWindow; + + public SimpleDoFnRunner( + PipelineOptions options, + DoFn<InputT, OutputT> fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy<?, ?> windowingStrategy) { + this.fn = fn; + this.observesWindow = + DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().observesWindow(); + this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn); + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.context = + new DoFnContext<>( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy == null ? null : windowingStrategy.getWindowFn()); + } + + @Override + public void startBundle() { + // This can contain user code. Wrap it in case it throws an exception. + try { + invoker.invokeStartBundle(context); + } catch (Throwable t) { + // Exception in user code. 
+ throw wrapUserCodeException(t); + } + } + + @Override + public void processElement(WindowedValue<InputT> compressedElem) { + if (observesWindow) { + for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) { + invokeProcessElement(elem); + } + } else { + invokeProcessElement(compressedElem); + } + } + + private void invokeProcessElement(WindowedValue<InputT> elem) { + final DoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); + + // Note that if the element must be exploded into all its windows, that has to be done outside + // of this runner. + final DoFn.ExtraContextFactory<InputT, OutputT> extraContextFactory = + createExtraContextFactory(elem); + + // This can contain user code. Wrap it in case it throws an exception. + try { + invoker.invokeProcessElement(processContext, extraContextFactory); + } catch (Exception ex) { + throw wrapUserCodeException(ex); + } + } + + @Override + public void finishBundle() { + // This can contain user code. Wrap it in case it throws an exception. + try { + invoker.invokeFinishBundle(context); + } catch (Throwable t) { + // Exception in user code. + throw wrapUserCodeException(t); + } + } + + /** Returns a new {@link DoFn.ProcessContext} for the given element. */ + private DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) { + return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); + } + + private DoFn.ExtraContextFactory<InputT, OutputT> createExtraContextFactory( + WindowedValue<InputT> elem) { + return new DoFnExtraContextFactory<InputT, OutputT>(elem.getWindows(), elem.getPane()); + } + + private RuntimeException wrapUserCodeException(Throwable t) { + throw UserCodeException.wrapIf(!isSystemDoFn(), t); + } + + private boolean isSystemDoFn() { + return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class); + } + + /** + * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. 
+ * + * @param <InputT> the type of the {@link DoFn} (main) input elements + * @param <OutputT> the type of the {@link DoFn} (main) output elements + */ + private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context { + private static final int MAX_SIDE_OUTPUTS = 1000; + + final PipelineOptions options; + final DoFn<InputT, OutputT> fn; + final SideInputReader sideInputReader; + final OutputManager outputManager; + final TupleTag<OutputT> mainOutputTag; + final StepContext stepContext; + final AggregatorFactory aggregatorFactory; + final WindowFn<?, ?> windowFn; + + /** + * The set of known output tags, some of which may be undeclared, so we can throw an exception + * when it exceeds {@link #MAX_SIDE_OUTPUTS}. + */ + private Set<TupleTag<?>> outputTags; + + public DoFnContext( + PipelineOptions options, + DoFn<InputT, OutputT> fn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowFn<?, ?> windowFn) { + fn.super(); + this.options = options; + this.fn = fn; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.outputTags = Sets.newHashSet(); + + outputTags.add(mainOutputTag); + for (TupleTag<?> sideOutputTag : sideOutputTags) { + outputTags.add(sideOutputTag); + } + + this.stepContext = stepContext; + this.aggregatorFactory = aggregatorFactory; + this.windowFn = windowFn; + super.setupDelegateAggregators(); + } + + ////////////////////////////////////////////////////////////////////////////// + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( + T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + + if (timestamp == null) { + timestamp = 
BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + // The windowFn can never succeed at accessing the element, so its type does not + // matter here + @SuppressWarnings("unchecked") + WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; + windows = + objectWindowFn.assignWindows( + objectWindowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public W window() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } + + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + if (!sideInputReader.contains(view)) { + throw new IllegalArgumentException("calling sideInput() with unknown view"); + } + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + return sideInputReader.get(view, sideInputWindow); + } + + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); + } + + void outputWindowedValue(WindowedValue<OutputT> windowedElem) { + outputManager.output(mainOutputTag, windowedElem); + if (stepContext != null) { + stepContext.noteOutput(windowedElem); + } + } + + private <T> void sideOutputWindowedValue( + TupleTag<T> tag, + T output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) { + sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); + } + + private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { + if (!outputTags.contains(tag)) { + // This tag wasn't declared nor was it seen before during this execution. + // Thus, this must be a new, undeclared and unconsumed output. + // To prevent likely user errors, enforce the limit on the number of side + // outputs. + if (outputTags.size() >= MAX_SIDE_OUTPUTS) { + throw new IllegalArgumentException( + "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); + } + outputTags.add(tag); + } + + outputManager.output(tag, windowedElem); + if (stepContext != null) { + stepContext.noteSideOutput(tag, windowedElem); + } + } + + // Following implementations of output, outputWithTimestamp, and sideOutput + // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by + // ProcessContext's versions in DoFn.processElement. 
+ @Override + public void output(OutputT output) { + outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); + sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); + sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + checkNotNull(combiner, "Combiner passed to createAggregator cannot be null"); + return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); + } + } + + /** + * A concrete implementation of {@link DoFn.ProcessContext} used for running a {@link DoFn} over a + * single element. 
+ * + * @param <InputT> the type of the {@link DoFn} (main) input elements + * @param <OutputT> the type of the {@link DoFn} (main) output elements + */ + private static class DoFnProcessContext<InputT, OutputT> + extends DoFn<InputT, OutputT>.ProcessContext { + + final DoFn<InputT, OutputT> fn; + final DoFnContext<InputT, OutputT> context; + final WindowedValue<InputT> windowedValue; + + public DoFnProcessContext( + DoFn<InputT, OutputT> fn, + DoFnContext<InputT, OutputT> context, + WindowedValue<InputT> windowedValue) { + fn.super(); + this.fn = fn; + this.context = context; + this.windowedValue = windowedValue; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public InputT element() { + return windowedValue.getValue(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + checkNotNull(view, "View passed to sideInput cannot be null"); + Iterator<? extends BoundedWindow> windowIter = windows().iterator(); + BoundedWindow window; + if (!windowIter.hasNext()) { + if (context.windowFn instanceof GlobalWindows) { + // TODO: Remove this once GroupByKeyOnly no longer outputs elements + // without windows + window = GlobalWindow.INSTANCE; + } else { + throw new IllegalStateException( + "sideInput called when main input element is not in any windows"); + } + } else { + window = windowIter.next(); + if (windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is in multiple windows"); + } + } + return context.sideInput(view, window); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public void output(OutputT output) { + context.outputWindowedValue(windowedValue.withValue(output)); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkTimestamp(timestamp); + context.outputWindowedValue( + output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane()); + } + + void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + context.outputWindowedValue(output, timestamp, windows, pane); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + checkNotNull(tag, "Tag passed to sideOutput cannot be null"); + context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + checkTimestamp(timestamp); + context.sideOutputWindowedValue( + tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + } + + @Override + public Instant timestamp() { + return windowedValue.getTimestamp(); + } + + public Collection<? extends BoundedWindow> windows() { + return windowedValue.getWindows(); + } + + private void checkTimestamp(Instant timestamp) { + if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { + throw new IllegalArgumentException( + String.format( + "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + + "timestamp of the current input (%s) minus the allowed skew (%s). See the " + + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed " + + "skew.", + timestamp, + windowedValue.getTimestamp(), + PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); + } + } + + @Override + protected <AggregatorInputT, AggregatorOutputT> + Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator( + String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) { + return context.createAggregator(name, combiner); + } + } + + private class DoFnExtraContextFactory<InputT, OutputT> + implements DoFn.ExtraContextFactory<InputT, OutputT> { + + /** The windows of the current element. 
*/ + private final Collection<? extends BoundedWindow> windows; + + /** The pane of the current element. */ + private final PaneInfo pane; + + public DoFnExtraContextFactory(Collection<? extends BoundedWindow> windows, PaneInfo pane) { + this.windows = windows; + this.pane = pane; + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(windows); + } + + @Override + public InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("InputProvider parameters are not supported."); + } + + @Override + public OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("OutputReceiver parameters are not supported."); + } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return new WindowingInternals<InputT, OutputT>() { + @Override + public Collection<? extends BoundedWindow> windows() { + return windows; + } + + @Override + public PaneInfo pane() { + return pane; + } + + @Override + public TimerInternals timerInternals() { + return context.stepContext.timerInternals(); + } + + @Override + public <T> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) + throws IOException { + @SuppressWarnings("unchecked") + Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); + + context.stepContext.writePCollectionViewData( + tag, + data, + IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), + window(), + windowCoder); + } + + @Override + public StateInternals<?> stateInternals() { + return context.stepContext.stateInternals(); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) {} + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + return context.sideInput(view, mainInputWindow); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index f87f1c1..e02c8a6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -465,26 +465,4 @@ public class DirectRunner return NanosOffsetClock.create(); } } - - /** - * A {@link Supplier} that creates a {@link ExecutorService} based on - * {@link Executors#newFixedThreadPool(int)}. - */ - private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> { - @Override - public ExecutorService get() { - return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - } - } - - - /** - * A {@link Supplier} that creates a {@link NanosOffsetClock}. 
- */ - private static class NanosOffsetClockSupplier implements Supplier<Clock> { - @Override - public Clock get() { - return NanosOffsetClock.create(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java deleted file mode 100644 index b976b69..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import com.google.common.base.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; - -/** - * A pool of resources associated with specific keys. Implementations enforce specific use patterns, - * such as limiting the the number of outstanding elements available per key. 
- */ -interface KeyedResourcePool<K, V> { - /** - * Tries to acquire a value for the provided key, loading it via the provided loader if necessary. - * - * <p>If the returned {@link Optional} contains a value, the caller obtains ownership of that - * value. The value should be released back to this {@link KeyedResourcePool} after the - * caller no longer has use of it using {@link #release(Object, Object)}. - * - * <p>The provided {@link Callable} <b>must not</b> return null; it may either return a non-null - * value or throw an exception. - */ - Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException; - - /** - * Release the provided value, relinquishing ownership of it. Future calls to - * {@link #tryAcquire(Object, Callable)} may return the released value. - */ - void release(K key, V value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java deleted file mode 100644 index 8b1e0b1..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ExecutionError; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; - -/** - * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for - * each key. - */ -class LockedKeyedResourcePool<K, V> implements KeyedResourcePool<K, V> { - /** - * A map from each key to an {@link Optional} of the associated value. At most one value is stored - * per key, and it is obtained by at most one thread at a time. - * - * <p>For each key in this map: - * - * <ul> - * <li>If there is no associated value, then no value has been stored yet. - * <li>If the value is {@code Optional.absent()} then the value is currently in use. - * <li>If the value is {@code Optional.present()} then the contained value is available for use. 
- * </ul> - */ - public static <K, V> LockedKeyedResourcePool<K, V> create() { - return new LockedKeyedResourcePool<>(); - } - - private final ConcurrentMap<K, Optional<V>> cache; - - private LockedKeyedResourcePool() { - cache = new ConcurrentHashMap<>(); - } - - @Override - public Optional<V> tryAcquire(K key, Callable<V> loader) throws ExecutionException { - Optional<V> value = cache.replace(key, Optional.<V>absent()); - if (value == null) { - // No value already existed, so populate the cache with the value returned by the loader - cache.putIfAbsent(key, Optional.of(load(loader))); - // Some other thread may obtain the result after the putIfAbsent, so retry acquisition - value = cache.replace(key, Optional.<V>absent()); - } - return value; - } - - private V load(Callable<V> loader) throws ExecutionException { - try { - return loader.call(); - } catch (Error t) { - throw new ExecutionError(t); - } catch (RuntimeException e) { - throw new UncheckedExecutionException(e); - } catch (Exception e) { - throw new ExecutionException(e); - } - } - - @Override - public void release(K key, V value) { - Optional<V> replaced = cache.replace(key, Optional.of(value)); - checkNotNull(replaced, "Tried to release before a value was acquired"); - checkState( - !replaced.isPresent(), - "Released a value to a %s where there is already a value present for key %s (%s). 
" - + "At most one value may be present at a time.", - LockedKeyedResourcePool.class.getSimpleName(), - key, - replaced); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 9a08e8f..0584e41 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -91,7 +91,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { stepContext, inputBundle, application, - (OldDoFn) fnLocal.get(), + fnLocal.get(), application.getTransform().getSideInputs(), mainOutputTag, Collections.<TupleTag<?>>emptyList(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 6485714..3dd44a7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -116,41 +116,4 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { throw toThrow; } } - - @Override - public void cleanup() throws Exception { - Collection<Exception> thrownInCleanup = new ArrayList<>(); - 
for (TransformEvaluatorFactory factory : factories.values()) { - try { - factory.cleanup(); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - thrownInCleanup.add(e); - } - } - finished.set(true); - if (!thrownInCleanup.isEmpty()) { - LOG.error("Exceptions {} thrown while cleaning up evaluators", thrownInCleanup); - Exception toThrow = null; - for (Exception e : thrownInCleanup) { - if (toThrow == null) { - toThrow = e; - } else { - toThrow.addSuppressed(e); - } - } - throw toThrow; - } - } - - /** - * A factory to create Transform Evaluator Registries. - */ - public static class Factory { - public TransformEvaluatorRegistry create() { - return TransformEvaluatorRegistry.defaultRegistry(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java deleted file mode 100644 index e1e24a3..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ExecutionError; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link LockedKeyedResourcePool}. 
- */ -@RunWith(JUnit4.class) -public class LockedKeyedResourcePoolTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private LockedKeyedResourcePool<String, Integer> cache = - LockedKeyedResourcePool.create(); - - @Test - public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException { - Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(returned.get(), equalTo(3)); - - cache.release("foo", 4); - Optional<Integer> reacquired = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 5; - } - }); - assertThat(reacquired.get(), equalTo(4)); - } - - @Test - public void acquireReleaseReleaseThrows() throws ExecutionException { - Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(returned.get(), equalTo(3)); - - cache.release("foo", 4); - thrown.expect(IllegalStateException.class); - thrown.expectMessage("already a value present"); - thrown.expectMessage("At most one"); - cache.release("foo", 4); - } - - @Test - public void releaseBeforeAcquireThrows() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("before a value was acquired"); - cache.release("bar", 3); - } - - @Test - public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException { - Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - Optional<Integer> secondReturned = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(secondReturned.isPresent(), is(false)); - } - - @Test - public void acquireMultipleKeysSucceeds() throws ExecutionException { - 
Optional<Integer> returned = cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - Optional<Integer> secondReturned = cache.tryAcquire("bar", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - return 4; - } - }); - - assertThat(returned.get(), equalTo(3)); - assertThat(secondReturned.get(), equalTo(4)); - } - - @Test - public void acquireThrowsExceptionWrapped() throws ExecutionException { - final Exception cause = new Exception("checkedException"); - thrown.expect(ExecutionException.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } - - @Test - public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException { - final RuntimeException cause = new RuntimeException("UncheckedException"); - thrown.expect(UncheckedExecutionException.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } - - @Test - public void acquireThrowsErrorWrapped() throws ExecutionException { - final Error cause = new Error("Error"); - thrown.expect(ExecutionError.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable<Integer>() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java new file mode 100644 
index 0000000..58f41b6 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Flink Beam runner example. + */ +package org.apache.beam.runners.flink.examples.streaming; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java deleted file mode 100644 index 57f1e59..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 6576ba6..6c104eb 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -225,7 +225,7 @@ <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> - <type>test-jar</type> + <classifier>tests</classifier> <scope>test</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index 2011a4b..e7c621e 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import 
org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -60,4 +61,9 @@ public class GearpumpPipelineResult implements PipelineResult { new UnsupportedOperationException()); } + @Override + public MetricResults metrics() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java index ad7bb3e..9e32227 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java @@ -53,8 +53,6 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; * A {@link PipelineRunner} that executes the operations in the * pipeline by first translating them to Gearpump Stream DSL * and then executing them on a Gearpump cluster. - * <p>> - * This is based on DataflowPipelineRunner. 
*/ @SuppressWarnings({"rawtypes", "unchecked"}) public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java new file mode 100644 index 0000000..c8587d3 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import java.io.Serializable; + +import org.apache.beam.sdk.transforms.PTransform; + +/** + * translates {@link PTransform} to Gearpump functions. 
+ */ +public interface TransformTranslator<T extends PTransform> extends Serializable { + void translate(T transform, TranslationContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java new file mode 100644 index 0000000..dfdecb2 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/UnboundedSourceWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.io; + +import java.io.IOException; + +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * wrapper over UnboundedSource for Gearpump DataSource API. 
+ */ +public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> + extends GearpumpSource<OutputT> { + + private final UnboundedSource<OutputT, CheckpointMarkT> source; + + public UnboundedSourceWrapper(UnboundedSource<OutputT, CheckpointMarkT> source, + PipelineOptions options) { + super(options); + this.source = source; + } + + @Override + protected Source.Reader<OutputT> createReader(PipelineOptions options) throws IOException { + return source.createReader(options, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java index e205575..ec86a8d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java @@ -18,7 +18,8 @@ package org.apache.beam.runners.gearpump.translators.utils; -import com.google.common.base.Preconditions; +import static org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull; + import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -59,6 +60,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; + /** * a serializable {@link SimpleDoFnRunner}. 
*/ @@ -330,20 +332,20 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O @Override public <T> void sideOutput(TupleTag<T> tag, T output) { - Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); + checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); } @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); + checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); } @Override protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - Preconditions.checkNotNull(combiner, + checkNotNull(combiner, "Combiner passed to createAggregator cannot be null"); throw new UnsupportedOperationException("aggregator not supported in Gearpump runner"); } @@ -386,7 +388,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O @Override public <T> T sideInput(PCollectionView<T> view) { - Preconditions.checkNotNull(view, "View passed to sideInput cannot be null"); + checkNotNull(view, "View passed to sideInput cannot be null"); Iterator<? 
extends BoundedWindow> windowIter = windows().iterator(); BoundedWindow window; if (!windowIter.hasNext()) { @@ -435,13 +437,13 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O @Override public <T> void sideOutput(TupleTag<T> tag, T output) { - Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null"); + checkNotNull(tag, "Tag passed to sideOutput cannot be null"); context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); } @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); + checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); context.sideOutputWindowedValue( tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index b84a1a8..b211c04 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -77,4 +77,3 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { return outputMap; } } - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index 69c450e..4dfbee6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -45,8 +45,6 @@ public class DoFnFunction<InputT, OutputT> implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> { private final Accumulator<NamedAggregators> accum; private final OldDoFn<InputT, OutputT> mFunction; - private static final Logger LOG = LoggerFactory.getLogger(DoFnFunction.class); - private final SparkRuntimeContext mRuntimeContext; private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs; private final WindowFn<Object, ?> windowFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 0776786..2286832 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -134,7 +134,7 @@ public class Pipeline { */ public static Pipeline create(PipelineOptions options) { Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options); - LOG.info("Creating {}", pipeline); + LOG.debug("Creating {}", pipeline); return pipeline; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 78ea988..2dbcda7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -33,20 +34,8 @@ import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.regex.Pattern; + import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java index 1ec4103..ede1507 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java @@ -57,7 +57,6 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { .fromFactoryMethod("fromOptions") .withArg(PipelineOptions.class, options) .build(); - System.out.println("runner: " + result.getClass().getName()); return result; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 3202000..f1bf09d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -95,7 +95,6 @@ public class TestPipeline extends Pipeline { } public static TestPipeline fromOptions(PipelineOptions options) { - System.out.println(options); return new TestPipeline(PipelineRunner.fromOptions(options), options); } @@ -134,7 +133,6 @@ public class TestPipeline extends Pipeline { @Nullable String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); - System.out.println("options " + beamTestPipelineOptions); PipelineOptions options = Strings.isNullOrEmpty(beamTestPipelineOptions) ? 
PipelineOptionsFactory.create() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index f2fa87c..018877f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -423,20 +423,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD ///////////////////////////////////////////////////////////////////////////// - - /** - * Annotation for the method to use to prepare an instance for processing bundles of elements. The - * method annotated with this must satisfy the following constraints - * <ul> - * <li>It must have zero arguments. - * </ul> - */ - @Documented - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - public @interface Setup { - } - /** * Annotation for declaring and dereferencing state cells. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index a7dc136..a3a306a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -670,18 +670,6 @@ public class ParDo { /** * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFn} - * function, and which has its input and output types bound. Does - * not modify this transform. 
The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return of(adapt(fn), fn.getClass()); - } - - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this * transform but that will invoke the given {@link OldDoFn} * function, and that has its input and output types bound. Does * not modify this transform. The resulting {@link PTransform} is http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java deleted file mode 100644 index 84b5b68..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.isOneOf; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Objects; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link Latest.LatestFn}. - * */ -@RunWith(JUnit4.class) -public class LatestFnTests { - private static final Instant INSTANT = new Instant(100); - private static final long VALUE = 100 * INSTANT.getMillis(); - - private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT); - private static final TimestampedValue<Long> TV_MINUS_TEN = - TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); - private static final TimestampedValue<Long> TV_PLUS_TEN = - TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); - - @Rule - public final ExpectedException thrown = ExpectedException.none(); - - private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); - private final Instant baseTimestamp = Instant.now(); - - @Test - public void testDefaultValue() { - assertThat(fn.defaultValue(), nullValue()); - } - - @Test - public void testCreateAccumulator() { - assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); - } - - @Test - public void testAddInputInitialAdd() { - TimestampedValue<Long> input = TV; - assertEquals(input, 
fn.addInput(fn.createAccumulator(), input)); - } - - @Test - public void testAddInputMinTimestamp() { - TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L); - assertEquals(input, fn.addInput(fn.createAccumulator(), input)); - } - - @Test - public void testAddInputEarlierValue() { - assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); - } - - @Test - public void testAddInputLaterValue() { - assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); - } - - @Test - public void testAddInputSameTimestamp() { - TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT); - TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT); - - assertThat("Latest for values with the same timestamp is chosen arbitrarily", - fn.addInput(accum, input), isOneOf(accum, input)); - } - - @Test - public void testAddInputNullAccumulator() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("accumulators"); - fn.addInput(null, TV); - } - - @Test - public void testAddInputNullInput() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("input"); - fn.addInput(TV, null); - } - - @Test - public void testAddInputNullValue() { - TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10)); - assertEquals("Null values are allowed", input, fn.addInput(TV, input)); - } - - @Test - public void testMergeAccumulatorsMultipleValues() { - Iterable<TimestampedValue<Long>> accums = Lists.newArrayList( - TV, - TV_PLUS_TEN, - TV_MINUS_TEN - ); - - assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums)); - } - - @Test - public void testMergeAccumulatorsSingleValue() { - assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV))); - } - - @Test - public void testMergeAccumulatorsEmptyIterable() { - ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList(); - assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums)); - } - - @Test - public void testMergeAccumulatorsDefaultAccumulator() 
{ - TimestampedValue<Long> defaultAccum = fn.createAccumulator(); - assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum))); - } - - @Test - public void testMergeAccumulatorsAllDefaultAccumulators() { - TimestampedValue<Long> defaultAccum = fn.createAccumulator(); - assertEquals(defaultAccum, fn.mergeAccumulators( - Lists.newArrayList(defaultAccum, defaultAccum))); - } - - @Test - public void testMergeAccumulatorsNullIterable() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("accumulators"); - fn.mergeAccumulators(null); - } - - @Test - public void testExtractOutput() { - assertEquals(TV.getValue(), fn.extractOutput(TV)); - } - - @Test - public void testExtractOutputDefaultAggregator() { - TimestampedValue<Long> accum = fn.createAccumulator(); - assertThat(fn.extractOutput(accum), nullValue()); - } - - @Test - public void testExtractOutputNullValue() { - TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp); - assertEquals(null, fn.extractOutput(accum)); - } - - @Test - public void testAggregator() throws Exception { - LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue()); - DoFnTester<Long, Long> harness = DoFnTester.of(doFn); - for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) { - harness.processTimestampedElement(element); - } - - assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg)); - assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg)); - assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue()); - } - - @Test - public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { - Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); - - CoderRegistry registry = new CoderRegistry(); - TimestampedValue.TimestampedValueCoder<Long> inputCoder = - TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); - - assertThat("Default output coder should 
handle null values", - fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class)); - assertThat("Default accumulator coder should handle null values", - fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class)); - } - - static class LatestAggregatorsFn<T> extends DoFn<T, T> { - private final T specialValue; - LatestAggregatorsFn(T specialValue) { - this.specialValue = specialValue; - } - - Aggregator<TimestampedValue<T>, T> allValuesAgg = - createAggregator("allValues", new Latest.LatestFn<T>()); - - Aggregator<TimestampedValue<T>, T> specialValueAgg = - createAggregator("oneValue", new Latest.LatestFn<T>()); - - Aggregator<TimestampedValue<T>, T> noValuesAgg = - createAggregator("noValues", new Latest.LatestFn<T>()); - - @ProcessElement - public void processElement(ProcessContext c) { - TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp()); - allValuesAgg.addValue(val); - if (Objects.equals(c.element(), specialValue)) { - specialValueAgg.addValue(val); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java deleted file mode 100644 index 44dbf4a..0000000 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms for reading and writing from Amazon Kinesis. - */ -package org.apache.beam.sdk.io.kinesis; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f013cb7/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java deleted file mode 100644 index fd08b58..0000000 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms for reading and writing from MongoDB. - */ -package org.apache.beam.sdk.io.mongodb;
