kennknowles opened a new issue, #19564:
URL: https://github.com/apache/beam/issues/19564
The runner attempts to combine the shard count computed for a window's first
pane with the shard counts computed for subsequent panes that contain late
events, even when the window's accumulation mode is set to
DISCARDING_FIRED_PANES. This results in an exception thrown by
SingletonCombineFn.
Steps to recreate this behaviour:
- create dynamic writer with `withSharding()` option
- send stream of messages to Dataflow job via PubSub
- retain *some* messages
- let the rest of the messages flow to the job, until the watermark reaches
the window's end
- release retained messages
If all PubSub traffic is halted and released only after the window's end, Beam
does not try to merge the results. The exception occurs only when just some of
the messages arrive as late events.
Stacktrace:
```
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
```
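The failure mode in the trace can be illustrated with a small model. This is a minimal sketch in plain Scala, not Beam's actual `SingletonCombineFn`: `SingletonModel`, `SingletonModelDemo`, and `feedPanes` are hypothetical names, and the sketch assumes (as the report suggests) that each fired pane contributes one shard count to the same per-window state.

```scala
// Simplified stand-in for a singleton-style combiner; not Beam's actual class.
final class SingletonModel[T] {
  private var held: Option[T] = None

  // The first input is stored; any further input counts as a second element.
  def addInput(value: T): Unit = held match {
    case None => held = Some(value)
    case Some(_) =>
      throw new IllegalArgumentException(
        "PCollection with more than one element accessed as a singleton view.")
  }

  // Returns the stored value, or the default when nothing was added.
  def extractOutput(default: T): T = held.getOrElse(default)
}

object SingletonModelDemo {
  // Feeds one shard count per fired pane into the same per-window state.
  def feedPanes(paneShardCounts: Seq[Int]): Either[String, Int] = {
    val state = new SingletonModel[Int]
    try {
      paneShardCounts.foreach(state.addInput)
      Right(state.extractOutput(0))
    } catch {
      case e: IllegalArgumentException => Left(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    println(feedPanes(Seq(3)))    // only the first pane: Right(3)
    println(feedPanes(Seq(3, 1))) // first pane plus one late pane: Left(...)
  }
}
```

Under these assumptions a single pane yields a value, while a second pane for the same window reproduces the "more than one element" error, which matches the observation that the exception appears only when some messages arrive late.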
Sharding implementation:
```
import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

class RecordCountSharding[T](recordsPerShard: Int)
    extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
  import RecordCountSharding._

  override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
    // Count the records in each window; withoutDefaults() emits nothing for empty windows.
    val count = input.apply(
      Combine.globally(Count.combineFn[T]()).withoutDefaults()
    )
    // Convert the record count into a number of shards.
    val shardsNum = count.apply(
      MapElements.into(TypeDescriptors.integers())
        .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
          new java.lang.Integer(getShardsNum(count, recordsPerShard))
        })
    )
    // Expose the shard count as a singleton view, with a default for empty windows.
    shardsNum.apply(View.asSingleton().withDefaultValue(ShardsNumForEmptyWindows))
  }
}

object RecordCountSharding {
  val ShardsNumForEmptyWindows = 0

  def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
    if (recordsPerShard <= 0) {
      throw new IllegalArgumentException(
        s"recordsPerShard must be greater than 0! Got $recordsPerShard")
    }
    new RecordCountSharding[T](recordsPerShard)
  }

  def getShardsNum(count: Long, recordsPerShard: Int): Int = {
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt
  }
}
```
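For reference, the shard arithmetic in `getShardsNum` can be checked in isolation. The sketch below is plain Scala with no Beam dependency; `ShardMath`, `shardsNumExact`, and `shardsNumFloat` are hypothetical names introduced here. `shardsNumFloat` repeats the formula from the snippet above, while `shardsNumExact` is an alternative using integer ceiling division, offered only as a suggestion because `Float` cannot represent every count above 2^24 exactly. It is unrelated to the exception reported in this issue.

```scala
object ShardMath {
  // Ceiling division on Long values; assumes count >= 0 and recordsPerShard > 0.
  def shardsNumExact(count: Long, recordsPerShard: Int): Int = {
    require(count >= 0, s"count must be non-negative, got $count")
    require(recordsPerShard > 0, s"recordsPerShard must be greater than 0, got $recordsPerShard")
    val shards = (count + recordsPerShard - 1) / recordsPerShard
    Math.toIntExact(shards) // throws ArithmeticException if the result does not fit in an Int
  }

  // Same formula as getShardsNum in the report, kept for comparison.
  def shardsNumFloat(count: Long, recordsPerShard: Int): Int =
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt

  def main(args: Array[String]): Unit = {
    println(shardsNumExact(0L, 100))   // 0 shards for an empty window
    println(shardsNumExact(100L, 100)) // 1
    println(shardsNumExact(101L, 100)) // 2
    // 16777217 = 2^24 + 1 is rounded to 2^24 when converted to Float.
    println(shardsNumFloat(16777217L, 1)) // 16777216
    println(shardsNumExact(16777217L, 1)) // 16777217
  }
}
```

For typical per-window counts both versions agree; they diverge only once the count exceeds what a `Float` can hold exactly.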
Imported from Jira
[BEAM-7955](https://issues.apache.org/jira/browse/BEAM-7955). Original Jira may
contain additional context.
Reported by: mariusz.r.allegro.