[BEAM-1915] Removes use of OldDoFn from Apex

This removes the last use of OldDoFn in the Beam repository
outside of OldDoFn itself.

OldDoFn is still used by the Dataflow worker; the class can be
deleted entirely once the Dataflow team migrates that usage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e243881
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e243881
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e243881

Branch: refs/heads/jstorm-runner
Commit: 3e243881fe767cf30869abf5c745c26f96d66fc4
Parents: fdbadfc
Author: Eugene Kirpichov <[email protected]>
Authored: Mon Apr 10 22:51:16 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Apr 14 23:34:11 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 225 ++++++-------------
 1 file changed, 63 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3e243881/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1697921..7d17ac6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -42,32 +42,29 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import 
org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import 
org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.WindowingInternals;
+import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -98,8 +95,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator 
{
   private final StateInternalsFactory<K> stateInternalsFactory;
   private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new 
HashMap<>();
 
-  private transient ProcessContext context;
-  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
   private transient ApexTimerInternals timerInternals = new 
ApexTimerInternals();
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
@@ -161,16 +156,65 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   @Override
   public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
-    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
-        stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
-    this.context = new ProcessContext(fn, this.timerInternals);
   }
 
   @Override
   public void teardown() {
   }
 
+  /**
+   * Creates a {@link ReduceFnRunner} that groups the values of {@code key} into an
+   * {@code Iterable<V>} per window (via {@code SystemReduceFn.buffering}) and emits every
+   * produced output on {@link #output} as an {@code ApexStreamTuple.DataTuple}.
+   *
+   * <p>The runner registers timers through {@link #timerInternals}, which attributes them to the
+   * key last passed to {@code ApexTimerInternals#setKey}; callers must set that key to
+   * {@code key} before driving the returned runner.
+   */
+  private ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner(K key) {
+    return new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        // NOTE(review): rebuilt (including a proto round trip) for every element and for every
+        // key with fired timers; consider caching it if this shows up in profiles.
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                Triggers.toProto(windowingStrategy.getTrigger()))),
+        stateInternalsFactory.stateInternalsForKey(key),
+        timerInternals,
+        new OutputWindowedValue<KV<K, Iterable<V>>>() {
+          @Override
+          public void outputWindowedValue(
+              KV<K, Iterable<V>> output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            if (traceTuples) {
+              LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+            }
+            ApexGroupByKeyOperator.this.output.emit(
+                ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+          }
+
+          @Override
+          public <AdditionalOutputT> void outputWindowedValue(
+              TupleTag<AdditionalOutputT> tag,
+              AdditionalOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            // GroupByKey has a single main output; tagged outputs are never expected here.
+            throw new UnsupportedOperationException(
+                "GroupAlsoByWindow should not use tagged outputs");
+          }
+        },
+        NullSideInputReader.empty(),
+        // NOTE(review): confirm ReduceFnRunner accepts null for this argument (the one between
+        // the side-input reader and the ReduceFn) on every code path.
+        null,
+        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder),
+        serializedOptions.get());
+  }
+
   /**
    * Returns the list of timers that are ready to fire. These are the timers
    * that are registered to be triggered at a time before the current watermark.
@@ -212,13 +244,11 @@ public class ApexGroupByKeyOperator<K, V> implements 
Operator {
         windowedValue.getTimestamp(),
         windowedValue.getWindows(),
         windowedValue.getPane());
-
-    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
-            kv.getKey(),
-            Collections.singletonList(updatedWindowedValue));
-
-    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-    fn.processElement(context);
+    timerInternals.setKey(kv.getKey());
+    ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
+        newReduceFnRunner(kv.getKey());
+    
reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
+    reduceFnRunner.persist();
   }
 
   private StateInternals<K> getStateInternalsForKey(K key) {
@@ -265,158 +295,29 @@ public class ApexGroupByKeyOperator<K, V> implements 
Operator {
     if (!timers.isEmpty()) {
       for (Slice keyBytes : timers.keySet()) {
         K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
-        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, 
timers.get(keyBytes));
-        context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-        fn.processElement(context);
+        timerInternals.setKey(key);
+        ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = 
newReduceFnRunner(key);
+        reduceFnRunner.onTimers(timers.get(keyBytes));
+        reduceFnRunner.persist();
       }
     }
   }
 
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, 
Iterable<V>, ?,
-      KeyedWorkItem<K, V>>.ProcessContext {
-
-    private final ApexTimerInternals timerInternals;
-    private StateInternals<K> stateInternals;
-    private KeyedWorkItem<K, V> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> 
function,
-                          ApexTimerInternals timerInternals) {
-      function.super();
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> 
stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    @Override
-    public KeyedWorkItem<K, V> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException(
-          "timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.get();
-    }
-
-    @Override
-    public void output(KV<K, Iterable<V>> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant 
timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing 
KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> 
windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() 
{
-
-        @Override
-        public StateInternals<K> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(
-            KV<K, Iterable<V>> output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          if (traceTuples) {
-            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
-          }
-          ApexGroupByKeyOperator.this.output.emit(
-              ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, 
windows, pane)));
-        }
-
-        @Override
-        public <AdditionalOutputT> void outputWindowedValue(
-            TupleTag<AdditionalOutputT> tag,
-            AdditionalOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "GroupAlsoByWindow should not use tagged outputs");
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available 
in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in 
Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in 
Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming 
mode.");
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      throw new RuntimeException("output() is not available when grouping by 
window.");
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      output(tag, output);
-    }
-
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   /**
    * An implementation of Beam's {@link TimerInternals}.
    *
    */
   private class ApexTimerInternals implements TimerInternals {
+    private K key;
+
+    public void setKey(K key) {
+      this.key = key;
+    }
 
     @Deprecated
     @Override
     public void setTimer(TimerData timerData) {
-      registerActiveTimer(context.element().key(), timerData);
+      registerActiveTimer(key, timerData);
     }
 
     @Override
@@ -427,7 +328,7 @@ public class ApexGroupByKeyOperator<K, V> implements 
Operator {
     @Deprecated
     @Override
     public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
+      unregisterActiveTimer(key, timerKey);
     }
 
     @Override

Reply via email to