Repository: incubator-beam Updated Branches: refs/heads/master 8dfadbf01 -> c390a2a7f
Add adapter from OldDoFn.RequiresWindowAccess to DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/164ee56b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/164ee56b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/164ee56b Branch: refs/heads/master Commit: 164ee56b41e01c0ee637eff24e23a814b5885e6f Parents: a9a41eb Author: Kenneth Knowles <[email protected]> Authored: Sun Oct 23 19:45:18 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 23 19:52:50 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/OldDoFn.java | 89 +++++++++++++++++--- 1 file changed, 76 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/164ee56b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index e3cfc38..72c2965 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -77,19 +77,12 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl public DoFn<InputT, OutputT> toDoFn() { if (this instanceof RequiresWindowAccess) { - throw new UnsupportedOperationException( - String.format( - "Cannot convert %s to %s because it implements %s." - + " Please convert your %s to a %s directly.", - getClass(), - DoFn.class.getSimpleName(), - RequiresWindowAccess.class.getSimpleName(), - OldDoFn.class.getSimpleName(), - DoFn.class.getSimpleName())); - } - - // No parameters as it just accesses `this` - return new AdaptedDoFn(); + // No parameters as it just accesses `this` + return new AdaptedRequiresWindowAccessDoFn(); + } else { + // No parameters as it just accesses `this` + return new AdaptedDoFn(); + } } /** @@ -770,4 +763,74 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl return OldDoFn.this.getOutputTypeDescriptor(); } } + + /** + * A {@link ProcessContext} for an {@link OldDoFn} that implements + * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}. + */ + private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { + + private final BoundedWindow window; + + public AdaptedRequiresWindowAccessProcessContext( + DoFn<InputT, OutputT>.ProcessContext newContext, + BoundedWindow window) { + super(newContext); + this.window = window; + } + + @Override + public BoundedWindow window() { + return window; + } + } + + private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> { + + @Setup + public void setup() throws Exception { + OldDoFn.this.setup(); + } + + @StartBundle + public void startBundle(Context c) throws Exception { + OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + OldDoFn.this.processElement( + OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window)); + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c)); + } + + @Teardown + public void teardown() throws Exception { + OldDoFn.this.teardown(); + } + + @Override + public Duration getAllowedTimestampSkew() { + return OldDoFn.this.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + OldDoFn.this.populateDisplayData(builder); + } + + @Override + protected TypeDescriptor<InputT> getInputTypeDescriptor() { + return OldDoFn.this.getInputTypeDescriptor(); + } + + @Override + protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { + return OldDoFn.this.getOutputTypeDescriptor(); + } + } }
