[
https://issues.apache.org/jira/browse/BEAM-7955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547853#comment-17547853
]
Kenneth Knowles commented on BEAM-7955:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/19564
> Dynamic Writer - combining computed shards' number for late events with
> window's
> --------------------------------------------------------------------------------
>
> Key: BEAM-7955
> URL: https://issues.apache.org/jira/browse/BEAM-7955
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.10.0
> Reporter: Mariusz Rebandel
> Priority: P3
>
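> The runner attempts to combine the shard count computed for the window with
> the shard counts computed for subsequent panes containing late events, even
> when the window's accumulation mode is set to DISCARDING_FIRED_PANES. Because
> the shard count is exposed through View.asSingleton(), more than one value
> for the same window causes SingletonCombineFn to throw an exception.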
> Steps to reproduce this behaviour:
> - create a dynamic writer with the `withSharding()` option
> - send a stream of messages to the Dataflow job via PubSub
> - hold back *some* of the messages
> - let the rest of the messages flow to the job until the watermark reaches
> the window's end
> - release the held-back messages
> If all PubSub traffic is halted and only released after the window's end,
> Beam does not try to merge the shard counts. The problem occurs only when
> some, but not all, of the messages arrive as late events.
> Stacktrace:
> {code:java}
> java.lang.IllegalArgumentException: PCollection with more than one element
> accessed as a singleton view. Consider using Combine.globally().asSingleton()
> to combine the PCollection into a single value
>
> org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
>
> org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
>
> org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
>
> org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
>
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
>
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
>
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> Sharding implementation:
> {code:java}
> class RecordCountSharding[T](recordsPerShard: Int) extends
> PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
> import RecordCountSharding._
> override def expand(input: PCollection[T]):
> PCollectionView[java.lang.Integer] = {
> val count = input.apply(
> Combine.globally(Count.combineFn[T]()).withoutDefaults()
> )
> val shardsNum = count.apply(
> MapElements.into(TypeDescriptors.integers())
> .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count:
> java.lang.Long =>
> new java.lang.Integer(getShardsNum(count, recordsPerShard))
> })
> )
>
> shardsNum.apply(View.asSingleton().withDefaultValue(ShardsNumForEmptyWindows))
> }
> }
> object RecordCountSharding {
> val ShardsNumForEmptyWindows = 0
> def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
> if (recordsPerShard <= 0) {
> throw new IllegalArgumentException(s"recordsPerShard must be greater
> than 0! Got $recordsPerShard")
> }
> new RecordCountSharding[T](recordsPerShard)
> }
> def getShardsNum(count: Long, recordsPerShard: Int): Int = {
> (count.toFloat / recordsPerShard.toFloat).ceil.toInt
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)