sailaxmankumar created BEAM-10342:
-------------------------------------

             Summary: Processing Time Timers are not being invoked
                 Key: BEAM-10342
                 URL: https://issues.apache.org/jira/browse/BEAM-10342
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: sailaxmankumar


When using a stateful DoFn with a PROCESSING_TIME timer to batch items for an external API:

We are trying to use the existing GroupIntoBatches, but we need additional 
functionality to emit a batch every n milliseconds. So we are extending this 
transform with an additional timer in the PROCESSING_TIME time domain:

// Additional processing-time timer used to flush a batch a fixed delay after its first element.
@TimerId(STALE_TIMER_ID)
private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

When we test this timer on the Dataflow runner, it does not output any new 
batches.

Updated GroupIntoBatches code:
{code:java}
package com.anz.fabric.transactions.service.lwc;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class GroupIntoBatches<K, InputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, 
Iterable<InputT>>>> {

    private final long batchSize;
    private final long staleTime;

    public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize, 
long staleTime) {
        return new GroupIntoBatches<>(batchSize, staleTime);
    }

    @Override
    public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, 
InputT>> input) {
        Duration allowedLateness = 
input.getWindowingStrategy().getAllowedLateness();

        checkArgument(
            input.getCoder() instanceof KvCoder,
            "coder specified in the input PCollection is not a KvCoder");
        KvCoder inputCoder = (KvCoder) input.getCoder();
        Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
        Coder<InputT> valueCoder = (Coder<InputT>) 
inputCoder.getCoderArguments().get(1);

        return input.apply(
            ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime, 
allowedLateness, keyCoder, valueCoder)));
    }

    @VisibleForTesting
    static class GroupIntoBatchesDoFn<K, InputT>
        extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {

        private static final String STALE_TIMER_ID = "staleTimer";
        private static final String END_OF_WINDOW_ID = "endOFWindow";
        private static final String BATCH_ID = "batch";
        private static final String NUM_ELEMENTS_IN_BATCH_ID = 
"numElementsInBatch";
        private static final String KEY_ID = "key";
        private final long batchSize;
        private final long staleTime;
        private final Duration allowedLateness;

        @TimerId(END_OF_WINDOW_ID)
        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @TimerId(STALE_TIMER_ID)
        private final TimerSpec staleSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @StateId(BATCH_ID)
        private final StateSpec<BagState<InputT>> batchSpec;

        @StateId(NUM_ELEMENTS_IN_BATCH_ID)
        private final StateSpec<CombiningState<Long, long[], Long>> 
numElementsInBatchSpec;

        @StateId(KEY_ID)
        private final StateSpec<ValueState<K>> keySpec;

        private final long prefetchFrequency;

        GroupIntoBatchesDoFn(
            long batchSize,
            long staleTime,
            Duration allowedLateness,
            Coder<K> inputKeyCoder,
            Coder<InputT> inputValueCoder) {
            this.batchSize = batchSize;
            this.staleTime = staleTime;
            this.allowedLateness = allowedLateness;
            this.batchSpec = StateSpecs.bag(inputValueCoder);
            this.numElementsInBatchSpec =
                StateSpecs.combining(
                    new Combine.BinaryCombineLongFn() {

                        @Override
                        public long identity() {
                            return 0L;
                        }

                        @Override
                        public long apply(long left, long right) {
                            return left + right;
                        }
                    });

            this.keySpec = StateSpecs.value(inputKeyCoder);
            // prefetch every 20% of batchSize elements. Do not prefetch if 
batchSize is too little
            this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : 
(batchSize / 5);
        }

        @ProcessElement
        public void processElement(
            @TimerId(END_OF_WINDOW_ID) Timer timer,
            @TimerId(STALE_TIMER_ID) Timer staleTimer,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            @StateId(KEY_ID) ValueState<K> key,
            @Element KV<K, InputT> element,
            BoundedWindow window,
            OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
            Instant windowExpires = window.maxTimestamp().plus(allowedLateness);

            log.debug(
                "*** SET TIMER *** to point in time {} for window {}",
                windowExpires.toString(),
                window.toString());
            timer.set(windowExpires);
            key.write(element.getKey());
            batch.add(element.getValue());
            log.debug("*** BATCH *** Add element for window {} ", 
window.toString());

            // blind add is supported with combiningState
            numElementsInBatch.add(1L);
            Long num = numElementsInBatch.read();

            // set a stale time on Processing Timer to emit batch every n millis
            if (num == 1) {
                staleTimer.offset(Duration.millis(staleTime)).setRelative();
            }
            if (num % prefetchFrequency == 0) {
                // prefetch data and modify batch state (readLater() modifies 
this)
                batch.readLater();
            }
            if (num >= batchSize) {
                log.debug("*** END OF BATCH *** for window {}", 
window.toString());
                flushBatch(receiver, key, batch, numElementsInBatch);
            }
        }

        @OnTimer(STALE_TIMER_ID)
        public void onStaleCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** Stale Timer *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        @OnTimer(END_OF_WINDOW_ID)
        public void onTimerCallback(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            @Timestamp Instant timestamp,
            @StateId(KEY_ID) ValueState<K> key,
            @StateId(BATCH_ID) BagState<InputT> batch,
            @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], 
Long> numElementsInBatch,
            BoundedWindow window) {
            log.debug(
                "*** END OF WINDOW *** for timer timestamp {} in windows {}",
                timestamp,
                window.toString());
            flushBatch(receiver, key, batch, numElementsInBatch);
        }

        private void flushBatch(
            OutputReceiver<KV<K, Iterable<InputT>>> receiver,
            ValueState<K> key,
            BagState<InputT> batch,
            CombiningState<Long, long[], Long> numElementsInBatch) {
            Iterable<InputT> values = batch.read();
            // when the timer fires, batch state might be empty
            if (!Iterables.isEmpty(values)) {
                receiver.output(KV.of(key.read(), values));
            }
            batch.clear();
            log.debug("*** BATCH *** clear");
            numElementsInBatch.clear();
        }
    }
}

{code}
We compared the batches output by the following two stages:
{code:java}
// Two alternative stages applied to the same input: the custom transform (size 200, 250 ms flush)
// and Beam's built-in size-only GroupIntoBatches as a baseline.
.apply("Batch transactions New", GroupIntoBatches.of(200, 250));
.apply("Batch transactions Old",
    org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
 

With the Flink runner, we see the PROCESSING_TIME timer being triggered and 
creating a batch every 250 ms, but with the Dataflow runner it is not being 
triggered.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to