[ https://issues.apache.org/jira/browse/BEAM-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Cai updated BEAM-6809:
--------------------------
    Affects Version/s: 2.11.0

> spark.streaming.backpressure does not work properly
> ---------------------------------------------------
>
>                 Key: BEAM-6809
>                 URL: https://issues.apache.org/jira/browse/BEAM-6809
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.11.0
>            Reporter: Ron Cai
>            Priority: Major
>
> When the "spark.streaming.backpressure.enabled" configuration is enabled, the
> rate controller takes effect and SourceDStream.computeReadMaxRecords() returns
> its advised maxNumRecords when the MicrobatchSource instance for each batch is
> created.
>
> Refer to the source file:
> beam/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
>  
> {code:java}
> private long computeReadMaxRecords() {
>   if (boundMaxRecords > 0) {
>     LOG.info(
>         "Max records per batch has been set to {}, as configured in the PipelineOptions.",
>         boundMaxRecords);
>     return boundMaxRecords;
>   } else {
>     final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
>     if (rateControlledMax.isDefined()) {
>       LOG.info(
>           "Max records per batch has been set to {}, as advised by the rate controller.",
>           rateControlledMax.get());
>       return rateControlledMax.get();
>     } else {
>       LOG.info(
>           "Max records per batch has not been limited by neither configuration "
>               + "nor the rate controller, and will remain unlimited for the current batch ({}).",
>           Long.MAX_VALUE);
>       return Long.MAX_VALUE;
>     }
>   }
> }
> {code}
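>
> For reference, "spark.streaming.backpressure.enabled" is a standard Spark
> property. A minimal sketch of enabling it for a streaming application (plain
> Spark, not the Beam runner wiring; master, app name, and batch interval are
> placeholders):
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class BackpressureDemo {
>   public static void main(String[] args) {
>     // Enable the backpressure rate controller for a 1-second batch interval.
>     SparkConf conf =
>         new SparkConf()
>             .setMaster("local[2]")
>             .setAppName("backpressure-demo")
>             .set("spark.streaming.backpressure.enabled", "true");
>     JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>     // A real job would define sources/transforms here, then call jssc.start().
>   }
> }
> {code}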
>  
> However, MicrobatchSource keeps a cache of readers, and the cache key is the
> MicrobatchSource instance itself:
> {code:java}
> private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>> readerCache;
>
> @SuppressWarnings("unchecked")
> public Source.Reader<T> getOrCreateReader(
>     final PipelineOptions options, final CheckpointMarkT checkpointMark) throws IOException {
>   try {
>     initReaderCache((long) readerCacheInterval);
>     // The lookup is keyed on this MicrobatchSource instance, i.e. on its
>     // equals()/hashCode().
>     return (Source.Reader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
>   } catch (final ExecutionException e) {
>     throw new RuntimeException("Failed to get or create reader", e);
>   }
> }
> {code}
> When two MicrobatchSource instances are compared, only sourceId and splitId
> are checked:
> {code:java}
> @Override
> public boolean equals(Object o) {
>   if (this == o) {
>     return true;
>   }
>   if (!(o instanceof MicrobatchSource)) {
>     return false;
>   }
>   MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
>   if (sourceId != that.sourceId) {
>     return false;
>   }
>   return splitId == that.splitId;
> }
> {code}
> The reader cache expiration interval is defined in SourceDStream.java:
> {code:java}
> // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
> this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
> {code}
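>
> Presumably this interval is used as an expire-after-access bound on the cache.
> A sketch under that assumption (not the exact Beam wiring; the 1000 ms batch
> interval is a placeholder):
> {code:java}
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
> import java.util.concurrent.TimeUnit;
>
> public class ReaderCacheExpirySketch {
>   public static void main(String[] args) {
>     // Assumption: with a 1000 ms batch interval, readerCacheInterval is
>     // 1.5 * 1000 = 1500 ms, so an entry survives as long as it is accessed
>     // at least every ~1.5 batch intervals.
>     long batchIntervalMillis = 1000L;
>     long readerCacheInterval = (long) (1.5 * batchIntervalMillis); // 1500 ms
>     Cache<Object, Object> readerCache =
>         CacheBuilder.newBuilder()
>             .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
>             .build();
>   }
> }
> {code}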
>  
> This means the cached reader only expires when a batch's processing time
> exceeds 1.5 batch intervals, because the two MicrobatchSource instances
> compare as equal even when maxNumRecords has changed. This makes the
> backpressure behavior erratic: the reader is only replaced when a batch takes
> longer than 1.5 * batchInterval to process.
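>
> The effect can be reproduced in isolation (a minimal sketch, not Beam code;
> the Key class is a hypothetical stand-in for MicrobatchSource):
> {code:java}
> import com.google.common.cache.Cache;
> import com.google.common.cache.CacheBuilder;
>
> public class StaleReaderSketch {
>   // Hypothetical stand-in for MicrobatchSource: maxNumRecords is carried by
>   // the key but ignored by equals()/hashCode(), as in the code above.
>   static final class Key {
>     final int sourceId, splitId;
>     final long maxNumRecords;
>     Key(int sourceId, int splitId, long maxNumRecords) {
>       this.sourceId = sourceId;
>       this.splitId = splitId;
>       this.maxNumRecords = maxNumRecords;
>     }
>     @Override public boolean equals(Object o) {
>       return o instanceof Key && ((Key) o).sourceId == sourceId && ((Key) o).splitId == splitId;
>     }
>     @Override public int hashCode() { return 31 * sourceId + splitId; }
>   }
>
>   public static void main(String[] args) throws Exception {
>     Cache<Key, String> cache = CacheBuilder.newBuilder().build();
>     // Batch 1: the rate controller advised 1000 records.
>     cache.get(new Key(0, 0, 1000L), () -> "reader(max=1000)");
>     // Batch 2: the rate controller now advises 50 records, but the lookup
>     // still hits the batch-1 entry because the keys compare as equal.
>     System.out.println(cache.get(new Key(0, 0, 50L), () -> "reader(max=50)"));
>     // Prints "reader(max=1000)": the stale reader is reused.
>   }
> }
> {code}
>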
> I suggest adding maxNumRecords to the equals() of MicrobatchSource, so that
> the reader is reset whenever the rate controller computes a new value of
> maxNumRecords:
> {code:java}
> @Override
> public boolean equals(Object o) {
>   if (this == o) {
>     return true;
>   }
>   if (!(o instanceof MicrobatchSource)) {
>     return false;
>   }
>   MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
>   if (sourceId != that.sourceId) {
>     return false;
>   }
>   if (maxNumRecords != that.maxNumRecords) {
>     return false;
>   }
>   return splitId == that.splitId;
> }
> {code}
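>
> Since the cache is hash-based, hashCode() would need to include maxNumRecords
> as well to preserve the equals()/hashCode() contract. A sketch (the actual
> field combination in MicrobatchSource may differ):
> {code:java}
> @Override
> public int hashCode() {
>   // Keep hashCode() consistent with the proposed equals(): sources that
>   // differ in maxNumRecords may now hash differently. Uses java.util.Objects.
>   return Objects.hash(sourceId, splitId, maxNumRecords);
> }
> {code}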
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
