[
https://issues.apache.org/jira/browse/BEAM-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kenneth Knowles updated BEAM-10342:
-----------------------------------
Description:
We are using state and timers to batch items for an external API.
We are trying to use the existing GroupIntoBatches, but we need additional
functionality to emit a batch every n milliseconds. So we are trying to extend
this Transform by adding an additional Timer of
TimeDomain PROCESSING_TIME:
{code:java}
// Processing-time timer registered under STALE_TIMER_ID, declared in addition to the event-time timer.
@TimerId(STALE_TIMER_ID)
private final TimerSpec staleSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
{code}
When we test this Timer on the Dataflow Runner, it does not output any
new batches.
Updated GroupIntoBatches code:
{code:java}
package my.package;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class GroupIntoBatches<K, InputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K,
Iterable<InputT>>>> {
private final long batchSize;
private final long staleTime;
public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize,
long staleTime) {
return new GroupIntoBatches<>(batchSize, staleTime);
}
@Override
public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K,
InputT>> input) {
Duration allowedLateness =
input.getWindowingStrategy().getAllowedLateness();
checkArgument(
input.getCoder() instanceof KvCoder,
"coder specified in the input PCollection is not a KvCoder");
KvCoder inputCoder = (KvCoder) input.getCoder();
Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
Coder<InputT> valueCoder = (Coder<InputT>)
inputCoder.getCoderArguments().get(1);
return input.apply(
ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime,
allowedLateness, keyCoder, valueCoder)));
}
@VisibleForTesting
static class GroupIntoBatchesDoFn<K, InputT>
extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
private static final String STALE_TIMER_ID = "staleTimer";
private static final String END_OF_WINDOW_ID = "endOFWindow";
private static final String BATCH_ID = "batch";
private static final String NUM_ELEMENTS_IN_BATCH_ID =
"numElementsInBatch";
private static final String KEY_ID = "key";
private final long batchSize;
private final long staleTime;
private final Duration allowedLateness;
@TimerId(END_OF_WINDOW_ID)
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@TimerId(STALE_TIMER_ID)
private final TimerSpec staleSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId(BATCH_ID)
private final StateSpec<BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
private final StateSpec<CombiningState<Long, long[], Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
private final StateSpec<ValueState<K>> keySpec;
private final long prefetchFrequency;
GroupIntoBatchesDoFn(
long batchSize,
long staleTime,
Duration allowedLateness,
Coder<K> inputKeyCoder,
Coder<InputT> inputValueCoder) {
this.batchSize = batchSize;
this.staleTime = staleTime;
this.allowedLateness = allowedLateness;
this.batchSpec = StateSpecs.bag(inputValueCoder);
this.numElementsInBatchSpec =
StateSpecs.combining(
new Combine.BinaryCombineLongFn() {
@Override
public long identity() {
return 0L;
}
@Override
public long apply(long left, long right) {
return left + right;
}
});
this.keySpec = StateSpecs.value(inputKeyCoder);
// prefetch every 20% of batchSize elements. Do not prefetch if
batchSize is too little
this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE :
(batchSize / 5);
}
@ProcessElement
public void processElement(
@TimerId(END_OF_WINDOW_ID) Timer timer,
@TimerId(STALE_TIMER_ID) Timer staleTimer,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
@StateId(KEY_ID) ValueState<K> key,
@Element KV<K, InputT> element,
BoundedWindow window,
OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
log.debug(
"*** SET TIMER *** to point in time {} for window {}",
windowExpires.toString(),
window.toString());
timer.set(windowExpires);
key.write(element.getKey());
batch.add(element.getValue());
log.debug("*** BATCH *** Add element for window {} ",
window.toString());
// blind add is supported with combiningState
numElementsInBatch.add(1L);
Long num = numElementsInBatch.read();
// set a stale time on Processing Timer to emit batch every n millis
if (num == 1) {
staleTimer.offset(Duration.millis(staleTime)).setRelative();
}
if (num % prefetchFrequency == 0) {
// prefetch data and modify batch state (readLater() modifies
this)
batch.readLater();
}
if (num >= batchSize) {
log.debug("*** END OF BATCH *** for window {}",
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
}
@OnTimer(STALE_TIMER_ID)
public void onStaleCallback(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
@Timestamp Instant timestamp,
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
BoundedWindow window) {
log.debug(
"*** Stale Timer *** for timer timestamp {} in windows {}",
timestamp,
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
@OnTimer(END_OF_WINDOW_ID)
public void onTimerCallback(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
@Timestamp Instant timestamp,
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
BoundedWindow window) {
log.debug(
"*** END OF WINDOW *** for timer timestamp {} in windows {}",
timestamp,
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
private void flushBatch(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
ValueState<K> key,
BagState<InputT> batch,
CombiningState<Long, long[], Long> numElementsInBatch) {
Iterable<InputT> values = batch.read();
// when the timer fires, batch state might be empty
if (!Iterables.isEmpty(values)) {
receiver.output(KV.of(key.read(), values));
}
batch.clear();
log.debug("*** BATCH *** clear");
numElementsInBatch.clear();
}
}
}
{code}
We compared the batches output by adding two stages:
{code:java}
// Modified transform: flush at 200 elements or after 250 ms of processing time.
.apply("Batch transactions New", GroupIntoBatches.of(200, 250));
// Stock Beam transform, for comparison: flush at 200 elements.
.apply("Batch transactions Old",
    org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
On the FlinkRunner we see the PROCESSING_TIME timer being triggered and
creating a batch every 250 millis, but on the Dataflow Runner the timer is not
being triggered.
was:
We are using state and timers to batch items for an external API.
We are trying to use the existing GroupIntoBatches, but we need additional
functionality to emit a batch every n milliseconds. So we are trying to extend
this Transform by adding an additional Timer of
TimeDomain PROCESSING_TIME:
// Processing-time timer registered under STALE_TIMER_ID, declared in addition to the event-time timer.
@TimerId(STALE_TIMER_ID)
private final TimerSpec staleSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
When we test this Timer on the Dataflow Runner, it does not output any
new batches.
Updated GroupIntoBatches code:
{code:java}
package my.package;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class GroupIntoBatches<K, InputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K,
Iterable<InputT>>>> {
private final long batchSize;
private final long staleTime;
public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize,
long staleTime) {
return new GroupIntoBatches<>(batchSize, staleTime);
}
@Override
public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K,
InputT>> input) {
Duration allowedLateness =
input.getWindowingStrategy().getAllowedLateness();
checkArgument(
input.getCoder() instanceof KvCoder,
"coder specified in the input PCollection is not a KvCoder");
KvCoder inputCoder = (KvCoder) input.getCoder();
Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
Coder<InputT> valueCoder = (Coder<InputT>)
inputCoder.getCoderArguments().get(1);
return input.apply(
ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime,
allowedLateness, keyCoder, valueCoder)));
}
@VisibleForTesting
static class GroupIntoBatchesDoFn<K, InputT>
extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
private static final String STALE_TIMER_ID = "staleTimer";
private static final String END_OF_WINDOW_ID = "endOFWindow";
private static final String BATCH_ID = "batch";
private static final String NUM_ELEMENTS_IN_BATCH_ID =
"numElementsInBatch";
private static final String KEY_ID = "key";
private final long batchSize;
private final long staleTime;
private final Duration allowedLateness;
@TimerId(END_OF_WINDOW_ID)
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@TimerId(STALE_TIMER_ID)
private final TimerSpec staleSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId(BATCH_ID)
private final StateSpec<BagState<InputT>> batchSpec;
@StateId(NUM_ELEMENTS_IN_BATCH_ID)
private final StateSpec<CombiningState<Long, long[], Long>>
numElementsInBatchSpec;
@StateId(KEY_ID)
private final StateSpec<ValueState<K>> keySpec;
private final long prefetchFrequency;
GroupIntoBatchesDoFn(
long batchSize,
long staleTime,
Duration allowedLateness,
Coder<K> inputKeyCoder,
Coder<InputT> inputValueCoder) {
this.batchSize = batchSize;
this.staleTime = staleTime;
this.allowedLateness = allowedLateness;
this.batchSpec = StateSpecs.bag(inputValueCoder);
this.numElementsInBatchSpec =
StateSpecs.combining(
new Combine.BinaryCombineLongFn() {
@Override
public long identity() {
return 0L;
}
@Override
public long apply(long left, long right) {
return left + right;
}
});
this.keySpec = StateSpecs.value(inputKeyCoder);
// prefetch every 20% of batchSize elements. Do not prefetch if
batchSize is too little
this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE :
(batchSize / 5);
}
@ProcessElement
public void processElement(
@TimerId(END_OF_WINDOW_ID) Timer timer,
@TimerId(STALE_TIMER_ID) Timer staleTimer,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
@StateId(KEY_ID) ValueState<K> key,
@Element KV<K, InputT> element,
BoundedWindow window,
OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
Instant windowExpires = window.maxTimestamp().plus(allowedLateness);
log.debug(
"*** SET TIMER *** to point in time {} for window {}",
windowExpires.toString(),
window.toString());
timer.set(windowExpires);
key.write(element.getKey());
batch.add(element.getValue());
log.debug("*** BATCH *** Add element for window {} ",
window.toString());
// blind add is supported with combiningState
numElementsInBatch.add(1L);
Long num = numElementsInBatch.read();
// set a stale time on Processing Timer to emit batch every n millis
if (num == 1) {
staleTimer.offset(Duration.millis(staleTime)).setRelative();
}
if (num % prefetchFrequency == 0) {
// prefetch data and modify batch state (readLater() modifies
this)
batch.readLater();
}
if (num >= batchSize) {
log.debug("*** END OF BATCH *** for window {}",
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
}
@OnTimer(STALE_TIMER_ID)
public void onStaleCallback(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
@Timestamp Instant timestamp,
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
BoundedWindow window) {
log.debug(
"*** Stale Timer *** for timer timestamp {} in windows {}",
timestamp,
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
@OnTimer(END_OF_WINDOW_ID)
public void onTimerCallback(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
@Timestamp Instant timestamp,
@StateId(KEY_ID) ValueState<K> key,
@StateId(BATCH_ID) BagState<InputT> batch,
@StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
Long> numElementsInBatch,
BoundedWindow window) {
log.debug(
"*** END OF WINDOW *** for timer timestamp {} in windows {}",
timestamp,
window.toString());
flushBatch(receiver, key, batch, numElementsInBatch);
}
private void flushBatch(
OutputReceiver<KV<K, Iterable<InputT>>> receiver,
ValueState<K> key,
BagState<InputT> batch,
CombiningState<Long, long[], Long> numElementsInBatch) {
Iterable<InputT> values = batch.read();
// when the timer fires, batch state might be empty
if (!Iterables.isEmpty(values)) {
receiver.output(KV.of(key.read(), values));
}
batch.clear();
log.debug("*** BATCH *** clear");
numElementsInBatch.clear();
}
}
}
{code}
We compared the batches output by adding two stages:
{code:java}
// Modified transform: flush at 200 elements or after 250 ms of processing time.
.apply("Batch transactions New", GroupIntoBatches.of(200, 250));
// Stock Beam transform, for comparison: flush at 200 elements.
.apply("Batch transactions Old",
    org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200));{code}
On the FlinkRunner we see the PROCESSING_TIME timer being triggered and
creating a batch every 250 millis, but on the Dataflow Runner the timer is not
being triggered.
> Processing Time Timer is not being invoked
> ------------------------------------------
>
> Key: BEAM-10342
> URL: https://issues.apache.org/jira/browse/BEAM-10342
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: sailaxmankumar
> Priority: P2
>
> We are using state and timers to batch items for an external API.
> We are trying to use the existing GroupIntoBatches, but we need
> additional functionality to emit a batch every n milliseconds. So we are
> trying to extend this Transform by adding an additional
> Timer of TimeDomain PROCESSING_TIME:
> {code:java}
> @TimerId(STALE_TIMER_ID)
> private final TimerSpec staleSpec =
> TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> {code}
> When we test this Timer on the Dataflow Runner, it does not output any
> new batches.
> Updated GroupIntoBatches code:
> {code:java}
> package my.package;
> import static com.google.common.base.Preconditions.checkArgument;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.CombiningState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.Combine;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import com.google.common.annotations.VisibleForTesting;
> import com.google.common.collect.Iterables;
> import lombok.RequiredArgsConstructor;
> import lombok.extern.slf4j.Slf4j;
> @Slf4j
> @RequiredArgsConstructor
> public class GroupIntoBatches<K, InputT>
> extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K,
> Iterable<InputT>>>> {
> private final long batchSize;
> private final long staleTime;
> public static <K, InputT> GroupIntoBatches<K, InputT> of(long batchSize,
> long staleTime) {
> return new GroupIntoBatches<>(batchSize, staleTime);
> }
> @Override
> public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K,
> InputT>> input) {
> Duration allowedLateness =
> input.getWindowingStrategy().getAllowedLateness();
> checkArgument(
> input.getCoder() instanceof KvCoder,
> "coder specified in the input PCollection is not a KvCoder");
> KvCoder inputCoder = (KvCoder) input.getCoder();
> Coder<K> keyCoder = (Coder<K>) inputCoder.getCoderArguments().get(0);
> Coder<InputT> valueCoder = (Coder<InputT>)
> inputCoder.getCoderArguments().get(1);
> return input.apply(
> ParDo.of(new GroupIntoBatchesDoFn<>(batchSize, staleTime,
> allowedLateness, keyCoder, valueCoder)));
> }
> @VisibleForTesting
> static class GroupIntoBatchesDoFn<K, InputT>
> extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
> private static final String STALE_TIMER_ID = "staleTimer";
> private static final String END_OF_WINDOW_ID = "endOFWindow";
> private static final String BATCH_ID = "batch";
> private static final String NUM_ELEMENTS_IN_BATCH_ID =
> "numElementsInBatch";
> private static final String KEY_ID = "key";
> private final long batchSize;
> private final long staleTime;
> private final Duration allowedLateness;
> @TimerId(END_OF_WINDOW_ID)
> private final TimerSpec timer =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @TimerId(STALE_TIMER_ID)
> private final TimerSpec staleSpec =
> TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> @StateId(BATCH_ID)
> private final StateSpec<BagState<InputT>> batchSpec;
> @StateId(NUM_ELEMENTS_IN_BATCH_ID)
> private final StateSpec<CombiningState<Long, long[], Long>>
> numElementsInBatchSpec;
> @StateId(KEY_ID)
> private final StateSpec<ValueState<K>> keySpec;
> private final long prefetchFrequency;
> GroupIntoBatchesDoFn(
> long batchSize,
> long staleTime,
> Duration allowedLateness,
> Coder<K> inputKeyCoder,
> Coder<InputT> inputValueCoder) {
> this.batchSize = batchSize;
> this.staleTime = staleTime;
> this.allowedLateness = allowedLateness;
> this.batchSpec = StateSpecs.bag(inputValueCoder);
> this.numElementsInBatchSpec =
> StateSpecs.combining(
> new Combine.BinaryCombineLongFn() {
> @Override
> public long identity() {
> return 0L;
> }
> @Override
> public long apply(long left, long right) {
> return left + right;
> }
> });
> this.keySpec = StateSpecs.value(inputKeyCoder);
> // prefetch every 20% of batchSize elements. Do not prefetch if
> batchSize is too little
> this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE
> : (batchSize / 5);
> }
> @ProcessElement
> public void processElement(
> @TimerId(END_OF_WINDOW_ID) Timer timer,
> @TimerId(STALE_TIMER_ID) Timer staleTimer,
> @StateId(BATCH_ID) BagState<InputT> batch,
> @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
> Long> numElementsInBatch,
> @StateId(KEY_ID) ValueState<K> key,
> @Element KV<K, InputT> element,
> BoundedWindow window,
> OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
> Instant windowExpires =
> window.maxTimestamp().plus(allowedLateness);
> log.debug(
> "*** SET TIMER *** to point in time {} for window {}",
> windowExpires.toString(),
> window.toString());
> timer.set(windowExpires);
> key.write(element.getKey());
> batch.add(element.getValue());
> log.debug("*** BATCH *** Add element for window {} ",
> window.toString());
> // blind add is supported with combiningState
> numElementsInBatch.add(1L);
> Long num = numElementsInBatch.read();
> // set a stale time on Processing Timer to emit batch every n
> millis
> if (num == 1) {
> staleTimer.offset(Duration.millis(staleTime)).setRelative();
> }
> if (num % prefetchFrequency == 0) {
> // prefetch data and modify batch state (readLater() modifies
> this)
> batch.readLater();
> }
> if (num >= batchSize) {
> log.debug("*** END OF BATCH *** for window {}",
> window.toString());
> flushBatch(receiver, key, batch, numElementsInBatch);
> }
> }
> @OnTimer(STALE_TIMER_ID)
> public void onStaleCallback(
> OutputReceiver<KV<K, Iterable<InputT>>> receiver,
> @Timestamp Instant timestamp,
> @StateId(KEY_ID) ValueState<K> key,
> @StateId(BATCH_ID) BagState<InputT> batch,
> @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
> Long> numElementsInBatch,
> BoundedWindow window) {
> log.debug(
> "*** Stale Timer *** for timer timestamp {} in windows {}",
> timestamp,
> window.toString());
> flushBatch(receiver, key, batch, numElementsInBatch);
> }
> @OnTimer(END_OF_WINDOW_ID)
> public void onTimerCallback(
> OutputReceiver<KV<K, Iterable<InputT>>> receiver,
> @Timestamp Instant timestamp,
> @StateId(KEY_ID) ValueState<K> key,
> @StateId(BATCH_ID) BagState<InputT> batch,
> @StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[],
> Long> numElementsInBatch,
> BoundedWindow window) {
> log.debug(
> "*** END OF WINDOW *** for timer timestamp {} in windows {}",
> timestamp,
> window.toString());
> flushBatch(receiver, key, batch, numElementsInBatch);
> }
> private void flushBatch(
> OutputReceiver<KV<K, Iterable<InputT>>> receiver,
> ValueState<K> key,
> BagState<InputT> batch,
> CombiningState<Long, long[], Long> numElementsInBatch) {
> Iterable<InputT> values = batch.read();
> // when the timer fires, batch state might be empty
> if (!Iterables.isEmpty(values)) {
> receiver.output(KV.of(key.read(), values));
> }
> batch.clear();
> log.debug("*** BATCH *** clear");
> numElementsInBatch.clear();
> }
> }
> }
> {code}
> We compared the batches output by adding two stages:
> {code:java}
> .apply("Batch transactions New", GroupIntoBatches.of(200,250);
> .apply("Batch transactions Old",
> org.apache.beam.sdk.transforms.GroupIntoBatches.ofSize(200);{code}
>
> On the FlinkRunner we see the PROCESSING_TIME timer being triggered and
> creating a batch every 250 millis, but on the Dataflow Runner the timer is not
> being triggered.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)