This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit c9474558092611116f308d7b824ee0bb5c11ecb7 Author: Etienne Chauchot <[email protected]> AuthorDate: Tue May 28 09:31:28 2019 +0200 re-enable reduceFnRunner timers for output --- .../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java index 2fb08f5..cc65716 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.TriggerTranslation; @@ -100,21 +101,18 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind // Finish any pending windows by advancing the input watermark to infinity. timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - // not supported yet -/* // Finally, advance the processing time to infinity to fire any timers. timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); fireEligibleTimers(timerInternals, reduceFnRunner); -*/ reduceFnRunner.persist(); return outputter.getOutputs().iterator(); } -/* private void fireEligibleTimers( + private void fireEligibleTimers( InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception { @@ -136,7 +134,7 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind reduceFnRunner.onTimers(timers); timers.clear(); } - }*/ + } private static class GABWOutputWindowedValue<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
