http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 4b5d5f5..171171f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -45,19 +45,14 @@ import org.apache.beam.sdk.coders.CoderProviders; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.LargeKeys; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; @@ -189,40 +184,6 @@ public class GroupByKeyTest { p.run(); } - /** - * Tests that when a processing time timers comes in after a window is expired it does not cause a - * spurious output. - */ - @Test - @Category({ValidatesRunner.class, UsesTestStream.class}) - public void testCombiningAccumulatingProcessingTime() throws Exception { - PCollection<Integer> triggeredSums = - p.apply( - TestStream.create(VarIntCoder.of()) - .advanceWatermarkTo(new Instant(0)) - .addElements( - TimestampedValue.of(2, new Instant(2)), - TimestampedValue.of(5, new Instant(5))) - .advanceWatermarkTo(new Instant(100)) - .advanceProcessingTime(Duration.millis(10)) - .advanceWatermarkToInfinity()) - .apply( - Window.<Integer>into(FixedWindows.of(Duration.millis(100))) - .withTimestampCombiner(TimestampCombiner.EARLIEST) - .accumulatingFiredPanes() - .withAllowedLateness(Duration.ZERO) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.millis(10))))) - .apply(Sum.integersGlobally().withoutDefaults()); - - PAssert.that(triggeredSums) - .containsInAnyOrder(7); - - p.run(); - } - @Test public void testGroupByKeyNonDeterministic() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index fa4949e..c67cf2a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -62,8 +62,6 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.MapState; @@ -1593,108 +1591,6 @@ public class ParDoTest implements Serializable { } @Test - public void testStateNotKeyed() { - final String stateId = "foo"; - - DoFn<String, Integer> fn = - new DoFn<String, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} - }; - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("state"); - thrown.expectMessage("KvCoder"); - - pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); - } - - @Test - public void testStateNotDeterministic() { - final String stateId = "foo"; - - // DoubleCoder is not deterministic, so this should crash - DoFn<KV<Double, String>, Integer> fn = - new DoFn<KV<Double, String>, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) {} - }; - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("state"); - thrown.expectMessage("deterministic"); - - pipeline - .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again"))) - .apply(ParDo.of(fn)); - } - - @Test - public void testTimerNotKeyed() { - final String timerId = "foo"; - - DoFn<String, Integer> fn = - new DoFn<String, Integer>() { - - @TimerId(timerId) - private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, @TimerId(timerId) Timer timer) {} - - @OnTimer(timerId) - public void onTimer() {} - }; - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("timer"); - thrown.expectMessage("KvCoder"); - - pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn)); - } - - @Test - public void testTimerNotDeterministic() { - final String timerId = "foo"; - - // DoubleCoder is not deterministic, so this should crash - DoFn<KV<Double, String>, Integer> fn = - new DoFn<KV<Double, String>, Integer>() { - - @TimerId(timerId) - private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, @TimerId(timerId) Timer timer) {} - - @OnTimer(timerId) - public void onTimer() {} - }; - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("timer"); - thrown.expectMessage("deterministic"); - - pipeline - .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again"))) - .apply(ParDo.of(fn)); - } - - @Test @Category({ValidatesRunner.class, UsesStatefulParDo.class}) public void testValueStateCoderInference() { final String stateId = "foo"; @@ -3046,65 +2942,4 @@ public class ParDoTest implements Serializable { // If it doesn't crash, we made it! } - - /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */ - public interface MyOptions extends PipelineOptions { - @Default.String("fake option") - String getFakeOption(); - void setFakeOption(String value); - } - - @Test - @Category(ValidatesRunner.class) - public void testPipelineOptionsParameter() { - PCollection<String> results = pipeline - .apply(Create.of(1)) - .apply( - ParDo.of( - new DoFn<Integer, String>() { - @ProcessElement - public void process(ProcessContext c, PipelineOptions options) { - c.output(options.as(MyOptions.class).getFakeOption()); - } - })); - - String testOptionValue = "not fake anymore"; - pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); - PAssert.that(results).containsInAnyOrder("not fake anymore"); - - pipeline.run(); - } - - @Test - @Category({ValidatesRunner.class, UsesTimersInParDo.class}) - public void testPipelineOptionsParameterOnTimer() { - final String timerId = "thisTimer"; - - PCollection<String> results = - pipeline - .apply(Create.of(KV.of(0, 0))) - .apply( - ParDo.of( - new DoFn<KV<Integer, Integer>, String>() { - @TimerId(timerId) - private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void process( - ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) { - timer.set(w.maxTimestamp()); - } - - @OnTimer(timerId) - public void onTimer(OnTimerContext c, PipelineOptions options) { - c.output(options.as(MyOptions.class).getFakeOption()); - } - })); - - String testOptionValue = "not fake anymore"; - pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); - PAssert.that(results).containsInAnyOrder("not fake anymore"); - - pipeline.run(); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index d2d2529..02a44d2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -18,14 +18,10 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Ordering; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -33,9 +29,6 @@ import java.util.List; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -44,6 +37,7 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -73,16 +67,10 @@ public class SplittableDoFnTest implements Serializable { static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> { @ProcessElement - public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) { - for (long i = tracker.currentRestriction().getFrom(), numIterations = 0; - tracker.tryClaim(i); - ++i, ++numIterations) { + public void process(ProcessContext c, OffsetRangeTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { c.output(KV.of(c.element(), (int) i)); - if (numIterations % 3 == 0) { - return resume(); - } } - return stop(); } @GetInitialRestriction @@ -105,25 +93,8 @@ public class SplittableDoFnTest implements Serializable { } } - private static PipelineOptions streamingTestPipelineOptions() { - // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo - // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in - // streaming mode. - // This is a no-op for other runners currently (Direct runner doesn't care, and other - // runners don't implement SDF at all yet). - // - // This is a workaround until https://issues.apache.org/jira/browse/BEAM-1620 - // is properly implemented and supports marking tests as streaming-only. - // - // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the removal of the - // current workaround. - PipelineOptions options = testingPipelineOptions(); - options.as(StreamingOptions.class).setStreaming(true); - return options; - } - @Rule - public final transient TestPipeline p = TestPipeline.fromOptions(streamingTestPipelineOptions()); + public final transient TestPipeline p = TestPipeline.create(); @Test @Category({ValidatesRunner.class, UsesSplittableParDo.class}) @@ -211,12 +182,6 @@ public class SplittableDoFnTest implements Serializable { private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> { private static final int MAX_INDEX = 98765; - private final int numClaimsPerCall; - - private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) { - this.numClaimsPerCall = numClaimsPerCall; - } - private static int snapToNextBlock(int index, int[] blockStarts) { for (int i = 1; i < blockStarts.length; ++i) { if (index > blockStarts[i - 1] && index <= blockStarts[i]) { @@ -227,20 +192,14 @@ public class SplittableDoFnTest implements Serializable { } @ProcessElement - public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { + public void processElement(ProcessContext c, OffsetRangeTracker tracker) { int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); - for (int i = trueStart, numIterations = 1; - tracker.tryClaim(blockStarts[i]); - ++i, ++numIterations) { + for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) { for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { c.output(index); } - if (numIterations == numClaimsPerCall) { - return resume(); - } } - return stop(); } @GetInitialRestriction @@ -253,7 +212,7 @@ public class SplittableDoFnTest implements Serializable { @Category({ValidatesRunner.class, UsesSplittableParDo.class}) public void testOutputAfterCheckpoint() throws Exception { PCollection<Integer> outputs = p.apply(Create.of("foo")) - .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3))); + .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock())); PAssert.thatSingleton(outputs.apply(Count.<Integer>globally())) .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX); p.run(); @@ -328,105 +287,9 @@ public class SplittableDoFnTest implements Serializable { PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7"); p.run(); - } - - @BoundedPerElement - private static class SDFWithMultipleOutputsPerBlockAndSideInput - extends DoFn<Integer, KV<String, Integer>> { - private static final int MAX_INDEX = 98765; - private final PCollectionView<String> sideInput; - private final int numClaimsPerCall; - - public SDFWithMultipleOutputsPerBlockAndSideInput( - PCollectionView<String> sideInput, int numClaimsPerCall) { - this.sideInput = sideInput; - this.numClaimsPerCall = numClaimsPerCall; - } - - private static int snapToNextBlock(int index, int[] blockStarts) { - for (int i = 1; i < blockStarts.length; ++i) { - if (index > blockStarts[i - 1] && index <= blockStarts[i]) { - return i; - } - } - throw new IllegalStateException("Shouldn't get here"); - } - - @ProcessElement - public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) { - int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX}; - int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts); - for (int i = trueStart, numIterations = 1; - tracker.tryClaim(blockStarts[i]); - ++i, ++numIterations) { - for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) { - c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index)); - } - if (numIterations == numClaimsPerCall) { - return resume(); - } - } - return stop(); - } - - @GetInitialRestriction - public OffsetRange getInitialRange(Integer element) { - return new OffsetRange(0, MAX_INDEX); - } - } - - @Test - @Category({ - ValidatesRunner.class, - UsesSplittableParDo.class, - UsesSplittableParDoWithWindowedSideInputs.class - }) - public void testWindowedSideInputWithCheckpoints() throws Exception { - PCollection<Integer> mainInput = - p.apply("main", - Create.timestamped( - TimestampedValue.of(0, new Instant(0)), - TimestampedValue.of(1, new Instant(1)), - TimestampedValue.of(2, new Instant(2)), - TimestampedValue.of(3, new Instant(3)))) - .apply("window 1", Window.<Integer>into(FixedWindows.of(Duration.millis(1)))); - - PCollectionView<String> sideInput = - p.apply("side", - Create.timestamped( - TimestampedValue.of("a", new Instant(0)), - TimestampedValue.of("b", new Instant(2)))) - .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2)))) - .apply("singleton", View.<String>asSingleton()); - - PCollection<KV<String, Integer>> res = - mainInput.apply( - ParDo.of( - new SDFWithMultipleOutputsPerBlockAndSideInput( - sideInput, 3 /* numClaimsPerCall */)) - .withSideInputs(sideInput)); - PCollection<KV<String, Iterable<Integer>>> grouped = - res.apply(GroupByKey.<String, Integer>create()); - - PAssert.that(grouped.apply(Keys.<String>create())) - .containsInAnyOrder("a:0", "a:1", "b:2", "b:3"); - PAssert.that(grouped) - .satisfies( - new SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void>() { - @Override - public Void apply(Iterable<KV<String, Iterable<Integer>>> input) { - List<Integer> expected = new ArrayList<>(); - for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInput.MAX_INDEX; ++i) { - expected.add(i); - } - for (KV<String, Iterable<Integer>> kv : input) { - assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue())); - } - return null; - } - }); - p.run(); + // TODO: also add test coverage when the SDF checkpoints - the resumed call should also + // properly access side inputs. // TODO: also test coverage when some of the windows of the side input are not ready. } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 2098c66..3edb194 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.transforms.reflect; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -91,8 +89,8 @@ public class DoFnInvokersTest { when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext); } - private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) { - return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); + private void invokeProcessElement(DoFn<String, String> fn) { + DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider); } private void invokeOnTimer(String timerId, DoFn<String, String> fn) { @@ -121,7 +119,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c) throws Exception {} } MockFn mockFn = mock(MockFn.class); - assertEquals(stop(), invokeProcessElement(mockFn)); + invokeProcessElement(mockFn); verify(mockFn).processElement(mockProcessContext); } @@ -142,7 +140,7 @@ public class DoFnInvokersTest { public void testDoFnWithProcessElementInterface() throws Exception { IdentityUsingInterfaceWithProcessElement fn = mock(IdentityUsingInterfaceWithProcessElement.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext); } @@ -163,14 +161,14 @@ public class DoFnInvokersTest { @Test public void testDoFnWithMethodInSuperclass() throws Exception { IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).process(mockProcessContext); } @Test public void testDoFnWithMethodInSubclass() throws Exception { IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).process(mockProcessContext); } @@ -181,7 +179,7 @@ public class DoFnInvokersTest { public void processElement(ProcessContext c, IntervalWindow w) throws Exception {} } MockFn fn = mock(MockFn.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext, mockWindow); } @@ -205,7 +203,7 @@ public class DoFnInvokersTest { throws Exception {} } MockFn fn = mock(MockFn.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext, mockState); } @@ -231,35 +229,11 @@ public class DoFnInvokersTest { public void onTimer() {} } MockFn fn = mock(MockFn.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processElement(mockProcessContext, mockTimer); } @Test - public void testDoFnWithReturn() throws Exception { - class MockFn extends DoFn<String, String> { - @DoFn.ProcessElement - public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) - throws Exception { - return null; - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(String element) { - return null; - } - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } - } - MockFn fn = mock(MockFn.class); - when(fn.processElement(mockProcessContext, null)).thenReturn(resume()); - assertEquals(resume(), invokeProcessElement(fn)); - } - - @Test public void testDoFnWithStartBundleSetupTeardown() throws Exception { class MockFn extends DoFn<String, String> { @ProcessElement @@ -314,9 +288,7 @@ public class DoFnInvokersTest { /** Public so Mockito can do "delegatesTo()" in the test below. */ public static class MockFn extends DoFn<String, String> { @ProcessElement - public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) { - return null; - } + public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(String element) { @@ -368,7 +340,7 @@ public class DoFnInvokersTest { .splitRestriction( eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any()); when(fn.newTracker(restriction)).thenReturn(tracker); - when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume()); + fn.processElement(mockProcessContext, tracker); assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault())); assertEquals(restriction, invoker.invokeGetInitialRestriction("blah")); @@ -384,8 +356,6 @@ public class DoFnInvokersTest { }); assertEquals(Arrays.asList(part1, part2, part3), outputs); assertEquals(tracker, invoker.invokeNewTracker(restriction)); - assertEquals( - resume(), invoker.invokeProcessElement( new FakeArgumentProvider<String, String>() { @Override @@ -397,7 +367,7 @@ public class DoFnInvokersTest { public RestrictionTracker<?> restrictionTracker() { return tracker; } - })); + }); } private static class RestrictionWithDefaultTracker @@ -471,7 +441,7 @@ public class DoFnInvokersTest { assertEquals("foo", output); } }); - assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider)); + invoker.invokeProcessElement(mockArgumentProvider); assertThat( invoker.invokeNewTracker(new RestrictionWithDefaultTracker()), instanceOf(DefaultTracker.class)); @@ -561,14 +531,14 @@ public class DoFnInvokersTest { @Test public void testLocalPrivateDoFnClass() throws Exception { PrivateDoFnClass fn = mock(PrivateDoFnClass.class); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); verify(fn).processThis(mockProcessContext); } @Test public void testStaticPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass()); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext); } @@ -576,28 +546,28 @@ public class DoFnInvokersTest { public void testInnerPackagePrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass()); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext); } @Test public void testStaticPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass()); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext); } @Test public void testInnerPrivateDoFnClass() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass()); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext); } @Test public void testAnonymousInnerDoFn() throws Exception { DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass()); - assertEquals(stop(), invokeProcessElement(fn)); + invokeProcessElement(fn); DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext); } @@ -634,31 +604,6 @@ public class DoFnInvokersTest { } @Test - public void testProcessElementExceptionWithReturn() throws Exception { - thrown.expect(UserCodeException.class); - thrown.expectMessage("bogus"); - DoFnInvokers.invokerFor( - new DoFn<Integer, Integer>() { - @ProcessElement - public ProcessContinuation processElement( - @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) { - throw new IllegalArgumentException("bogus"); - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer element) { - return null; - } - - @NewTracker - public SomeRestrictionTracker newTracker(SomeRestriction restriction) { - return null; - } - }) - .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>()); - } - - @Test public void testStartBundleException() throws Exception { DoFnInvoker<Integer, Integer> invoker = DoFnInvokers.invokerFor( http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java index 44ae5c4..d321f54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java @@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest { @Test public void testBadReturnType() throws Exception { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Must return void or ProcessContinuation"); + thrown.expectMessage("Must return void"); analyzeProcessElementMethod( new AnonymousMethod() { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java index 08af65e..07b3348 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java @@ -52,8 +52,7 @@ import org.junit.runners.JUnit4; public class DoFnSignaturesSplittableDoFnTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private abstract static class SomeRestriction - implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {} + private static class SomeRestriction {} private abstract static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {} @@ -61,20 +60,6 @@ public class DoFnSignaturesSplittableDoFnTest { private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {} @Test - public void testReturnsProcessContinuation() throws Exception { - DoFnSignature.ProcessElementMethod signature = - analyzeProcessElementMethod( - new AnonymousMethod() { - private DoFn.ProcessContinuation method( - DoFn<Integer, String>.ProcessContext context) { - return null; - } - }); - - assertTrue(signature.hasReturnValue()); - } - - @Test public void testHasRestrictionTracker() throws Exception { DoFnSignature.ProcessElementMethod signature = analyzeProcessElementMethod( @@ -115,6 +100,11 @@ public class DoFnSignaturesSplittableDoFnTest { public SomeRestriction getInitialRestriction(Integer element) { return null; } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return null; + } } @BoundedPerElement @@ -140,55 +130,6 @@ public class DoFnSignaturesSplittableDoFnTest { .isBoundedPerElement()); } - private static class BaseFnWithoutContinuation extends DoFn<Integer, String> { - @ProcessElement - public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer element) { - return null; - } - } - - private static class BaseFnWithContinuation extends DoFn<Integer, String> { - @ProcessElement - public ProcessContinuation processElement( - ProcessContext context, SomeRestrictionTracker tracker) { - return null; - } - - @GetInitialRestriction - public SomeRestriction getInitialRestriction(Integer element) { - return null; - } - } - - @Test - public void testSplittableBoundednessInferredFromReturnValue() throws Exception { - assertEquals( - PCollection.IsBounded.BOUNDED, - DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement()); - assertEquals( - PCollection.IsBounded.UNBOUNDED, - DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement()); - } - - @Test - public void testSplittableRespectsBoundednessAnnotation() throws Exception { - @BoundedPerElement - class BoundedFnWithContinuation extends BaseFnWithContinuation {} - - assertEquals( - PCollection.IsBounded.BOUNDED, - DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement()); - - @UnboundedPerElement - class UnboundedFnWithContinuation extends BaseFnWithContinuation {} - - assertEquals( - PCollection.IsBounded.UNBOUNDED, - DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement()); - } @Test public void testUnsplittableIsBounded() throws Exception { class UnsplittableFn extends DoFn<Integer, String> { @@ -231,10 +172,8 @@ public class DoFnSignaturesSplittableDoFnTest { public void testSplittableWithAllFunctions() throws Exception { class GoodSplittableDoFn extends DoFn<Integer, String> { @ProcessElement - public ProcessContinuation processElement( - ProcessContext context, SomeRestrictionTracker tracker) { - return null; - } + public void processElement( + ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -259,7 +198,6 @@ public class DoFnSignaturesSplittableDoFnTest { DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); - assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); @@ -276,9 +214,7 @@ public class DoFnSignaturesSplittableDoFnTest { public void testSplittableWithAllFunctionsGeneric() throws Exception { class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> { @ProcessElement - public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) { - return null; - } + public void processElement(ProcessContext context, TrackerT tracker) {} @GetInitialRestriction public RestrictionT getInitialRestriction(Integer element) { @@ -306,7 +242,6 @@ public class DoFnSignaturesSplittableDoFnTest { SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass()); assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType()); assertTrue(signature.processElement().isSplittable()); - assertTrue(signature.processElement().hasReturnValue()); assertEquals( SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType()); assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType()); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 70c8dfd..cffb0ad 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.fail; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.TimeDomain; @@ -330,19 +329,6 @@ public class DoFnSignaturesTest { } @Test - public void testPipelineOptionsParameter() throws Exception { - DoFnSignature sig = - DoFnSignatures.getSignature(new DoFn<String, String>() { - @ProcessElement - public void process(ProcessContext c, PipelineOptions options) {} - }.getClass()); - - assertThat( - sig.processElement().extraParameters(), - Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class))); - } - - @Test public void testDeclAndUsageOfTimerInSuperclass() throws Exception { DoFnSignature sig = DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass()); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java index 8aed6b9..831894c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.io.range.OffsetRange; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java index bfd01f0..b14e221 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -56,12 +55,11 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9)); expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11)); - SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5)); assertEquals( expected, - runWindowFn(windowFn, + runWindowFn( + SlidingWindows.of(new Duration(10)).every(new Duration(5)), Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); - assertThat(windowFn.assignsToOneWindow(), is(false)); } @Test @@ -71,27 +69,11 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5)); expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11)); expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11)); - SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5)); - assertEquals( - expected, - runWindowFn(windowFn, - Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); - assertThat(windowFn.assignsToOneWindow(), is(false)); - } - - @Test - public void testEqualSize() throws Exception { - Map<IntervalWindow, Set<String>> expected = new HashMap<>(); - expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); - expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5)); - expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7)); - SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3)); assertEquals( expected, runWindowFn( - windowFn, - Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L))); - assertThat(windowFn.assignsToOneWindow(), is(true)); + SlidingWindows.of(new Duration(7)).every(new Duration(5)), + Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L))); } @Test @@ -100,14 +82,12 @@ public class SlidingWindowsTest { expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2)); expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11)); expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100)); - SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10)); assertEquals( expected, runWindowFn( // Only look at the first 3 millisecs of every 10-millisec interval. - windowFn, + SlidingWindows.of(new Duration(3)).every(new Duration(10)), Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L))); - assertThat(windowFn.assignsToOneWindow(), is(true)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml index 7d54990..e4e951b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/pom.xml +++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index d7205bf..8d1fe74 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -135,7 +135,7 @@ public class GcsUtil { private static final int MAX_CONCURRENT_BATCHES = 256; private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1)); + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java index fd908cf..e5b48d3 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java @@ -17,9 +17,8 @@ */ package org.apache.beam.sdk.util; -import static com.google.api.client.util.BackOffUtils.next; - -import com.google.api.client.http.HttpIOExceptionHandler; +import com.google.api.client.http.HttpBackOffIOExceptionHandler; +import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpResponse; @@ -61,106 +60,64 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer { */ private static final int HANGING_GET_TIMEOUT_SEC = 80; - /** Handlers used to provide additional logging information on unsuccessful HTTP requests. */ - private static class LoggingHttpBackOffHandler - implements HttpIOExceptionHandler, HttpUnsuccessfulResponseHandler { - - private final Sleeper sleeper; - private final BackOff ioExceptionBackOff; - private final BackOff unsuccessfulResponseBackOff; - private final Set<Integer> ignoredResponseCodes; - private int ioExceptionRetries; - private int unsuccessfulResponseRetries; - - private LoggingHttpBackOffHandler( - Sleeper sleeper, - BackOff ioExceptionBackOff, - BackOff unsucessfulResponseBackOff, - Set<Integer> ignoredResponseCodes) { - this.sleeper = sleeper; - this.ioExceptionBackOff = ioExceptionBackOff; - this.unsuccessfulResponseBackOff = unsucessfulResponseBackOff; - this.ignoredResponseCodes = ignoredResponseCodes; + private static class LoggingHttpBackOffIOExceptionHandler + extends HttpBackOffIOExceptionHandler { + public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) { + super(backOff); } @Override public boolean handleIOException(HttpRequest request, boolean supportsRetry) throws IOException { - // We will retry if the request supports retry or the backoff was successful. - // Note that the order of these checks is important since - // backOffWasSuccessful will perform a sleep. - boolean willRetry = supportsRetry && backOffWasSuccessful(ioExceptionBackOff); + boolean willRetry = super.handleIOException(request, supportsRetry); if (willRetry) { - ioExceptionRetries += 1; LOG.debug("Request failed with IOException, will retry: {}", request.getUrl()); } else { - String message = "Request failed with IOException, " - + "performed {} retries due to IOExceptions, " - + "performed {} retries due to unsuccessful status codes, " - + "HTTP framework says request {} be retried, " - + "(caller responsible for retrying): {}"; - LOG.warn(message, - ioExceptionRetries, - unsuccessfulResponseRetries, - supportsRetry ? "can" : "cannot", + LOG.warn( + "Request failed with IOException (caller responsible for retrying): {}", request.getUrl()); } return willRetry; } + } + + private static class LoggingHttpBackoffUnsuccessfulResponseHandler + implements HttpUnsuccessfulResponseHandler { + private final HttpBackOffUnsuccessfulResponseHandler handler; + private final Set<Integer> ignoredResponseCodes; + + public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff, + Sleeper sleeper, Set<Integer> ignoredResponseCodes) { + this.ignoredResponseCodes = ignoredResponseCodes; + handler = new HttpBackOffUnsuccessfulResponseHandler(backoff); + handler.setSleeper(sleeper); + handler.setBackOffRequired( + new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() { + @Override + public boolean isRequired(HttpResponse response) { + int statusCode = response.getStatusCode(); + return (statusCode / 100 == 5) || // 5xx: server error + statusCode == 429; // 429: Too many requests + } + }); + } @Override - public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) - throws IOException { - // We will retry if the request supports retry and the status code requires a backoff - // and the backoff was successful. Note that the order of these checks is important since - // backOffWasSuccessful will perform a sleep. - boolean willRetry = supportsRetry - && retryOnStatusCode(response.getStatusCode()) - && backOffWasSuccessful(unsuccessfulResponseBackOff); - if (willRetry) { - unsuccessfulResponseRetries += 1; + public boolean handleResponse(HttpRequest request, HttpResponse response, + boolean supportsRetry) throws IOException { + boolean retry = handler.handleResponse(request, response, supportsRetry); + if (retry) { LOG.debug("Request failed with code {}, will retry: {}", response.getStatusCode(), request.getUrl()); - } else { - String message = "Request failed with code {}, " - + "performed {} retries due to IOExceptions, " - + "performed {} retries due to unsuccessful status codes, " - + "HTTP framework says request {} be retried, " - + "(caller responsible for retrying): {}"; - if (ignoredResponseCodes.contains(response.getStatusCode())) { - // Log ignored response codes at a lower level - LOG.debug(message, - response.getStatusCode(), - ioExceptionRetries, - unsuccessfulResponseRetries, - supportsRetry ? "can" : "cannot", - request.getUrl()); - } else { - LOG.warn(message, - response.getStatusCode(), - ioExceptionRetries, - unsuccessfulResponseRetries, - supportsRetry ? "can" : "cannot", - request.getUrl()); - } - } - return willRetry; - } - /** Returns true iff performing the backoff was successful. */ - private boolean backOffWasSuccessful(BackOff backOff) { - try { - return next(sleeper, backOff); - } catch (InterruptedException | IOException e) { - return false; + } else if (!ignoredResponseCodes.contains(response.getStatusCode())) { + LOG.warn( + "Request failed with code {} (caller responsible for retrying): {}", + response.getStatusCode(), + request.getUrl()); } - } - /** Returns true iff the {@code statusCode} represents an error that should be retried. */ - private boolean retryOnStatusCode(int statusCode) { - return (statusCode == 0) // Code 0 usually means no response / network error - || (statusCode / 100 == 5) // 5xx: server error - || statusCode == 429; // 429: Too many requests + return retry; } } @@ -216,20 +173,20 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer { // TODO: Do this exclusively for work requests. request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000); - LoggingHttpBackOffHandler loggingHttpBackOffHandler = new LoggingHttpBackOffHandler( - sleeper, - // Retry immediately on IOExceptions. - BackOff.ZERO_BACKOFF, - // Back off on retryable http errors. + // Back off on retryable http errors. + request.setUnsuccessfulResponseHandler( // A back-off multiplier of 2 raises the maximum request retrying time // to approximately 5 minutes (keeping other back-off parameters to // their default values). - new ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(), - ignoredResponseCodes - ); - - request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler); - request.setIOExceptionHandler(loggingHttpBackOffHandler); + new LoggingHttpBackoffUnsuccessfulResponseHandler( + new ExponentialBackOff.Builder().setNanoClock(nanoClock) + .setMultiplier(2).build(), + sleeper, ignoredResponseCodes)); + + // Retry immediately on IOExceptions. + LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler = + new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF); + request.setIOExceptionHandler(loggingBackoffHandler); // Set response initializer if (responseInterceptor != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index a0d9e4b..625c248 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -15,16 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.gcp; +package org.apache.beam; -import static org.apache.beam.sdk.util.ApiSurface.classesInPackage; -import static org.apache.beam.sdk.util.ApiSurface.containsOnlyClassesMatching; +import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; import static org.hamcrest.MatcherAssert.assertThat; import com.google.common.collect.ImmutableSet; import java.util.Set; import org.apache.beam.sdk.util.ApiSurface; -import org.hamcrest.Matcher; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -34,32 +32,28 @@ import org.junit.runners.JUnit4; public class GcpCoreApiSurfaceTest { @Test - public void testGcpCoreApiSurface() throws Exception { - final Package thisPackage = getClass().getPackage(); - final ClassLoader thisClassLoader = getClass().getClassLoader(); - final ApiSurface apiSurface = - ApiSurface.ofPackage(thisPackage, thisClassLoader) - .pruningPattern("org[.]apache[.]beam[.].*Test.*") - .pruningPattern("org[.]apache[.]beam[.].*IT") - .pruningPattern("java[.]lang.*") - .pruningPattern("java[.]util.*"); + public void testApiSurface() throws Exception { @SuppressWarnings("unchecked") - final Set<Matcher<Class<?>>> allowedClasses = + final Set<String> allowed = ImmutableSet.of( - classesInPackage("com.google.api.client.googleapis"), - classesInPackage("com.google.api.client.http"), - classesInPackage("com.google.api.client.json"), - classesInPackage("com.google.api.client.util"), - classesInPackage("com.google.api.services.storage"), - classesInPackage("com.google.auth"), - classesInPackage("com.fasterxml.jackson.annotation"), - classesInPackage("java"), - classesInPackage("javax"), - classesInPackage("org.apache.beam.sdk"), - classesInPackage("org.joda.time") - ); + "org.apache.beam", + "com.google.api.client", + "com.google.api.services.storage", + "com.google.auth", + "com.fasterxml.jackson.annotation", + "com.fasterxml.jackson.core", + "com.fasterxml.jackson.databind", + "org.apache.avro", + "org.hamcrest", + // via DataflowMatchers + "org.codehaus.jackson", + // via Avro + "org.joda.time", + "org.junit", + "sun.reflect"); - assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses)); + assertThat( + ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java index 13a9309..37551a4 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java @@ -49,11 +49,10 @@ import java.net.SocketTimeoutException; import java.security.PrivateKey; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.sdk.testing.ExpectedLogs; import org.hamcrest.Matchers; import org.junit.After; +import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -68,8 +67,6 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class RetryHttpRequestInitializerTest { - @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(RetryHttpRequestInitializer.class); - @Mock private PrivateKey mockPrivateKey; @Mock private LowLevelHttpRequest mockLowLevelRequest; @Mock private LowLevelHttpResponse mockLowLevelResponse; @@ -138,7 +135,6 @@ public class RetryHttpRequestInitializerTest { verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); verify(mockLowLevelRequest).execute(); verify(mockLowLevelResponse).getStatusCode(); - expectedLogs.verifyNotLogged("Request failed"); } /** @@ -157,7 +153,7 @@ public class RetryHttpRequestInitializerTest { HttpResponse response = result.executeUnparsed(); assertNotNull(response); } catch (HttpResponseException e) { - assertThat(e.getMessage(), Matchers.containsString("403")); + Assert.assertThat(e.getMessage(), Matchers.containsString("403")); } verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); @@ -166,7 +162,6 @@ public class RetryHttpRequestInitializerTest { verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); verify(mockLowLevelRequest).execute(); verify(mockLowLevelResponse).getStatusCode(); - expectedLogs.verifyWarn("Request failed with code 403"); } /** @@ -193,7 +188,6 @@ public class RetryHttpRequestInitializerTest { verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt()); verify(mockLowLevelRequest, times(3)).execute(); verify(mockLowLevelResponse, times(3)).getStatusCode(); - expectedLogs.verifyDebug("Request failed with code 503"); } /** @@ -217,7 +211,6 @@ public class RetryHttpRequestInitializerTest { verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt()); verify(mockLowLevelRequest, times(2)).execute(); verify(mockLowLevelResponse).getStatusCode(); - expectedLogs.verifyDebug("Request failed with IOException"); } /** @@ -231,22 +224,19 @@ public class RetryHttpRequestInitializerTest { int n = 0; @Override public Integer answer(InvocationOnMock invocation) { - return n++ < retries ? 503 : 9999; + return (n++ < retries - 1) ? 503 : 200; }}); Storage.Buckets.Get result = storage.buckets().get("test"); - try { - result.executeUnparsed(); - fail(); - } catch (Throwable t) { - } + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), anyString()); - verify(mockLowLevelRequest, times(retries + 1)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(retries + 1)).execute(); - verify(mockLowLevelResponse, times(retries + 1)).getStatusCode(); - expectedLogs.verifyWarn("performed 10 retries due to unsuccessful status codes"); + verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), + anyString()); + verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(retries)).execute(); + verify(mockLowLevelResponse, times(retries)).getStatusCode(); } /** @@ -286,7 +276,6 @@ public class RetryHttpRequestInitializerTest { } catch (Throwable e) { assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class)); assertEquals(1 + defaultNumberOfRetries, executeCount.get()); - expectedLogs.verifyWarn("performed 10 retries due to IOExceptions"); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/jackson/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml index 7fd38e0..4b09c11 100644 --- a/sdks/java/extensions/jackson/pom.xml +++ b/sdks/java/extensions/jackson/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/join-library/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml index ea24b75..556ec40 100644 --- a/sdks/java/extensions/join-library/pom.xml +++ b/sdks/java/extensions/join-library/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index 1222476..3d63626 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/protobuf/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml index 63855f8..ae909ab 100644 --- a/sdks/java/extensions/protobuf/pom.xml +++ b/sdks/java/extensions/protobuf/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/sorter/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml index 395c73f..9d25f9d 100644 --- a/sdks/java/extensions/sorter/pom.xml +++ b/sdks/java/extensions/sorter/pom.xml @@ -22,13 +22,17 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <artifactId>beam-sdks-java-extensions-sorter</artifactId> <name>Apache Beam :: SDKs :: Java :: Extensions :: Sorter</name> + <properties> + <hadoop.version>2.7.1</hadoop.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.beam</groupId> @@ -38,12 +42,14 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml index fe5c2f1..61a170a 100644 --- a/sdks/java/harness/pom.xml +++ b/sdks/java/harness/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -83,11 +83,6 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-construction-java</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> </dependency> @@ -155,21 +150,10 @@ </dependency> <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> - <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - <!-- test dependencies --> <dependency> <groupId>org.hamcrest</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 2a9cef8..e33277a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -18,111 +18,82 @@ package org.apache.beam.fn.harness.control; -import com.google.common.annotations.VisibleForTesting; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.Iterables.getOnlyElement; + +import com.google.common.collect.Collections2; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.ServiceLoader; -import java.util.Set; +import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fake.FakeStepContext; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.core.PTransformRunnerFactory; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; +import org.apache.beam.runners.core.BeamFnDataReadRunner; +import org.apache.beam.runners.core.BeamFnDataWriteRunner; +import org.apache.beam.runners.core.BoundedSourceRunner; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing - * the set of required runners for each {@link RunnerApi.FunctionSpec}, + * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec}, * wiring them together based upon the {@code input} and {@code output} map definitions. * * <p>Finally executes the DAG based graph by starting all runners in reverse topological order, * and finishing all runners in forward topological order. */ public class ProcessBundleHandler { - // TODO: What should the initial set of URNs be? private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1"; - public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1"; + private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1"; + private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1"; + private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1"; private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class); - private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES; - - static { - Set<Registrar> pipelineRunnerRegistrars = - Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); - pipelineRunnerRegistrars.addAll( - Lists.newArrayList(ServiceLoader.load(Registrar.class, - ReflectHelpers.findClassLoader()))); - - // Load all registered PTransform runner factories. - ImmutableMap.Builder<String, PTransformRunnerFactory> builder = - ImmutableMap.builder(); - for (Registrar registrar : pipelineRunnerRegistrars) { - builder.putAll(registrar.getPTransformRunnerFactories()); - } - REGISTERED_RUNNER_FACTORIES = builder.build(); - } private final PipelineOptions options; private final Function<String, Message> fnApiRegistry; private final BeamFnDataClient beamFnDataClient; - private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap; - private final PTransformRunnerFactory defaultPTransformRunnerFactory; - public ProcessBundleHandler( PipelineOptions options, Function<String, Message> fnApiRegistry, BeamFnDataClient beamFnDataClient) { - this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES); - } - - @VisibleForTesting - ProcessBundleHandler( - PipelineOptions options, - Function<String, Message> fnApiRegistry, - BeamFnDataClient beamFnDataClient, - Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) { this.options = options; this.fnApiRegistry = fnApiRegistry; this.beamFnDataClient = beamFnDataClient; - this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap; - this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() { - @Override - public Object createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - String pTransformId, - RunnerApi.PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, RunnerApi.PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) { - throw new IllegalStateException(String.format( - "No factory registered for %s, known factories %s", - pTransform.getSpec().getUrn(), - urnToPTransformRunnerFactoryMap.keySet())); - } - }; } private void createRunnerAndConsumersForPTransformRecursively( @@ -157,19 +128,115 @@ public class ProcessBundleHandler { } } - urnToPTransformRunnerFactoryMap.getOrDefault( - pTransform.getSpec().getUrn(), defaultPTransformRunnerFactory) - .createRunnerForPTransform( - options, - beamFnDataClient, - pTransformId, - pTransform, - processBundleInstructionId, - processBundleDescriptor.getPcollectionsMap(), - processBundleDescriptor.getCodersMap(), - pCollectionIdsToConsumers, - addStartFunction, - addFinishFunction); + createRunnerForPTransform( + pTransformId, + pTransform, + processBundleInstructionId, + processBundleDescriptor.getPcollectionsMap(), + pCollectionIdsToConsumers, + addStartFunction, + addFinishFunction); + } + + protected void createRunnerForPTransform( + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) throws IOException { + + + // For every output PCollection, create a map from output name to Consumer + ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>> + outputMapBuilder = ImmutableMap.builder(); + for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { + outputMapBuilder.put( + entry.getKey(), + pCollectionIdsToConsumers.get(entry.getValue())); + } + ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap = + outputMapBuilder.build(); + + + // Based upon the function spec, populate the start/finish/consumer information. + RunnerApi.FunctionSpec functionSpec = pTransform.getSpec(); + ThrowingConsumer<WindowedValue<?>> consumer; + switch (functionSpec.getUrn()) { + default: + BeamFnApi.Target target; + RunnerApi.Coder coderSpec; + throw new IllegalArgumentException( + String.format("Unknown FunctionSpec %s", functionSpec)); + + case DATA_OUTPUT_URN: + target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(pTransformId) + .setName(getOnlyElement(pTransform.getInputsMap().keySet())) + .build(); + coderSpec = (RunnerApi.Coder) fnApiRegistry.apply( + pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId()); + BeamFnDataWriteRunner<Object> remoteGrpcWriteRunner = + new BeamFnDataWriteRunner<Object>( + functionSpec, + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient); + addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput); + consumer = (ThrowingConsumer) + (ThrowingConsumer<WindowedValue<Object>>) remoteGrpcWriteRunner::consume; + addFinishFunction.accept(remoteGrpcWriteRunner::close); + break; + + case DATA_INPUT_URN: + target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(pTransformId) + .setName(getOnlyElement(pTransform.getOutputsMap().keySet())) + .build(); + coderSpec = (RunnerApi.Coder) fnApiRegistry.apply( + pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); + BeamFnDataReadRunner<?> remoteGrpcReadRunner = + new BeamFnDataReadRunner<Object>( + functionSpec, + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient, + (Map) outputMap); + addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation); + consumer = null; + addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes); + break; + + case JAVA_DO_FN_URN: + DoFnRunner<Object, Object> doFnRunner = createDoFnRunner(functionSpec, (Map) outputMap); + addStartFunction.accept(doFnRunner::startBundle); + consumer = (ThrowingConsumer) + (ThrowingConsumer<WindowedValue<Object>>) doFnRunner::processElement; + addFinishFunction.accept(doFnRunner::finishBundle); + break; + + case JAVA_SOURCE_URN: + @SuppressWarnings({"unchecked", "rawtypes"}) + BoundedSourceRunner<BoundedSource<Object>, Object> sourceRunner = + createBoundedSourceRunner(functionSpec, (Map) outputMap); + // TODO: Remove and replace with source being sent across gRPC port + addStartFunction.accept(sourceRunner::start); + consumer = (ThrowingConsumer) + (ThrowingConsumer<WindowedValue<BoundedSource<Object>>>) + sourceRunner::runReadLoop; + break; + } + + // If we created a consumer, add it to the map containing PCollection ids to consumers + if (consumer != null) { + for (String inputPCollectionId : + pTransform.getInputsMap().values()) { + pCollectionIdsToConsumers.put(inputPCollectionId, consumer); + } + } } public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request) @@ -232,4 +299,88 @@ public class ProcessBundleHandler { return response; } + + /** + * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}. + */ + private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner( + RunnerApi.FunctionSpec functionSpec, + Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) { + ByteString serializedFn; + try { + serializedFn = functionSpec.getParameter().unpack(BytesValue.class).getValue(); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException( + String.format("Unable to unwrap DoFn %s", functionSpec), e); + } + DoFnInfo<?, ?> doFnInfo = + (DoFnInfo<?, ?>) + SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo"); + + checkArgument( + Objects.equals( + new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)), + doFnInfo.getOutputMap().keySet()), + "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.", + outputMap.keySet(), + doFnInfo.getOutputMap()); + + ImmutableMultimap.Builder<TupleTag<?>, + ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput = + ImmutableMultimap.builder(); + for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) { + tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey()))); + } + @SuppressWarnings({"unchecked", "rawtypes"}) + final Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap = + (Map) tagToOutput.build().asMap(); + + OutputManager outputManager = + new OutputManager() { + Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput = + tagBasedOutputMap; + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + try { + Collection<ThrowingConsumer<WindowedValue<?>>> consumers = + tupleTagToOutput.get(tag); + if (consumers == null) { + /* This is a normal case, e.g., if a DoFn has output but that output is not + * consumed. Drop the output. */ + return; + } + for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) { + consumer.accept(output); + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }; + + @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) + DoFnRunner<InputT, OutputT> runner = + DoFnRunners.simpleRunner( + PipelineOptionsFactory.create(), /* TODO */ + (DoFn) doFnInfo.getDoFn(), + NullSideInputReader.empty(), /* TODO */ + outputManager, + (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()), + new ArrayList<>(doFnInfo.getOutputMap().values()), + new FakeStepContext(), + (WindowingStrategy) doFnInfo.getWindowingStrategy()); + return runner; + } + + private <InputT extends BoundedSource<OutputT>, OutputT> + BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner( + RunnerApi.FunctionSpec functionSpec, + Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) { + + @SuppressWarnings({"rawtypes", "unchecked"}) + BoundedSourceRunner<InputT, OutputT> runner = + new BoundedSourceRunner(options, functionSpec, outputMap); + return runner; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java index 0e738ac..276a120 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java @@ -79,7 +79,7 @@ public class RegisterHandler { processBundleDescriptor.getClass()); computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor); for (Map.Entry<String, RunnerApi.Coder> entry - : processBundleDescriptor.getCodersMap().entrySet()) { + : processBundleDescriptor.getCodersyyyMap().entrySet()) { LOG.debug("Registering {} with type {}", entry.getKey(), entry.getValue().getClass());
