[ 
https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-10342:
-----------------------------------
    Description: 
When using state time domain to batch items for an external api

We are trying to use the existing GroupIntoBatches, but we need some additional 
functionality to create a batch every n milliseconds. So we are trying to add 
some functionality to this Transform by adding an additional Timer of 
TimeDomain PROCESSING_TIME

{code:java}
@TimerId(STALE_TIMER_ID)
 private final TimerSpec staleSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
{code}

When we are testing this Timer in Dataflow Runner, this is not outputting any 
new batches

Updated GroupIntoBatches code:
{code:java}
package my.package;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class GroupIntoBatches<K, InputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
Iterable<InputT>>>> {

    private final long batchSize;
    private final long staleTime;

    public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, 
long staleTime) {
        return new GroupIntoBatches<>(batchSize, staleTime);
    }

    @Override
    public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, 
InputT>> input) {
        Duration allowedLateness = 
input.getWindowingStrategy().getAllowedLateness();

        checkArgument(
            input.getCoder() instanceof KvCoder,
            "coder specified in the input PCollection is not a KvCoder");
        KvCoder inputCoder = (KvCoder) input.getCoder();
        Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
        Coder<InputT> valueCoder = (Coder<InputT>) 
inputCoder.getCoderArguments().get(1);

        return input.apply(
            ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, 
allowedLateness, keyCoder, valueCoder)));
    }

    @VisibleForTesting
    static class GroupIntoBatchesDoFn<K, InputT>
        extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {

        private static final String STALE_TIMER_ID = "staleTimer";
        private static final String END_OF_WINDOW_ID = "endOFWindow";
        private static final String BATCH_ID = "batch";
        private static final String NUM_ELEMENTS_IN_BATCH_ID = 
"numElementsInBatch";
        private static final String KEY_ID = "key";
        private final long batchSize;
        private final long staleTime;
        private final Duration allowedLateness;

        @TimerId(END_OF_WINDOW_ID)
        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @TimerId(STALE_TIMER_ID)
        private final TimerSpec staleSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @StateId(BATCH_ID)
        private final StateSpec<BagState<InputT>> batchSpec;

        @StateId(NUM_ELEMENTS_IN_BATCH_ID)
        private final StateSpec<CombiningState<Long, long[], Long>> 
numElementsInBatchSpec;

        @StateId(KEY_ID)
        private final StateSpec<ValueState<K>> keySpec;

        private final long prefetchFrequency;

        GroupIntoBatchesDoFn(
            long batchSize,
            long staleTime,
            Duration allowedLateness,
            Coder<K> inputKeyCoder,
            Coder<InputT> inputValueCoder) {
            this.batchSize = batchSize;
            this.staleTime = staleTime;
            this.allowedLateness = allowedLateness;
            this.batchSpec = StateSpecs.bag(inputValueCoder);
            this.numElementsInBatchSpec =
                StateSpecs.combining(
                    new Combine.BinaryCombineLongFn() {

                        @Override
                        public long identity() {
                            return 0L;
                        }

                        @Override
                        public long apply(long left, long right) {
                            return left + right;
                        }
                    });

            this.keySpec = StateSpecs.value(inputKeyCoder);
            // prefetch every 20% of batchSize elements. Do not prefetch if 
batchSize is too little
            this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : 
(batchSize / 5);
        }

        @ProcessElement
        public void processElement(
            @TimerId(END_OF_WINDOW_ID) Timer timer,
            @TimerId(STALE_TIMER_ID) Timer staleTimer,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            @StateId(KEY_ID) ValueState<K> key,
            @Element KV<K, InputT> element,
            BoundedWindow window,
            OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
            Instant windowExpires = window.maxTimestamp().plus(allowedLateness);

            log.debug(
                "*** SET TIMER *** to point in time {} for window {}",
                windowExpires.toString(),
                window.toString());
            timer.set(windowExpires);
            key.write(element.getKey());
            batch.add(element.getValue());
            log.debug("*** BATCH *** Add element for window {} ", 
window.toString());

            // blind add is supported with combiningState
            numElementsInBatch.add(1L);
            Long num = numElementsInBatch.read();

            // set a stale time on Processing Timer to emit batch every n millis
            if (num == 1) {
                staleTimer.offset(Duration.millis(staleTime)).setRelative();
            }
            if (num % prefetchFrequency == 0) {
                // prefetch data and modify batch state (readLater() modifies 
this)
                batch.readLater();
            }
            if (num >= batchSize) {
                log.debug("*** END OF BATCH *** for window {}", 
window.toString());
                flushBatch(receiver, key, batch, numElementsInBatch);
            }
        }

        @OnTimer(STALE_TIMER_ID)
        public void onStaleCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** Stale Timer *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        @OnTimer(END_OF_WINDOW_ID)
        public void onTimerCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** END OF WINDOW *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        private void flushBatch(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            ValueState<K> key,
            BagState<InputT> batch,
            CombiningState<Long, long[], Long> numElementsInBatch) {
            Iterable<InputT> values = batch.read();
            // when the timer fires, batch state might be empty
            if (!Iterables.isEmpty(values)) {
                receiver.output(KV.of(key.read(), values));
            }
            batch.clear();
            log.debug("*** BATCH *** clear");
            numElementsInBatch.clear();
        }
    }
}

