[
https://issues.apache.org/jira/browse/BEAM-11971?focusedWorklogId=753194&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753194
]
ASF GitHub Bot logged work on BEAM-11971:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Apr/22 05:49
Start Date: 06/Apr/22 05:49
Worklog Time Spent: 10m
Work Description: reuvenlax commented on code in PR #16928:
URL: https://github.com/apache/beam/pull/16928#discussion_r843490017
##########
website/www/site/layouts/shortcodes/flink_java_pipeline_options.html:
##########
@@ -77,11 +77,6 @@
<td>Remove unneeded deep copy between operators. See
https://issues.apache.org/jira/browse/BEAM-11146</td>
<td>Default: <code>false</code></td>
</tr>
-<tr>
Review Comment:
done
##########
runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java:
##########
@@ -104,6 +103,7 @@ public void
processElement(WindowedValue<TestStreamIndex<T>> element) throws Exc
int index = streamIndex.getIndex();
Instant watermark = element.getTimestamp();
Event<T> event = events.get(index);
+ System.err.println("PROCESS ELEMENT " + element.getValue() + " EVENTS "
+ event);
Review Comment:
removed
Issue Time Tracking
-------------------
Worklog Id: (was: 753194)
Time Spent: 7h 40m (was: 7.5h)
> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
> Key: BEAM-11971
> URL: https://issues.apache.org/jira/browse/BEAM-11971
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Reza ardeshir rokni
> Assignee: Reuven Lax
> Priority: P3
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
> State is set to {{null}} while an active timer is present; this issue does
> not show up in other runners.
> The following example will reach the IllegalStateException within 10-20
> runs. {{LOOP_COUNT}} does not seem to be a factor, as the issue reproduces
> with a {{LOOP_COUNT}} of 100 or 100000. The number of keys is a factor, as it
> did not reproduce with only one key. I have not tried more than 3 keys to see
> whether that makes it easier to reproduce.
>
> {code}
> package test;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import java.util.Optional;
>
> public class Test {
> public static void main (String [] args) throws Exception{
> Test.testToFailure();
> }
> public static void testToFailure() throws Exception {
> int count = 0;
> while (true) {
> failingTest();
> System.out.println(
> String.format("Got to Count %s", String.valueOf(count++)));
> }
> }
> public static void failingTest() throws Exception {
> Pipeline p = Pipeline.create();
> Instant now = Instant.now();
> TestStream<Integer> stream =
> TestStream.create(BigEndianIntegerCoder.of())
> .addElements(1)
>
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(2)
>
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(3)
> .advanceWatermarkToInfinity();
> p.apply(stream)
> .apply(WithKeys.of(x -> x))
> .setCoder(KvCoder.of(BigEndianIntegerCoder.of(),
> BigEndianIntegerCoder.of()))
> .apply(new TestToFail());
> p.run();
> }
> public static class TestToFail
> extends PTransform<PCollection<KV<Integer, Integer>>,
> PCollection<Integer>> {
> @Override
> public PCollection<Integer> expand(PCollection<KV<Integer, Integer>>
> input) {
> return input.apply(ParDo.of(new LoopingRead()));
> }
> }
> public static class LoopingRead extends DoFn<KV<Integer, Integer>,
> Integer> {
> static int LOOP_COUNT = 100;
> @StateId("value")
> private final StateSpec<ValueState<Integer>> value =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @StateId("count")
> private final StateSpec<ValueState<Integer>> count =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @TimerId("actionTimers")
> private final TimerSpec timer =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @ProcessElement
> public void processElement(
> ProcessContext c,
> @StateId("value") ValueState<Integer> value,
> @TimerId("actionTimers") Timer timers) {
> value.write(c.element().getValue());
> timers.set(c.timestamp().plus(Duration.millis(1000)));
> }
> /** */
> @OnTimer("actionTimers")
> public void onTimer(
> OnTimerContext c,
> @StateId("value") ValueState<Integer> value,
> @StateId("count") ValueState<Integer> count,
> @TimerId("actionTimers") Timer timers) {
> if (value.read() == null) {
> throw new IllegalStateException("BINGO!");
> }
> Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
> count.write(counter);
> value.write(value.read() + counter);
> if (counter < LOOP_COUNT) {
> timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)