[ https://issues.apache.org/jira/browse/BEAM-7955 ]
Ismaël Mejía updated BEAM-7955:
-------------------------------
    Status: Open  (was: Triage Needed)

Dynamic Writer - combining computed shards' number for late events with window's
---------------------------------------------------------------------------------

                 Key: BEAM-7955
                 URL: https://issues.apache.org/jira/browse/BEAM-7955
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
    Affects Versions: 2.10.0
            Reporter: Mariusz Rebandel
            Priority: Minor

The runner attempts to combine the shard count computed for a window's on-time pane with the counts computed for subsequent panes fired by late events, even when the window's accumulation mode is set to DISCARDING_FIRED_PANES. This results in an exception thrown by SingletonCombineFn.

Steps to recreate this behaviour:
- create a dynamic writer with the `withSharding()` option
- send a stream of messages to the Dataflow job via PubSub
- retain *some* of the messages
- let the rest of the messages flow into the job until the watermark reaches the window's end
- release the retained messages

If all PubSub traffic is halted and only released after the window's end, Beam won't try to merge the counts. The failure occurs only when just a part of the messages arrives as late events.
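For reference, a minimal sketch of the kind of pipeline that hits this path. The subscription, output path, window/lateness durations, and destination function are all illustrative; the essential pieces are the discarding-panes window with late firings and `withSharding()` wired to the `RecordCountSharding` transform quoted further down:

{code:java}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.windowing.{AfterPane, AfterWatermark, FixedWindows, Window}
import org.joda.time.Duration

object LateShardingRepro {
  def main(args: Array[String]): Unit = {
    val pipeline = Pipeline.create()

    pipeline
      // Unbounded source; some messages are retained and only released after
      // the watermark has passed the end of their window (illustrative sub).
      .apply(PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub"))
      // Discarding mode with late firings: each batch of late data fires an
      // extra pane, which recomputes the shard count for the same window.
      .apply(Window.into[String](FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(30))
        .discardingFiredPanes())
      // Dynamic writer whose shard count is the data-dependent side input
      // produced by RecordCountSharding (quoted below in this issue).
      .apply(FileIO.writeDynamic[String, String]()
        .by(new SerializableFunction[String, String] {
          // Illustrative destination function: route by first character.
          override def apply(line: String): String = line.take(1)
        })
        .via(TextIO.sink())
        .withDestinationCoder(StringUtf8Coder.of())
        .to("gs://my-bucket/output")
        .withNaming(new SerializableFunction[String, FileIO.Write.FileNaming] {
          override def apply(dest: String): FileIO.Write.FileNaming =
            FileIO.Write.defaultNaming(s"records-$dest", ".txt")
        })
        .withSharding(RecordCountSharding[String](recordsPerShard = 1000)))

    pipeline.run()
  }
}
{code}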
Stacktrace:
{code:java}
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
        org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
        org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
        org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
{code}

Sharding implementation:
{code:java}
import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

class RecordCountSharding[T](recordsPerShard: Int)
    extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
  import RecordCountSharding._

  override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
    // Count the records in each window; withoutDefaults() means empty
    // windows produce no count at all.
    val count = input.apply(
      Combine.globally(Count.combineFn[T]()).withoutDefaults()
    )
    // Turn the record count into a shard count.
    val shardsNum = count.apply(
      MapElements.into(TypeDescriptors.integers())
        .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
          new java.lang.Integer(getShardsNum(count, recordsPerShard))
        })
    )
    // Expose the shard count as a per-window singleton side input; this is
    // the view that SingletonCombineFn guards.
    shardsNum.apply(View.asSingleton().withDefaultValue(ShardsNumForEmptyWindows))
  }
}

object RecordCountSharding {
  val ShardsNumForEmptyWindows = 0

  def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
    if (recordsPerShard <= 0) {
      throw new IllegalArgumentException(s"recordsPerShard must be greater than 0! Got $recordsPerShard")
    }
    new RecordCountSharding[T](recordsPerShard)
  }

  // Ceiling division: enough shards so that each holds at most recordsPerShard records.
  def getShardsNum(count: Long, recordsPerShard: Int): Int =
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt
}
{code}
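The exception itself is just the contract of View.asSingleton(): the view refuses to hold more than one value per window. That contract can be shown in isolation with a minimal sketch (bounded input, default runner; all names here are illustrative and this is independent of the Dataflow code path in the trace above):

{code:java}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{Create, DoFn, ParDo, View}

object SingletonViewContract {
  def main(args: Array[String]): Unit = {
    val pipeline = Pipeline.create()

    // Two values in the same (global) window, viewed as a singleton --
    // analogous to an on-time shard count plus a late-pane shard count.
    val shards = pipeline
      .apply("TwoShardCounts", Create.of(Integer.valueOf(3), Integer.valueOf(1)))
      .apply(View.asSingleton[Integer]())

    pipeline
      .apply("Main", Create.of("record"))
      .apply(ParDo.of(new DoFn[String, String] {
        @ProcessElement
        def process(c: ProcessContext): Unit =
          // Throws java.lang.IllegalArgumentException:
          // "PCollection with more than one element accessed as a singleton view."
          c.output(c.element() + " -> " + c.sideInput(shards))
      }).withSideInputs(shards))

    pipeline.run().waitUntilFinish()
  }
}
{code}

In the failing job, the second value appears to be the shard count recomputed when a late pane fires: per the trace, the late pane's count is added (WindmillCombiningState.add) to singleton combining state that already holds the on-time count, so the two collide regardless of DISCARDING_FIRED_PANES.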