Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess. This should eventually become either a DoFn or, probably more appropriately, just something internal to the runners that actually require it to be manifested as a DoFn at all.
As an intermediate migration step, this lessens the level to which it depends on unsupported APIs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2350417 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2350417 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2350417 Branch: refs/heads/master Commit: b2350417f73ae6c34f849ff0e93d5bd93df3088d Parents: 164ee56 Author: Kenneth Knowles <[email protected]> Authored: Sun Oct 23 19:45:41 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun Oct 23 19:52:51 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2350417/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java index 8f3f540..6da4da0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.values.KV; /** @@ -28,20 +29,13 @@ import org.apache.beam.sdk.values.KV; * @param <V> the type of the values of the input {@code PCollection} */ @SystemDoFnInternal -public class ReifyTimestampAndWindowsDoFn<K, V> - extends OldDoFn<KV<K, V>, 
KV<K, WindowedValue<V>>> { +public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> + implements RequiresWindowAccess { @Override - public void processElement(ProcessContext c) - throws Exception { + public void processElement(ProcessContext c) throws Exception { KV<K, V> kv = c.element(); K key = kv.getKey(); V value = kv.getValue(); - c.output(KV.of( - key, - WindowedValue.of( - value, - c.timestamp(), - c.windowingInternals().windows(), - c.pane()))); + c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane()))); } }
