kennknowles opened a new issue, #19564:
URL: https://github.com/apache/beam/issues/19564

   The runner attempts to combine the shard count computed for a window's on-time pane with the shard counts of subsequent panes fired for late events, even when the window's accumulation mode is set to `DISCARDING_FIRED_PANES`. This results in an exception thrown by `SingletonCombineFn`.
   
   Steps to reproduce this behaviour:
    - create a dynamic writer with the `withSharding()` option
    - send a stream of messages to the Dataflow job via PubSub
    - retain *some* of the messages
    - let the rest of the messages flow to the job until the watermark passes the window's end
    - release the retained messages
   
   If all PubSub traffic is halted and only released after the window's end, Beam does not attempt to merge the shard counts. The exception occurs only when just a part of the messages arrive as late events. A sketch of a pipeline configuration that reproduces this is shown below.
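   As a minimal sketch, assuming a PubSub subscription read into fixed one-minute windows (the subscription path, window size, and allowed lateness are illustrative placeholders, not taken from the original report), the windowing setup looks roughly like this:
   ```
   import org.joda.time.Duration
   import org.apache.beam.sdk.Pipeline
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
   import org.apache.beam.sdk.transforms.windowing.{AfterPane, AfterWatermark, FixedWindows, Window}
   import org.apache.beam.sdk.values.PCollection

   val pipeline = Pipeline.create()

   // Fixed windows with an on-time firing at the watermark, per-element late
   // firings, and DISCARDING_FIRED_PANES: the mode under which the shard
   // counts of the on-time and late panes should not be combined.
   val windowed: PCollection[String] = pipeline
     .apply(PubsubIO.readStrings().fromSubscription("projects/p/subscriptions/s"))
     .apply(
       Window
         .into[String](FixedWindows.of(Duration.standardMinutes(1)))
         .triggering(
           AfterWatermark
             .pastEndOfWindow()
             .withLateFirings(AfterPane.elementCountAtLeast(1)))
         .withAllowedLateness(Duration.standardMinutes(10))
         .discardingFiredPanes())
   ```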
   
   Stacktrace:
   ```
   
   java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
          org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:358)
          org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:448)
          org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:429)
          org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:925)
          org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
          org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
          org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
          org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
          org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
          org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
          org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
          org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
          org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
          org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
          org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
          org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
          org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
          org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
          org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
          org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
          org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
          java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          java.lang.Thread.run(Thread.java:745)
   ```
   
   Sharding implementation:
   ```
   // Imports assumed for this snippet (not included in the original report):
   import org.apache.beam.sdk.transforms.{Combine, Contextful, Count, MapElements, PTransform, View}
   import org.apache.beam.sdk.values.{PCollection, PCollectionView, TypeDescriptors}

   class RecordCountSharding[T](recordsPerShard: Int)
       extends PTransform[PCollection[T], PCollectionView[java.lang.Integer]] {
     import RecordCountSharding._

     override def expand(input: PCollection[T]): PCollectionView[java.lang.Integer] = {
       // Count the elements of each pane; withoutDefaults() avoids emitting
       // a default value for empty windows.
       val count = input.apply(
         Combine.globally(Count.combineFn[T]()).withoutDefaults()
       )

       // Map the element count to a number of shards.
       val shardsNum = count.apply(
         MapElements.into(TypeDescriptors.integers())
           .via(Contextful.fn[java.lang.Long, java.lang.Integer] { count: java.lang.Long =>
             new java.lang.Integer(getShardsNum(count, recordsPerShard))
           })
       )

       // Expose the shard count as a singleton side input.
       shardsNum.apply(View.asSingleton().withDefaultValue(ShardsNumForEmptyWindows))
     }
   }

   object RecordCountSharding {
     val ShardsNumForEmptyWindows = 0

     def apply[T](recordsPerShard: Int): RecordCountSharding[T] = {
       if (recordsPerShard <= 0) {
         throw new IllegalArgumentException(
           s"recordsPerShard must be greater than 0! Got $recordsPerShard")
       }
       new RecordCountSharding[T](recordsPerShard)
     }

     def getShardsNum(count: Long, recordsPerShard: Int): Int =
       (count.toFloat / recordsPerShard.toFloat).ceil.toInt
   }
   ```
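   For completeness, a hedged sketch of how this transform might be wired into a dynamic write via `FileIO.Write.withSharding()`; the identity destination function, file naming, output path, and records-per-shard value are hypothetical placeholders, and `windowed` is the windowed collection from the earlier sketch:
   ```
   import org.apache.beam.sdk.coders.StringUtf8Coder
   import org.apache.beam.sdk.io.{FileIO, TextIO}
   import org.apache.beam.sdk.transforms.{SerializableFunction, SerializableFunctions}

   windowed.apply(
     FileIO
       .writeDynamic[String, String]()
       // Route each record to a destination; identity keying is a placeholder.
       .by(SerializableFunctions.identity[String]())
       .withDestinationCoder(StringUtf8Coder.of())
       .via(TextIO.sink())
       .to("gs://my-bucket/output") // placeholder path
       .withNaming(new SerializableFunction[String, FileIO.Write.FileNaming] {
         override def apply(dest: String): FileIO.Write.FileNaming =
           FileIO.Write.defaultNaming(s"events-$dest", ".txt")
       })
       // The transform from this report computes the shard count per window/pane.
       .withSharding(RecordCountSharding[String](1000))
   )
   ```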
   
   
   Imported from Jira [BEAM-7955](https://issues.apache.org/jira/browse/BEAM-7955). Original Jira may contain additional context.
   Reported by: mariusz.r.allegro.

