kennknowles opened a new issue, #19456:
URL: https://github.com/apache/beam/issues/19456

   When `spark.streaming.backpressure.enabled` is set, the rate controller is
   active, and `SourceDStream.computeReadMaxRecords()` returns the
   `maxNumRecords` it advises each time a `MicrobatchSource` instance is
   created for a batch.

   Source file:
   beam\runners\spark\src\main\java\org\apache\beam\runners\spark\io\SourceDStream.java
   
    
   ```
   
   private long computeReadMaxRecords() {
     if (boundMaxRecords > 0) {
       LOG.info(
           "Max records per batch has been set to {}, as configured in the PipelineOptions.",
           boundMaxRecords);
       return boundMaxRecords;
     } else {
       final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
       if (rateControlledMax.isDefined()) {
         LOG.info(
             "Max records per batch has been set to {}, as advised by the rate controller.",
             rateControlledMax.get());
         return rateControlledMax.get();
       } else {
         LOG.info(
             "Max records per batch has not been limited by neither configuration "
                 + "nor the rate controller, and will remain unlimited for the current batch "
                 + "({}).",
             Long.MAX_VALUE);
         return Long.MAX_VALUE;
       }
     }
   }
   
   ```
   
    
   
   However, `MicrobatchSource` keeps a cache of readers, and the cache key is
   the `MicrobatchSource` instance itself:
     
   ```
   
   private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>> readerCache;

   @SuppressWarnings("unchecked")
   public Source.Reader<T> getOrCreateReader(
       final PipelineOptions options, final CheckpointMarkT checkpointMark)
       throws IOException {
     try {
       initReaderCache((long) readerCacheInterval);
       return (Source.Reader<T>)
           readerCache.get(this, new ReaderLoader(options, checkpointMark));
     } catch (final ExecutionException e) {
       throw new RuntimeException("Failed to get or create reader", e);
     }
   }
   ```
   
   When two `MicrobatchSource` instances are compared, only `sourceId` and
   `splitId` are checked:
   ```
   
   @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
     if (!(o instanceof MicrobatchSource)) {
       return false;
     }
     MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
     if (sourceId != that.sourceId) {
       return false;
     }

     return splitId == that.splitId;
   }
   
   ```
   
   The reader cache expiration is defined in `SourceDStream.java`:
   ```
   
   // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
   this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
    
   ```
   
    
   Because two `MicrobatchSource` instances compare equal even when the value
   of `maxNumRecords` has changed, a cached reader is replaced only when its
   cache entry expires, that is, when processing a batch takes longer than 1.5
   times the batch interval. This makes backpressure behave strangely: the
   reader picks up a new `maxNumRecords` only when the batch processing time
   is \> 1.5 batchInterval.
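
To illustrate the effect in isolation, here is a minimal sketch that does not use Beam or Guava at all. `SourceKey`, `Reader`, and `limitUsedBySecondBatch` are hypothetical stand-ins I made up for this example, and a plain `HashMap` replaces the real reader cache (so expiration is not modeled). It only shows that a key whose equality ignores `maxNumRecords` hands the second batch the reader built for the first one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class StaleReaderDemo {

  /** Hypothetical stand-in for MicrobatchSource: equality ignores maxNumRecords. */
  static final class SourceKey {
    final int sourceId;
    final int splitId;
    final long maxNumRecords;

    SourceKey(int sourceId, int splitId, long maxNumRecords) {
      this.sourceId = sourceId;
      this.splitId = splitId;
      this.maxNumRecords = maxNumRecords;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof SourceKey)) {
        return false;
      }
      SourceKey that = (SourceKey) o;
      return sourceId == that.sourceId && splitId == that.splitId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(sourceId, splitId);
    }
  }

  /** Hypothetical stand-in for a reader created with a fixed record limit. */
  static final class Reader {
    final long limit;

    Reader(long limit) {
      this.limit = limit;
    }
  }

  /** Simulates two consecutive batches and returns the limit the second batch reads with. */
  static long limitUsedBySecondBatch(long firstMax, long secondMax) {
    Map<SourceKey, Reader> cache = new HashMap<>();
    // First batch: no entry yet, so a reader is created with firstMax.
    cache.computeIfAbsent(new SourceKey(0, 0, firstMax), k -> new Reader(k.maxNumRecords));
    // Second batch: the new key equals the old one, so the cached reader is returned.
    Reader second =
        cache.computeIfAbsent(new SourceKey(0, 0, secondMax), k -> new Reader(k.maxNumRecords));
    return second.limit;
  }

  public static void main(String[] args) {
    // The rate controller lowered the limit from 1000 to 500, but the old reader is reused.
    System.out.println(limitUsedBySecondBatch(1000L, 500L)); // prints 1000
  }
}
```

In the real runner the stale entry does eventually go away when the cache entry expires, which is why the new limit only shows up after a slow batch.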
     
   I suggest adding `maxNumRecords` to `MicrobatchSource.equals()`, so that
   the reader is reset whenever the rate controller computes a new value of
   `maxNumRecords`:
   ```
   
   @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
     if (!(o instanceof MicrobatchSource)) {
       return false;
     }
     MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
     if (sourceId != that.sourceId) {
       return false;
     }
     if (maxNumRecords != that.maxNumRecords) {
       return false;
     }
     return splitId == that.splitId;
   }
   
   ```
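
As a rough check of the proposed change, the sketch below models a cache key whose equality includes `maxNumRecords`. Everything in it (`SourceKey`, `Reader`, `run`, the use of a plain `HashMap`) is a hypothetical stand-in rather than Beam code. I also include `maxNumRecords` in `hashCode()` here; that is my own assumption and not part of the suggestion above, which stays correct without it because equal keys still hash alike.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class FreshReaderDemo {

  /** Hypothetical stand-in for MicrobatchSource with maxNumRecords part of equality. */
  static final class SourceKey {
    final int sourceId;
    final int splitId;
    final long maxNumRecords;

    SourceKey(int sourceId, int splitId, long maxNumRecords) {
      this.sourceId = sourceId;
      this.splitId = splitId;
      this.maxNumRecords = maxNumRecords;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof SourceKey)) {
        return false;
      }
      SourceKey that = (SourceKey) o;
      return sourceId == that.sourceId
          && splitId == that.splitId
          && maxNumRecords == that.maxNumRecords;
    }

    @Override
    public int hashCode() {
      return Objects.hash(sourceId, splitId, maxNumRecords);
    }
  }

  /** Hypothetical stand-in for a reader created with a fixed record limit. */
  static final class Reader {
    final long limit;

    Reader(long limit) {
      this.limit = limit;
    }
  }

  /** Returns {limit used by the second batch, number of cache entries afterwards}. */
  static long[] run(long firstMax, long secondMax) {
    Map<SourceKey, Reader> cache = new HashMap<>();
    cache.computeIfAbsent(new SourceKey(0, 0, firstMax), k -> new Reader(k.maxNumRecords));
    // A changed limit now produces a different key, so a new reader is created.
    Reader second =
        cache.computeIfAbsent(new SourceKey(0, 0, secondMax), k -> new Reader(k.maxNumRecords));
    return new long[] {second.limit, cache.size()};
  }

  public static void main(String[] args) {
    long[] result = run(1000L, 500L);
    System.out.println(result[0] + " " + result[1]); // prints 500 2
  }
}
```

Note that in this model the entry for the old limit stays in the map next to the new one; in the real cache it would presumably linger until it expires.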
   
    
     
   
   Imported from Jira 
[BEAM-6809](https://issues.apache.org/jira/browse/BEAM-6809). Original Jira may 
contain additional context.
   Reported by: roncai.