{code}
Compared the batches outputted by adding two stages:
{code:java}
.apply("Batch transactions New", GroupIntoBatches.of(200, 250));
.apply("Batch transactions Old", 
org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
 

In FlinkRunner we are seeing the PROCESSING_TIME timer being triggered and 
creating a batch every 250 millis, but in Dataflow Runner this is not being 
triggered.

 

  was:
When using state time domain to batch items for an external api

We are trying to use the existing GroupIntoBatches, but we need some additional 
functionality to create a batch every n milliseconds. So we are trying to add 
some functionality to this Transform by adding an additional Timer of 
TimeDomain PROCESSING_TIME

@TimerId(STALE_TIMER_ID)
 private final TimerSpec staleSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

When we are testing this Timer in Dataflow Runner, this is not outputting any 
new batches

Updated GroupIntoBatches code:
{code:java}
package my.package;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class GroupIntoBatches<K, InputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
Iterable<InputT>>>> {

    private final long batchSize;
    private final long staleTime;

    public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, 
long staleTime) {
        return new GroupIntoBatches<>(batchSize, staleTime);
    }

    @Override
    public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, 
InputT>> input) {
        Duration allowedLateness = 
input.getWindowingStrategy().getAllowedLateness();

        checkArgument(
            input.getCoder() instanceof KvCoder,
            "coder specified in the input PCollection is not a KvCoder");
        KvCoder inputCoder = (KvCoder) input.getCoder();
        Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
        Coder<InputT> valueCoder = (Coder<InputT>) 
inputCoder.getCoderArguments().get(1);

        return input.apply(
            ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, 
allowedLateness, keyCoder, valueCoder)));
    }

    @VisibleForTesting
    static class GroupIntoBatchesDoFn<K, InputT>
        extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {

        private static final String STALE_TIMER_ID = "staleTimer";
        private static final String END_OF_WINDOW_ID = "endOFWindow";
        private static final String BATCH_ID = "batch";
        private static final String NUM_ELEMENTS_IN_BATCH_ID = 
"numElementsInBatch";
        private static final String KEY_ID = "key";
        private final long batchSize;
        private final long staleTime;
        private final Duration allowedLateness;

        @TimerId(END_OF_WINDOW_ID)
        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @TimerId(STALE_TIMER_ID)
        private final TimerSpec staleSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @StateId(BATCH_ID)
        private final StateSpec<BagState<InputT>> batchSpec;

        @StateId(NUM_ELEMENTS_IN_BATCH_ID)
        private final StateSpec<CombiningState<Long, long[], Long>> 
numElementsInBatchSpec;

        @StateId(KEY_ID)
        private final StateSpec<ValueState<K>> keySpec;

        private final long prefetchFrequency;

        GroupIntoBatchesDoFn(
            long batchSize,
            long staleTime,
            Duration allowedLateness,
            Coder<K> inputKeyCoder,
            Coder<InputT> inputValueCoder) {
            this.batchSize = batchSize;
            this.staleTime = staleTime;
            this.allowedLateness = allowedLateness;
            this.batchSpec = StateSpecs.bag(inputValueCoder);
            this.numElementsInBatchSpec =
                StateSpecs.combining(
                    new Combine.BinaryCombineLongFn() {

                        @Override
                        public long identity() {
                            return 0L;
                        }

                        @Override
                        public long apply(long left, long right) {
                            return left + right;
                        }
                    });

            this.keySpec = StateSpecs.value(inputKeyCoder);
            // prefetch every 20% of batchSize elements. Do not prefetch if 
batchSize is too little
            this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : 
(batchSize / 5);
        }

        @ProcessElement
        public void processElement(
            @TimerId(END_OF_WINDOW_ID) Timer timer,
            @TimerId(STALE_TIMER_ID) Timer staleTimer,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            @StateId(KEY_ID) ValueState<K> key,
            @Element KV<K, InputT> element,
            BoundedWindow window,
            OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
            Instant windowExpires = window.maxTimestamp().plus(allowedLateness);

            log.debug(
                "*** SET TIMER *** to point in time {} for window {}",
                windowExpires.toString(),
                window.toString());
            timer.set(windowExpires);
            key.write(element.getKey());
            batch.add(element.getValue());
            log.debug("*** BATCH *** Add element for window {} ", 
window.toString());

            // blind add is supported with combiningState
            numElementsInBatch.add(1L);
            Long num = numElementsInBatch.read();

            // set a stale time on Processing Timer to emit batch every n millis
            if (num == 1) {
                staleTimer.offset(Duration.millis(staleTime)).setRelative();
            }
            if (num % prefetchFrequency == 0) {
                // prefetch data and modify batch state (readLater() modifies 
this)
                batch.readLater();
            }
            if (num >= batchSize) {
                log.debug("*** END OF BATCH *** for window {}", 
window.toString());
                flushBatch(receiver, key, batch, numElementsInBatch);
            }
        }

        @OnTimer(STALE_TIMER_ID)
        public void onStaleCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** Stale Timer *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        @OnTimer(END_OF_WINDOW_ID)
        public void onTimerCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** END OF WINDOW *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        private void flushBatch(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            ValueState<K> key,
            BagState<InputT> batch,
            CombiningState<Long, long[], Long> numElementsInBatch) {
            Iterable<InputT> values = batch.read();
            // when the timer fires, batch state might be empty
            if (!Iterables.isEmpty(values)) {
                receiver.output(KV.of(key.read(), values));
            }
            batch.clear();
            log.debug("*** BATCH *** clear");
            numElementsInBatch.clear();
        }
    }
}

