Mariusz Rebandel created BEAM-7955:
--------------------------------------

             Summary: Dynamic Writer - combining computed shards' number for late events with window's
                 Key: BEAM-7955
                 URL: https://issues.apache.org/jira/browse/BEAM-7955
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
    Affects Versions: 2.10.0
            Reporter: Mariusz Rebandel


The runner attempts to combine the number of shards computed for a window's on-time pane with the numbers computed for the subsequent panes containing late events, even when the window's accumulation mode is set to DISCARDING_FIRED_PANES. As a result, SingletonCombineFn throws an exception.
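To make the failure mode concrete, here is a simplified, hypothetical pure-Scala model with no Beam dependency (`SingletonViewModel`, `singletonCombine`, `materialize` and `fails` are illustrative names, not Beam APIs). It assumes that every pane emitted for a window is folded into the same singleton combine, which is what the stack trace suggests happens on Dataflow: a single value is accepted, a second value throws.

```scala
import scala.util.Try

// Hypothetical model, not Beam code: a singleton view tolerates at most one
// value per window, so combining a second value is an error.
object SingletonViewModel {
  // Stand-in for the behaviour seen in the stack trace: merging two values throws.
  def singletonCombine[T](a: T, b: T): T =
    throw new IllegalArgumentException(
      "PCollection with more than one element accessed as a singleton view.")

  // Folds every value that reached the view for one window. None stands for an
  // empty window, where the real view would fall back to its default value.
  def materialize[T](valuesInWindow: Seq[T]): Option[T] =
    valuesInWindow.reduceOption((a, b) => singletonCombine(a, b))

  // True if materializing the view for this window would fail.
  def fails[T](valuesInWindow: Seq[T]): Boolean =
    Try(materialize(valuesInWindow)).isFailure
}
```

In this model, `materialize(Seq(4))` returns `Some(4)` (on-time pane only), while `fails(Seq(4, 1))` is true: an on-time count of 4 shards plus a late count of 1 shard for the same window cannot both be held by a singleton.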

Steps to reproduce this behaviour:
 - create a dynamic writer with the `withSharding()` option
 - send a stream of messages to the Dataflow job via Pub/Sub
 - hold back *some* of the messages
 - let the remaining messages flow into the job until the watermark passes the end of the window
 - release the held-back messages

If all Pub/Sub traffic is halted and released only after the window's end, Beam does not try to merge the shard numbers. The failure occurs only when some, but not all, of the messages arrive as late events.
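The reproduction steps and the all-late case can be contrasted with a small hypothetical model of the panes produced for one window. It assumes the default trigger with allowed lateness, simplifies all late data into a single late pane, and counts records per pane the way DISCARDING_FIRED_PANES would; `PaneModel`, `Msg`, `paneCounts` and `scenario` are made-up names for illustration only.

```scala
// Hypothetical model of the panes fired for a single window (not Beam code).
object PaneModel {
  // A message belonging to the window; `late` means it reached the job only
  // after the watermark had passed the end of the window.
  final case class Msg(id: Int, late: Boolean)

  // Record counts emitted per pane under DISCARDING_FIRED_PANES: each pane
  // counts only its own new elements. Empty panes emit nothing, as with
  // Combine.globally(...).withoutDefaults(). Assumes (simplification) that all
  // late data lands in one late pane.
  def paneCounts(msgs: Seq[Msg]): Seq[Long] = {
    val (late, onTime) = msgs.partition(_.late)
    Seq(onTime.size.toLong, late.size.toLong).filter(_ > 0)
  }

  // Sends `total` messages, of which the first `held` are held back until
  // after the window's end.
  def scenario(total: Int, held: Int): Seq[Msg] =
    (0 until total).map(i => Msg(i, late = i < held))
}
```

With 3 of 10 messages held back, `paneCounts(scenario(10, 3))` yields two counts (7 and 3), so two values reach the singleton view for the same window. Holding back all 10 yields a single count of 10, which matches the case the report says does not fail.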

Stacktrace:
{code:java}
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
        org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
        org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
        org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
        org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
{code}
Sharding implementation:
{code:java}
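import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

// Computes the number of shards for each window from the window's record count
// and exposes it to the dynamic writer as a singleton side input.
class RecordCountSharding[T](recordsPerShard: Int)
    extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
  import RecordCountSharding._

  override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
    // One record count per fired pane of each window; empty windows emit nothing.
    val recordCount = input.apply(
      Combine.globally(Count.combineFn[T]()).withoutDefaults()
    )

    // Convert each record count into a number of shards.
    val shardsNum = recordCount.apply(
      MapElements.into(TypeDescriptors.integers())
        .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
          Integer.valueOf(getShardsNum(count, recordsPerShard))
        })
    )

    // Singleton view: exactly one value per window, or the default for empty windows.
    shardsNum.apply(
      View.asSingleton[java.lang.Integer]()
        .withDefaultValue(Integer.valueOf(ShardsNumForEmptyWindows))
    )
  }
}

object RecordCountSharding {
  val ShardsNumForEmptyWindows = 0

  def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
    if (recordsPerShard <= 0) {
      throw new IllegalArgumentException(s"recordsPerShard must be greater than 0! Got $recordsPerShard")
    }
    new RecordCountSharding[T](recordsPerShard)
  }

  // Number of shards = ceil(count / recordsPerShard).
  def getShardsNum(count: Long, recordsPerShard: Int): Int = {
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt
  }
}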
{code}
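As a side note on the arithmetic in `getShardsNum`: the division goes through `Float`, whose 24-bit significand cannot represent every `Long` count above 2^24 (16,777,216). The sketch compares it with an integer ceiling division, ceil(count / recordsPerShard) = (count + recordsPerShard - 1) / recordsPerShard; `ShardMath` and `exactShardsNum` are hypothetical names, and the sketch assumes a non-negative count and a positive `recordsPerShard` (the factory in the report already rejects values <= 0).

```scala
// Hypothetical comparison of two ways to compute the number of shards.
object ShardMath {
  // Same expression as getShardsNum in the report.
  def floatShardsNum(count: Long, recordsPerShard: Int): Int =
    (count.toFloat / recordsPerShard.toFloat).ceil.toInt

  // ceil(count / recordsPerShard) using only Long arithmetic, so every count
  // is represented exactly. Assumes count >= 0 and recordsPerShard > 0.
  def exactShardsNum(count: Long, recordsPerShard: Int): Int =
    ((count + recordsPerShard - 1) / recordsPerShard).toInt
}
```

For 10 records at 3 per shard both functions return 4. For 16,777,217 records at 1 per shard the `Float` version returns 16,777,216 (the count is rounded when converted to `Float`), while the integer version returns 16,777,217.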



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to