Repository: incubator-beam Updated Branches: refs/heads/master 6807480a9 -> 772959447
Access to OnTimerContext via DoFnInvokers.ArgumentProvider Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2883062e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2883062e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2883062e Branch: refs/heads/master Commit: 2883062eebe8dba849ab89627f6aeb53266ac1a8 Parents: 42b506f Author: Kenneth Knowles <[email protected]> Authored: Tue Dec 6 20:10:21 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Wed Dec 7 19:22:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 13 +++++++++++++ .../org/apache/beam/runners/core/SplittableParDo.java | 5 +++++ .../org/apache/beam/sdk/transforms/DoFnAdapters.java | 12 ++++++++++++ .../org/apache/beam/sdk/transforms/DoFnTester.java | 7 +++++++ .../beam/sdk/transforms/reflect/DoFnInvoker.java | 8 ++++++++ 5 files changed, 45 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 68751f0..0d41a8d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.InputProvider; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -403,6 +404,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override public InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException("InputProvider is for testing only."); } @@ -589,6 +596,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override public InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException("InputProvider parameters are not supported."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 78f373b..580e842 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -663,6 +663,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override + public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override public DoFn.InputProvider<InputT> inputProvider() { // DoFnSignatures should have verified that this DoFn doesn't access extra context. throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 6ee42e7..e15b08b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -345,6 +346,12 @@ public class DoFnAdapters { } @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Timers are not supported for OldDoFn"); + } + + @Override public WindowingInternals<InputT, OutputT> windowingInternals() { // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this // should be unreachable. @@ -460,6 +467,11 @@ public class DoFnAdapters { } @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); + } + + @Override public WindowingInternals<InputT, OutputT> windowingInternals() { return context.windowingInternals(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 43896c5..93b3f59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -316,6 +317,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "DoFnTester doesn't support timers yet."); + } + + @Override public DoFn.InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException( "Not expected to access InputProvider from DoFnTester"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2883062e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 5e61bdd..97ac9d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -112,6 +112,9 @@ public interface DoFnInvoker<InputT, OutputT> { /** Provide a {@link DoFn.ProcessContext} to use with the given {@link DoFn}. */ DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn); + /** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */ + DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn); + /** A placeholder for testing purposes. */ InputProvider<InputT> inputProvider(); @@ -162,6 +165,11 @@ public interface DoFnInvoker<InputT, OutputT> { } @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + return null; + } + + @Override public InputProvider<InputT> inputProvider() { return null; }