{code}
Compared the batches outputted by adding two stages:
{code:java}
.apply("Batch transactions New", GroupIntoBatches.of(200, 250));
.apply("Batch transactions Old", 
org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
 

In FlinkRunner we are seeing the PROCESSING_TIME timer being triggered and 
creating a batch every 250 millis, but in Dataflow Runner this is not being 
triggered.

 


> Processing Time Timer is not being invoked
> ------------------------------------------
>
>                 Key: BEAM-10342
>                 URL: https://issues.apache.org/jira/browse/BEAM-10342
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: sailaxmankumar
>            Priority: P2
>
> When using state time domain to batch items for an external api
> We are trying to use the existing GroupIntoBatches, but we need some 
> additional functionality to create a batch every n milliseconds. So we are 
> trying to add some functionality to this Transform by adding an additional 
> Timer of TimeDomain PROCESSING_TIME
> {code:java}
> @TimerId(STALE_TIMER_ID)
>  private final TimerSpec staleSpec = 
> TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
> When we are testing this Timer in Dataflow Runner, this is not outputting any 
> new batches
> Updated GroupIntoBatches code:
> {code:java}
> package my.package;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
>     extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
> Iterable<InputT>>>> {
>     private final long batchSize;
>     private final long staleTime;
>     public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, 
> long staleTime) {
>         return new GroupIntoBatches<>(batchSize, staleTime);
>     }
>     @Override
>     public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, 
> InputT>> input) {
>         Duration allowedLateness = 
> input.getWindowingStrategy().getAllowedLateness();
>         checkArgument(
>             input.getCoder() instanceof KvCoder,
>             "coder specified in the input PCollection is not a KvCoder");
>         KvCoder inputCoder = (KvCoder) input.getCoder();
>         Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
>         Coder<InputT> valueCoder = (Coder<InputT>) 
> inputCoder.getCoderArguments().get(1);
>         return input.apply(
>             ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, 
> allowedLateness, keyCoder, valueCoder)));
>     }
>     @VisibleForTesting
>     static class GroupIntoBatchesDoFn<K, InputT>
>         extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
>         private static final String STALE_TIMER_ID = "staleTimer";
>         private static final String END_OF_WINDOW_ID = "endOFWindow";
>         private static final String BATCH_ID = "batch";
>         private static final String NUM_ELEMENTS_IN_BATCH_ID = 
> "numElementsInBatch";
>         private static final String KEY_ID = "key";
>         private final long batchSize;
>         private final long staleTime;
>         private final Duration allowedLateness;
>         @TimerId(END_OF_WINDOW_ID)
>         private final TimerSpec timer = 
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>         @TimerId(STALE_TIMER_ID)
>         private final TimerSpec staleSpec = 
> TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>         @StateId(BATCH_ID)
>         private final StateSpec<BagState<InputT>> batchSpec;
>         @StateId(NUM_ELEMENTS_IN_BATCH_ID)
>         private final StateSpec<CombiningState<Long, long[], Long>> 
> numElementsInBatchSpec;
>         @StateId(KEY_ID)
>         private final StateSpec<ValueState<K>> keySpec;
>         private final long prefetchFrequency;
>         GroupIntoBatchesDoFn(
>             long batchSize,
>             long staleTime,
>             Duration allowedLateness,
>             Coder<K> inputKeyCoder,
>             Coder<InputT> inputValueCoder) {
>             this.batchSize = batchSize;
>             this.staleTime = staleTime;
>             this.allowedLateness = allowedLateness;
>             this.batchSpec = StateSpecs.bag(inputValueCoder);
>             this.numElementsInBatchSpec =
>                 StateSpecs.combining(
>                     new Combine.BinaryCombineLongFn() {
>                         @Override
>                         public long identity() {
>                             return 0L;
>                         }
>                         @Override
>                         public long apply(long left, long right) {
>                             return left + right;
>                         }
>                     });
>             this.keySpec = StateSpecs.value(inputKeyCoder);
>             // prefetch every 20% of batchSize elements. Do not prefetch if 
> batchSize is too little
>             this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE 
> : (batchSize / 5);
>         }
>         @ProcessElement
>         public void processElement(
>             @TimerId(END_OF_WINDOW_ID) Timer timer,
>             @TimerId(STALE_TIMER_ID) Timer staleTimer,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
> Long> numElementsInBatch,
>             @StateId(KEY_ID) ValueState<K> key,
>             @Element KV<K, InputT> element,
>             BoundedWindow window,
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
>             Instant windowExpires = 
> window.maxTimestamp().plus(allowedLateness);
>             log.debug(
>                 "*** SET TIMER *** to point in time {} for window {}",
>                 windowExpires.toString(),
>                 window.toString());
>             timer.set(windowExpires);
>             key.write(element.getKey());
>             batch.add(element.getValue());
>             log.debug("*** BATCH *** Add element for window {} ", 
> window.toString());
>             // blind add is supported with combiningState
>             numElementsInBatch.add(1L);
>             Long num = numElementsInBatch.read();
>             // set a stale time on Processing Timer to emit batch every n 
> millis
>             if (num == 1) {
>                 staleTimer.offset(Duration.millis(staleTime)).setRelative();
>             }
>             if (num % prefetchFrequency == 0) {
>                 // prefetch data and modify batch state (readLater() modifies 
> this)
>                 batch.readLater();
>             }
>             if (num >= batchSize) {
>                 log.debug("*** END OF BATCH *** for window {}", 
> window.toString());
>                 flushBatch(receiver, key, batch, numElementsInBatch);
>             }
>         }
>         @OnTimer(STALE_TIMER_ID)
>         public void onStaleCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
> Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** Stale Timer *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         @OnTimer(END_OF_WINDOW_ID)
>         public void onTimerCallback(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             @Timestamp Instant timestamp,
>             @StateId(KEY_ID) ValueState<K> key,
>             @StateId(BATCH_ID) BagState<InputT> batch,
>             @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
> Long> numElementsInBatch,
>             BoundedWindow window) {
>             log.debug(
>                 "*** END OF WINDOW *** for timer timestamp {} in windows {}",
>                 timestamp,
>                 window.toString());
>             flushBatch(receiver, key, batch, numElementsInBatch);
>         }
>         private void flushBatch(
>             OutputReceiver<KV<K, Iterable<InputT>>> receiver,
>             ValueState<K> key,
>             BagState<InputT> batch,
>             CombiningState<Long, long[], Long> numElementsInBatch) {
>             Iterable<InputT> values = batch.read();
>             // when the timer fires, batch state might be empty
>             if (!Iterables.isEmpty(values)) {
>                 receiver.output(KV.of(key.read(), values));
>             }
>             batch.clear();
>             log.debug("*** BATCH *** clear");
>             numElementsInBatch.clear();
>         }
>     }
> }
> {code}
> Compared the batches outputted by adding two stages:
> {code:java}
> .apply("Batch transactions New", GroupIntoBatches.of(200, 250));
> .apply("Batch transactions Old", 
> org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
>  
> In FlinkRunner we are seeing the PROCESSING_TIME timer being triggered and 
> creating a batch every 250 millis, but in Dataflow Runner this is not being 
> triggered.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to