http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index 365b6c4..c0919b9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -64,9 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { c.element()[0] = 'b'; } })); - PCollection<Long> consumer = pcollection.apply(Count.<byte[]>globally()); - DirectGraphs.performDirectOverrides(p); - this.consumer = DirectGraphs.getProducer(consumer); + consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally())); } @Test
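For readers skimming these direct-runner test diffs: the recurring change is that the tests no longer call DirectGraphs.performDirectOverrides(p) before looking up which AppliedPTransform produces a given PCollection; they read the producer straight off the DirectGraph. A minimal sketch of that lookup pattern follows. It is illustrative only: the class name, the Create/Count steps, and the assumption that it sits alongside the existing tests in the org.apache.beam.runners.direct test package (DirectGraph and DirectGraphs are package-local helpers) are mine, not part of this commit.

package org.apache.beam.runners.direct; // sketch assumes placement next to the tests above

import static org.junit.Assert.assertNotNull;

import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class ProducerLookupSketchTest {
  // Same pipeline setup the tests above use: the pipeline is never run, only inspected.
  @Rule
  public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

  @Test
  public void looksUpProducerWithoutOverrides() {
    PCollection<String> words = p.apply(Create.of("foo", "bar"));
    PCollection<Long> counted = words.apply(Count.<String>globally());

    // After this change the graph can be queried as constructed;
    // no DirectGraphs.performDirectOverrides(p) pass is needed first.
    DirectGraph graph = DirectGraphs.getGraph(p);
    AppliedPTransform<?, ?, ?> producer = graph.getProducer(counted);
    assertNotNull(producer);
  }
}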
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 7912538..09a21ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -98,7 +98,7 @@ public class ParDoEvaluatorTest { when(evaluationContext.createBundle(output)).thenReturn(outputBundle); ParDoEvaluator<Integer> evaluator = - createEvaluator(singletonView, fn, inputPc, output); + createEvaluator(singletonView, fn, output); IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3); @@ -132,7 +132,6 @@ public class ParDoEvaluatorTest { private ParDoEvaluator<Integer> createEvaluator( PCollectionView<Integer> singletonView, RecorderFn fn, - PCollection<Integer> input, PCollection<Integer> output) { when( evaluationContext.createSideInputReader( @@ -150,7 +149,6 @@ public class ParDoEvaluatorTest { Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class))) .thenReturn(executionContext); - DirectGraphs.performDirectOverrides(p); @SuppressWarnings("unchecked") AppliedPTransform<PCollection<Integer>, ?, ?> transform = (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output); @@ -158,7 +156,8 @@ public class ParDoEvaluatorTest { evaluationContext, stepContext, transform, - input.getWindowingStrategy(), + ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values())) + .getWindowingStrategy(), fn, null /* key */, ImmutableList.<PCollectionView<?>>of(singletonView), http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index fe0b743..9366b7c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -41,7 +41,6 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -53,6 +52,7 @@ import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -128,17 +128,16 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { input .apply( new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( - new DoFn<KV<String, Integer>, Integer>() { - @StateId(stateId) - private final StateSpec<ValueState<String>> spec = - StateSpecs.value(StringUtf8Coder.of()); - - @ProcessElement - public void process(ProcessContext c) {} - }, - mainOutput, - TupleTagList.empty(), - Collections.<PCollectionView<?>>emptyList())) + ParDo.of( + new DoFn<KV<String, Integer>, Integer>() { + @StateId(stateId) + private final StateSpec<ValueState<String>> spec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext c) {} + }) + .withOutputTags(mainOutput, TupleTagList.empty()))) .get(mainOutput) .setCoder(VarIntCoder.of()); @@ -154,7 +153,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.<StructuralKey>any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext); + when(mockExecutionContext.getStepContext(anyString())) + .thenReturn(mockStepContext); IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9)); IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(19)); @@ -241,17 +241,18 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { mainInput .apply( new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( - new DoFn<KV<String, Integer>, Integer>() { - @StateId(stateId) - private final StateSpec<ValueState<String>> spec = - StateSpecs.value(StringUtf8Coder.of()); - - @ProcessElement - public void process(ProcessContext c) {} - }, - mainOutput, - TupleTagList.empty(), - Collections.<PCollectionView<?>>singletonList(sideInput))) + ParDo + .of( + new DoFn<KV<String, Integer>, Integer>() { + @StateId(stateId) + private final StateSpec<ValueState<String>> spec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext c) {} + }) + .withSideInputs(sideInput) + .withOutputTags(mainOutput, TupleTagList.empty()))) .get(mainOutput) .setCoder(VarIntCoder.of()); @@ -268,7 +269,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { when(mockEvaluationContext.getExecutionContext( eq(producingTransform), Mockito.<StructuralKey>any())) .thenReturn(mockExecutionContext); - when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext); + when(mockExecutionContext.getStepContext(anyString())) + .thenReturn(mockStepContext); when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any())) .thenReturn(mockUncommittedBundle); when(mockStepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); @@ -285,8 +287,11 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { // global window state merely by having the evaluator created. The cleanup logic does not // depend on the window. 
String key = "hello"; - WindowedValue<KV<String, Integer>> firstKv = - WindowedValue.of(KV.of(key, 1), new Instant(3), firstWindow, PaneInfo.NO_FIRING); + WindowedValue<KV<String, Integer>> firstKv = WindowedValue.of( + KV.of(key, 1), + new Instant(3), + firstWindow, + PaneInfo.NO_FIRING); WindowedValue<KeyedWorkItem<String, KV<String, Integer>>> gbkOutputElement = firstKv.withValue( @@ -301,8 +306,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { BUNDLE_FACTORY .createBundle( (PCollection<KeyedWorkItem<String, KV<String, Integer>>>) - Iterables.getOnlyElement( - TransformInputs.nonAdditionalInputs(producingTransform))) + Iterables.getOnlyElement(producingTransform.getInputs().values())) .add(gbkOutputElement) .commit(Instant.now()); TransformEvaluator<KeyedWorkItem<String, KV<String, Integer>>> evaluator = @@ -312,7 +316,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { // This should push back every element as a KV<String, Iterable<Integer>> // in the appropriate window. Since the keys are equal they are single-threaded - TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result = evaluator.finishBundle(); + TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result = + evaluator.finishBundle(); List<Integer> pushedBackInts = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index b7f5a7c..86412a0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -25,8 +25,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Collection; @@ -92,7 +90,6 @@ public class TransformExecutorTest { created = p.apply(Create.of("foo", "spam", "third")); PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3)); - DirectGraphs.performDirectOverrides(p); DirectGraph graph = DirectGraphs.getGraph(p); createdProducer = graph.getProducer(created); downstreamProducer = graph.getProducer(downstream); @@ -417,13 +414,8 @@ public class TransformExecutorTest { ? Collections.emptyList() : result.getUnprocessedElements(); - Optional<? extends CommittedBundle<?>> unprocessedBundle; - if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) { - unprocessedBundle = Optional.absent(); - } else { - unprocessedBundle = - Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements)); - } + CommittedBundle<?> unprocessedBundle = + inputBundle == null ? 
null : inputBundle.withElements(unprocessedElements); return CommittedResult.create( result, unprocessedBundle, http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 5bc48b7..419698e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; import org.joda.time.Instant; import org.junit.Rule; @@ -65,13 +66,12 @@ public class ViewEvaluatorFactoryTest { .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) .apply(GroupByKey.<Void, String>create()) .apply(Values.<Iterable<String>>create()); - PCollection<Iterable<String>> view = - concat.apply( - new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView())); + PCollectionView<Iterable<String>> view = + concat.apply(new ViewOverrideFactory.WriteView<>(createView)); EvaluationContext context = mock(EvaluationContext.class); TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); - when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter); + when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now()); AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java index 6af9273..024e15c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -37,11 +37,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.transforms.ViewFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; @@ -66,19 +63,24 @@ public class ViewOverrideFactoryTest implements Serializable { PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); final 
PCollectionView<List<Integer>> view = PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); - PTransformReplacement<PCollection<Integer>, PCollection<Integer>> + PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>> replacementTransform = factory.getReplacementTransform( AppliedPTransform - .<PCollection<Integer>, PCollection<Integer>, - PTransform<PCollection<Integer>, PCollection<Integer>>> + .<PCollection<Integer>, PCollectionView<List<Integer>>, + CreatePCollectionView<Integer, List<Integer>>> of( "foo", ints.expand(), view.expand(), CreatePCollectionView.<Integer, List<Integer>>of(view), p)); - ints.apply(replacementTransform.getTransform()); + PCollectionView<List<Integer>> afterReplacement = + ints.apply(replacementTransform.getTransform()); + assertThat( + "The CreatePCollectionView replacement should return the same View", + afterReplacement, + equalTo(view)); PCollection<Set<Integer>> outputViewContents = p.apply("CreateSingleton", Create.of(0)) @@ -102,11 +104,11 @@ public class ViewOverrideFactoryTest implements Serializable { final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); final PCollectionView<List<Integer>> view = PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); - PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement = + PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>> replacement = factory.getReplacementTransform( AppliedPTransform - .<PCollection<Integer>, PCollection<Integer>, - PTransform<PCollection<Integer>, PCollection<Integer>>> + .<PCollection<Integer>, PCollectionView<List<Integer>>, + CreatePCollectionView<Integer, List<Integer>>> of( "foo", ints.expand(), @@ -124,19 +126,8 @@ public class ViewOverrideFactoryTest implements Serializable { "There should only be one WriteView primitive in the graph", writeViewVisited.getAndSet(true), is(false)); - PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView(); - - // replacementView.getPCollection() is null, but that is not a requirement - // so not asserted one way or the other - assertThat( - replacementView.getTagInternal(), - equalTo(view.getTagInternal())); - assertThat( - replacementView.getViewFn(), - Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn())); - assertThat( - replacementView.getWindowMappingFn(), - Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn())); + PCollectionView replacementView = ((WriteView) node.getTransform()).getView(); + assertThat(replacementView, Matchers.<PCollectionView>theInstance(view)); assertThat(node.getInputs().entrySet(), hasSize(1)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java index 1d8aac1..b667346 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java @@ -59,7 +59,6 @@ public class WatermarkCallbackExecutorTest { public void setup() { PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); PCollection<Integer> 
summed = created.apply(Sum.integersGlobally()); - DirectGraphs.performDirectOverrides(p); DirectGraph graph = DirectGraphs.getGraph(p); create = graph.getProducer(created); sum = graph.getProducer(summed); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index e3f6215..9528ac9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -122,7 +121,6 @@ public class WatermarkManagerTest implements Serializable { flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections()); clock = MockClock.fromInstant(new Instant(1000)); - DirectGraphs.performDirectOverrides(p); graph = DirectGraphs.getGraph(p); manager = WatermarkManager.create(clock, graph); @@ -319,7 +317,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(graph.getProducer(created)).build(), - Optional.<CommittedBundle<?>>absent(), + root.withElements(Collections.<WindowedValue<Void>>emptyList()), Collections.singleton(createBundle), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -333,7 +331,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(theFlatten).build(), - Optional.<CommittedBundle<?>>absent(), + createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList(), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -346,7 +344,7 @@ public class WatermarkManagerTest implements Serializable { TimerUpdate.empty(), CommittedResult.create( StepTransformResult.withoutHold(theFlatten).build(), - Optional.<CommittedBundle<?>>absent(), + createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()), Collections.<CommittedBundle<?>>emptyList(), EnumSet.allOf(OutputType.class)), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -1502,15 +1500,9 @@ public class WatermarkManagerTest implements Serializable { AppliedPTransform<?, ?, ?> transform, @Nullable CommittedBundle<?> unprocessedBundle, Iterable<? extends CommittedBundle<?>> bundles) { - Optional<? extends CommittedBundle<?>> unprocessedElements; - if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) { - unprocessedElements = Optional.absent(); - } else { - unprocessedElements = Optional.of(unprocessedBundle); - } return CommittedResult.create( StepTransformResult.withoutHold(transform).build(), - unprocessedElements, + unprocessedBundle, bundles, Iterables.isEmpty(bundles) ? 
EnumSet.noneOf(OutputType.class) http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 546a181..a88d95e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -30,7 +30,6 @@ import static org.junit.Assert.assertThat; import java.io.File; import java.io.FileReader; import java.io.Reader; -import java.io.Serializable; import java.nio.CharBuffer; import java.util.ArrayList; import java.util.Collections; @@ -39,8 +38,9 @@ import java.util.UUID; import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.DynamicFileDestinations; +import org.apache.beam.sdk.io.DefaultFilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.LocalResources; import org.apache.beam.sdk.io.TextIO; @@ -53,8 +53,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -73,17 +71,11 @@ import org.junit.runners.JUnit4; * Tests for {@link WriteWithShardingFactory}. 
*/ @RunWith(JUnit4.class) -public class WriteWithShardingFactoryTest implements Serializable { - +public class WriteWithShardingFactoryTest { private static final int INPUT_SIZE = 10000; - - @Rule public transient TemporaryFolder tmp = new TemporaryFolder(); - - private transient WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>(); - - @Rule - public final transient TestPipeline p = - TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>(); + @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Test public void dynamicallyReshardedWrite() throws Exception { @@ -137,24 +129,26 @@ public class WriteWithShardingFactoryTest implements Serializable { @Test public void withNoShardingSpecifiedReturnsNewTransform() { ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */); - - PTransform<PCollection<Object>, PDone> original = + FilenamePolicy policy = + DefaultFilenamePolicy.constructUsingStandardParameters( + StaticValueProvider.of(outputDirectory), + DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE, + "", + false); + WriteFiles<Object> original = WriteFiles.to( - new FileBasedSink<Object, Void>( - StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) { + new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) { @Override - public WriteOperation<Object, Void> createWriteOperation() { + public WriteOperation<Object> createWriteOperation() { throw new IllegalArgumentException("Should not be used"); } - }, - SerializableFunctions.identity()); + }); @SuppressWarnings("unchecked") PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of())); - AppliedPTransform<PCollection<Object>, PDone, PTransform<PCollection<Object>, PDone>> - originalApplication = - AppliedPTransform.of( - "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p); + AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication = + AppliedPTransform.of( + "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p); assertThat( factory.getReplacementTransform(originalApplication).getTransform(), http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index c063a2d..c4c6b55 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -91,6 +91,7 @@ <excludedGroups> org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.LargeKeys$Above100MB, + org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream, org.apache.beam.sdk.testing.UsesSplittableParDo @@ -380,13 +381,5 @@ <type>test-jar</type> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-java</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> </project> 
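A note on the flink/pom.xml change above: the entries under <excludedGroups> are JUnit category classes, so adding org.apache.beam.sdk.testing.UsesSetState keeps the Flink ValidatesRunner suite from executing SDK tests tagged with that marker. A minimal sketch of how such a test is tagged is below; the test class and method are invented for illustration and are not taken from the SDK.

import org.apache.beam.sdk.testing.UsesSetState;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class SetStateCategorySketchTest {
  // Surefire/failsafe filter on these category classes via <groups>/<excludedGroups>,
  // which is what the pom entry above keys on.
  @Test
  @Category({ValidatesRunner.class, UsesSetState.class})
  public void statefulParDoWithSetState() {
    // ... a pipeline exercising SetState would go here ...
  }
}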
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java deleted file mode 100644 index 0cc3aec..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.common.collect.Iterables; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** Flink streaming overrides for various view (side input) transforms. */ -class CreateStreamingFlinkView<ElemT, ViewT> - extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { - private final PCollectionView<ViewT> view; - - public CreateStreamingFlinkView(PCollectionView<ViewT> view) { - this.view = view; - } - - @Override - public PCollection<ElemT> expand(PCollection<ElemT> input) { - input - .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults()) - .apply(CreateFlinkPCollectionView.<ElemT, ViewT>of(view)); - return input; - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use by {@link CreateStreamingFlinkView}. This combiner requires that the input - * {@link PCollection} fits in memory. For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. 
- */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - @Override - public List<T> createAccumulator() { - return new ArrayList<T>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - - /** - * Creates a primitive {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. - * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateFlinkPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> { - private PCollectionView<ViewT> view; - - private CreateFlinkPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } - - public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateFlinkPCollectionView<>(view); - } - - @Override - public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) { - return PCollection.<List<ElemT>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(input.getCoder()); - } - - public PCollectionView<ViewT> getView() { - return view; - } - } - - public static class Factory<ElemT, ViewT> - implements PTransformOverrideFactory< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> { - public Factory() {} - - @Override - public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform( - AppliedPTransform< - PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> - transform) { - return PTransformReplacement.of( - (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()), - new CreateStreamingFlinkView<ElemT, ViewT>(transform.getTransform().getView())); - } - - @Override - public Map<PValue, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 6e70198..0439119 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.flink; import com.google.common.collect.Iterables; import java.util.HashMap; 
import java.util.Map; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -144,7 +143,7 @@ class FlinkBatchTranslationContext { @SuppressWarnings("unchecked") <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); } Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index d2a2016..fe5dd87 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -84,8 +84,6 @@ class FlinkPipelineExecutionEnvironment { this.flinkBatchEnv = null; this.flinkStreamEnv = null; - pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming())); - PipelineTranslationOptimizer optimizer = new PipelineTranslationOptimizer(TranslationMode.BATCH, options); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index f733e2e..8da68c5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -17,18 +17,27 @@ */ package org.apache.beam.runners.flink; +import com.google.common.collect.ImmutableList; +import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; +import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.InstanceBuilder; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValue; @@ -64,8 +73,54 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { @Override public void translate(Pipeline pipeline) { + List<PTransformOverride> transformOverrides = + ImmutableList.<PTransformOverride>builder() + .add( + PTransformOverride.of( + PTransformMatchers.splittableParDoMulti(), + new SplittableParDoOverrideFactory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class), + new SplittableParDoViaKeyedWorkItems.OverrideFactory())) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))) + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))) + // this has to be last since the ViewAsSingleton override + // can expand to a Combine.GloballyAsSingletonView + .add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new ReflectiveOneToOneOverrideFactory( + FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, + flinkRunner))) + .build(); + // Ensure all outputs of all reads are consumed. 
UnconsumedReads.ensureAllReadsConsumed(pipeline); + pipeline.replaceAll(transformOverrides); super.translate(pipeline); } @@ -173,6 +228,35 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { } } + private static class ReflectiveOneToOneOverrideFactory< + InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>> + extends SingleInputOutputOverrideFactory< + PCollection<InputT>, PCollection<OutputT>, TransformT> { + private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement; + private final FlinkRunner runner; + + private ReflectiveOneToOneOverrideFactory( + Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement, + FlinkRunner runner) { + this.replacement = replacement; + this.runner = runner; + } + + @Override + public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform( + AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), + InstanceBuilder.ofType(replacement) + .withArg(FlinkRunner.class, runner) + .withArg( + (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>) + transform.getTransform().getClass(), + transform.getTransform()) + .build()); + } + } + /** * A {@link PTransformOverrideFactory} that overrides a <a * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}. @@ -188,7 +272,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform)); + new SplittableParDo<>(transform.getTransform())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 3d7e81f..2a7c5d6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.construction.ElementAndRestriction; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; @@ -123,7 +124,7 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator()); TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator()); TRANSLATORS.put( - CreateStreamingFlinkView.CreateFlinkPCollectionView.class, + FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class, new CreateViewStreamingTranslator()); TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming()); @@ -362,13 +363,8 @@ class 
FlinkStreamingTransformTranslators { Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap(); for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { if (!tagsToOutputTags.containsKey(entry.getKey())) { - tagsToOutputTags.put( - entry.getKey(), - new OutputTag<WindowedValue<?>>( - entry.getKey().getId(), - (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()) - ) - ); + tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(), + (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()))); } } @@ -547,11 +543,14 @@ class FlinkStreamingTransformTranslators { transform.getAdditionalOutputTags().getAll(), context, new ParDoTranslationHelper.DoFnOperatorFactory< - KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() { + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() { @Override - public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> - createDoFnOperator( - DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn, + public DoFnOperator< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, + OutputT> createDoFnOperator( + DoFn< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, + OutputT> doFn, String stepName, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, @@ -559,8 +558,11 @@ class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context, WindowingStrategy<?, ?> windowingStrategy, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, - Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> - inputCoder, + Coder< + WindowedValue< + KeyedWorkItem< + String, + ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, Coder keyCoder, Map<Integer, PCollectionView<?>> transformedSideInputs) { return new SplittableDoFnOperator<>( @@ -582,17 +584,17 @@ class FlinkStreamingTransformTranslators { private static class CreateViewStreamingTranslator<ElemT, ViewT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> { + FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> { @Override public void translateNode( - CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform, + FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform, FlinkStreamingTranslationContext context) { // just forward DataStream<WindowedValue<List<ElemT>>> inputDataSet = context.getInputDataStream(context.getInput(transform)); - PCollectionView<ViewT> view = transform.getView(); + PCollectionView<ViewT> view = context.getOutput(transform); context.setOutputDataStream(view, inputDataSet); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 74a5fb9..ea5f6b3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import 
com.google.common.collect.Iterables; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -114,7 +113,7 @@ class FlinkStreamingTranslationContext { @SuppressWarnings("unchecked") public <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); } public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java new file mode 100644 index 0000000..ce1c895 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; + +/** + * Flink streaming overrides for various view (side input) transforms. + */ +class FlinkStreamingViewOverrides { + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} + * for the Flink runner in streaming mode. 
+ */ + static class StreamingViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + private final transient FlinkRunner runner; + + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMap"; + } + } + + /** + * Specialized expansion for {@link + * View.AsMultimap View.AsMultimap} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + + private final transient FlinkRunner runner; + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link View.AsList View.AsList} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {} + + @Override + public PCollectionView<List<T>> expand(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<T, List<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + + /** + * Specialized implementation for + * {@link View.AsIterable View.AsIterable} for the + * Flink runner in streaming mode. 
+ */ + static class StreamingViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { } + + @Override + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input, + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } + + /** + * Specialized expansion for + * {@link View.AsSingleton View.AsSingleton} for the + * Flink runner in streaming mode. + */ + static class StreamingViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + private View.AsSingleton<T> transform; + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<T> expand(PCollection<T> input) { + Combine.Globally<T, T> combine = Combine.globally( + new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } + + @Override + protected String getKindString() { + return "StreamingViewAsSingleton"; + } + + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); + } + } + } + } + + static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + Combine.GloballyAsSingletonView<InputT, OutputT> transform; + + /** + * Builds an instance of this class from the overridden transform. 
+ */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingCombineGloballyAsSingletonView( + FlinkRunner runner, + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<OutputT> expand(PCollection<InputT> input) { + PCollection<OutputT> combined = + input.apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults() + .withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView( + combined, + combined.getWindowingStrategy(), + transform.getInsertDefault(), + transform.getInsertDefault() + ? transform.getCombineFn().defaultValue() : null, + combined.getCoder()); + return combined + .apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingCombineGloballyAsSingletonView"; + } + } + + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(Collections.singletonList(c.element())); + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * + * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, + * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. + */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + @Override + public List<T> createAccumulator() { + return new ArrayList<T>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + + /** + * Creates a primitive {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. 
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
deleted file mode 100644
index 1dc8de9..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-
-/**
- * {@link PTransform} overrides for Flink runner.
- */
-public class FlinkTransformOverrides {
-  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
-    if (streaming) {
-      return ImmutableList.<PTransformOverride>builder()
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.splittableParDoMulti(),
-                  new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
-                  new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
-                  new CreateStreamingFlinkView.Factory()))
-          .build();
-    } else {
-      return ImmutableList.of();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -55,15 +55,18 @@ import org.joda.time.Instant;
  * the {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
 public class SplittableDoFnOperator<
-    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
-    extends DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+        InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends DoFnOperator<
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
 
   private transient ScheduledExecutorService executorService;
 
   public SplittableDoFnOperator(
-      DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
       String stepName,
-      Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> inputCoder,
+      Coder<
+          WindowedValue<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
@@ -84,6 +87,7 @@ public class SplittableDoFnOperator<
         sideInputs,
         options,
         keyCoder);
+
   }
 
   @Override
@@ -147,7 +151,7 @@ public class SplittableDoFnOperator<
   @Override
   public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
     doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-        KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
+        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
             (String) stateInternals.getKey(),
             Collections.singletonList(timer.getNamespace()))));
   }
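
Editor's note: as context for the CreateStreamingFlinkView hunks above, the overrides fire whenever a streaming pipeline materializes a side input. The sketch below shows the user-facing pattern that StreamingViewAsIterable rewrites; it is a minimal illustration only, and all names in it (SideInputSketch, words, wordsView) are assumed for the example and do not appear in this patch.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> words = p.apply(Create.of("flink", "beam", "view"));

    // On the Flink runner in streaming mode, this View.asIterable() application is the
    // kind of transform replaced by StreamingViewAsIterable above, i.e. by
    // Combine.globally(new Concatenate<T>()).withoutDefaults() + CreateFlinkPCollectionView.
    final PCollectionView<Iterable<String>> wordsView =
        words.apply(View.<String>asIterable());

    p.apply(Create.of(0))
        .apply(
            ParDo.of(
                    new DoFn<Integer, Integer>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        int total = 0;
                        // The materialized iterable is read back as a side input.
                        for (String w : c.sideInput(wordsView)) {
                          total += w.length();
                        }
                        c.output(total);
                      }
                    })
                .withSideInputs(wordsView));

    p.run().waitUntilFinish();
  }
}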

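Editor's note: a second hedged sketch for the singleton-view path. Combine.Globally#asSingletonView() is the call that StreamingViewAsSingleton delegates to in its expand() above, and the resulting Combine.GloballyAsSingletonView is what StreamingCombineGloballyAsSingletonView re-expands for streaming. The names below (SingletonViewSketch, values, totalView) are illustrative assumptions, not identifiers from this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SingletonViewSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<Integer> values = p.apply(Create.of(1, 2, 3));

    // A singleton side input: exactly one value per window after the global combine.
    // On the streaming Flink runner this application is re-expanded by
    // StreamingCombineGloballyAsSingletonView as shown in the hunks above.
    PCollectionView<Integer> totalView =
        values.apply(Sum.integersGlobally().asSingletonView());

    p.run().waitUntilFinish();
  }
}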