[
https://issues.apache.org/jira/browse/BEAM-6188?focusedWorklogId=177994&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177994
]
ASF GitHub Bot logged work on BEAM-6188:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Dec/18 14:56
Start Date: 21/Dec/18 14:56
Worklog Time Spent: 10m
Work Description: lgajowy commented on issue #7226: [BEAM-6188] Unbounded
synthetic source
URL: https://github.com/apache/beam/pull/7226#issuecomment-449409277
FYI: I modified the watermark behavior. This is required to be able to stop
the streaming pipeline after all records are processed. This is similar to what
Nexmark's `UnboundedEventSource` does, and IMO it makes perfect sense here too.
This approach doesn't work on the Spark runner yet, which is why the
Nexmark streaming suites are not run on Jenkins.
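To illustrate the idea, here is a minimal, Beam-free sketch of a reader that moves its watermark to an "end of time" sentinel once every record has been emitted. The class `WatermarkSketch`, its fields, and the constant `END_OF_TIME_MILLIS` are hypothetical names for illustration only; in Beam the sentinel would presumably be `BoundedWindow.TIMESTAMP_MAX_VALUE`, and the actual code in the PR may differ.

```java
/** Hypothetical sketch: a reader that closes its watermark when exhausted. */
class WatermarkSketch {
  // Assumption: stands in for Beam's BoundedWindow.TIMESTAMP_MAX_VALUE.
  static final long END_OF_TIME_MILLIS = Long.MAX_VALUE;

  private final long totalRecords;
  private long emitted = 0;
  private long watermarkMillis = Long.MIN_VALUE;

  WatermarkSketch(long totalRecords) {
    this.totalRecords = totalRecords;
  }

  /** Returns true if a record was produced, false once the source is exhausted. */
  boolean advance(long eventTimeMillis) {
    if (emitted >= totalRecords) {
      // No more records, ever: push the watermark to the end of time
      // so a runner can conclude that the pipeline is finished.
      watermarkMillis = END_OF_TIME_MILLIS;
      return false;
    }
    emitted++;
    // While records remain, the watermark only tracks the latest event time.
    watermarkMillis = Math.max(watermarkMillis, eventTimeMillis);
    return true;
  }

  long getWatermark() {
    return watermarkMillis;
  }

  public static void main(String[] args) {
    WatermarkSketch reader = new WatermarkSketch(2);
    reader.advance(10);
    reader.advance(20);
    reader.advance(30); // source is exhausted at this point
    System.out.println(reader.getWatermark() == END_OF_TIME_MILLIS);
  }
}
```

A runner that treats an end-of-time watermark as "no more input" can then drain and stop the pipeline; whether a given runner does so is runner-specific.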
Unfortunately, I get errors on Dataflow after making this change. I
decided to push the new commit anyway because the error seems to be worker or coder
related rather than watermark related:
```
Caused by: java.io.EOFException: reached end of stream after reading 17
bytes; 571284 bytes expected
org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:740)
org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.io.ByteStreams.readFully(ByteStreams.java:722)
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
java.lang.RuntimeException: java.io.IOException: varint overflow 21432854729
org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:110)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: varint overflow 21432854729
org.apache.beam.sdk.util.VarInt.decodeInt(VarInt.java:58)
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:103)
org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:76)
org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:118)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.decode(ValueWithRecordId.java:81)
org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.lambda$elementsIterable$2(WindmillKeyedWorkItem.java:107)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.Iterators$7.transform(Iterators.java:750)
org.apache.beam.runners.dataflow.worker.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:56)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation
```
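For context on the two messages in the trace, here is a simplified, standalone sketch of how a varint length prefix can be decoded and range-checked. It is not Beam's `VarInt` source; the class `VarIntSketch` and its helpers are hypothetical, and the overflow bound is an assumption. It only shows that a value such as 21432854729 cannot be a valid `int` length, while 571284 can, which would be consistent with the decoder reading bytes that were not written as a length prefix by the same coder (one possible reading, not a diagnosis).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch of varint length-prefix decoding with an overflow check. */
class VarIntSketch {
  /** Encodes v as a base-128 varint, least significant 7-bit group first. */
  static byte[] encode(long v) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80)); // high bit set: more bytes follow
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  /** Decodes a varint into a long. */
  static long decodeLong(InputStream in) throws IOException {
    long result = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      if (b < 0) {
        throw new EOFException("stream ended inside a varint");
      }
      if (shift >= 64) {
        throw new IOException("varint too long");
      }
      result |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }

  /** Decodes a varint and rejects values that do not fit in 32 bits (assumed bound). */
  static int decodeInt(InputStream in) throws IOException {
    long r = decodeLong(in);
    if (r < 0 || r >= (1L << 32)) {
      throw new IOException("varint overflow " + r);
    }
    return (int) r;
  }

  public static void main(String[] args) throws IOException {
    // A plausible length prefix decodes cleanly.
    System.out.println(decodeInt(new ByteArrayInputStream(encode(571284L))));
    // The value from the trace is rejected by the range check.
    try {
      decodeInt(new ByteArrayInputStream(encode(21432854729L)));
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
```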
Do you have any ideas on what may be going on?
I will continue investigating this after 2nd January 2019 since I'm taking
some days off.
Issue Time Tracking
-------------------
Worklog Id: (was: 177994)
Time Spent: 3h 10m (was: 3h)
> Create unbounded synthetic source
> ---------------------------------
>
> Key: BEAM-6188
> URL: https://issues.apache.org/jira/browse/BEAM-6188
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Lukasz Gajowy
> Assignee: Lukasz Gajowy
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> It is needed for streaming scenarios. It should provide ways to reason about
> time and recovering from checkpoints.