[ https://issues.apache.org/jira/browse/BEAM-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Cai updated BEAM-6809:
--------------------------
    Description: 
When the "spark.streaming.backpressure.enabled" configuration is enabled, the rate controller becomes active and SourceDStream.computeReadMaxRecords() returns a rate-controlled maxNumRecords each time a MicrobatchSource instance is created for a batch.

Refer to the source file:
beam\runners\spark\src\main\java\org\apache\beam\runners\spark\io\SourceDStream.java

 
{code:java}
private long computeReadMaxRecords() {
  if (boundMaxRecords > 0) {
    LOG.info(
        "Max records per batch has been set to {}, as configured in the PipelineOptions.",
        boundMaxRecords);
    return boundMaxRecords;
  } else {
    final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
    if (rateControlledMax.isDefined()) {
      LOG.info(
          "Max records per batch has been set to {}, as advised by the rate controller.",
          rateControlledMax.get());
      return rateControlledMax.get();
    } else {
      LOG.info(
          "Max records per batch has not been limited by neither configuration "
              + "nor the rate controller, and will remain unlimited for the current batch "
              + "({}).",
          Long.MAX_VALUE);
      return Long.MAX_VALUE;
    }
  }
}
{code}
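For reference, the rate controller only becomes active when backpressure is enabled in the Spark configuration. A minimal, hedged example of enabling it programmatically (the property can equally be passed via spark-submit --conf; this is standard Spark configuration, not a Beam-specific API):
{code:java}
import org.apache.spark.SparkConf;

// Hedged example: enable Spark Streaming backpressure so the rate controller
// computes a per-batch record limit.
SparkConf conf = new SparkConf()
    .set("spark.streaming.backpressure.enabled", "true");
{code}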
 

However, MicrobatchSource maintains a cache of readers, and the cache key is the MicrobatchSource instance itself:
{code:java}
private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>> readerCache;

@SuppressWarnings("unchecked")
public Source.Reader<T> getOrCreateReader(
    final PipelineOptions options, final CheckpointMarkT checkpointMark) throws IOException {
  try {
    initReaderCache((long) readerCacheInterval);
    return (Source.Reader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
  } catch (final ExecutionException e) {
    throw new RuntimeException("Failed to get or create reader", e);
  }
}
{code}
When comparing two MicrobatchSource instances, equals() only checks the sourceId and splitId:
{code:java}
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (!(o instanceof MicrobatchSource)) {
    return false;
  }
  MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
  if (sourceId != that.sourceId) {
    return false;
  }
  return splitId == that.splitId;
}
{code}
The reader cache expiration interval is defined in SourceDStream.java:
{code:java}
// Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
{code}
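For context, readerCache behaves like a Guava cache (getOrCreateReader() calls readerCache.get(key, loader) and catches ExecutionException), so initReaderCache presumably builds it with an access-based expiry along these lines. A hedged sketch, not the exact Beam code:
{code:java}
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

// Hedged sketch of initReaderCache: an access-expiring Guava cache.
// Every getOrCreateReader() call refreshes the entry, so as long as batches
// complete within readerCacheInterval the cached reader never expires.
private static void initReaderCache(final long readerCacheInterval) {
  if (readerCache == null) {
    synchronized (MicrobatchSource.class) {
      if (readerCache == null) {
        readerCache =
            CacheBuilder.newBuilder()
                .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
                .build();
      }
    }
  }
}
{code}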
 
This means a cached reader keeps being reused even after the rate controller changes maxNumRecords, because the old and new MicrobatchSource instances still compare as equal; the cache entry is only evicted once a batch's processing time exceeds 1.5x the batch interval. For example, with a 10-second batch interval the cache interval is 15 seconds, so as long as every batch finishes within 15 seconds the reader created with the initial maxNumRecords survives indefinitely. This makes the backpressure behavior strange: the reader, and with it the effective record limit, is only refreshed when a batch takes longer than 1.5x the batch interval to process.
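The effect can be reproduced in isolation with any hash-based cache. A hypothetical stand-alone demo (not Beam code; the Key class mimics MicrobatchSource with a field that is ignored by equals()/hashCode()):
{code:java}
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Hypothetical demo: two keys that compare equal but carry different payloads
// hit the same cache entry, so the second payload is never seen -- exactly how
// a stale reader survives a new maxNumRecords.
final class Demo {
  static final class Key {
    final int id;          // participates in equals/hashCode
    final long maxRecords; // ignored by equals/hashCode, like maxNumRecords

    Key(int id, long maxRecords) {
      this.id = id;
      this.maxRecords = maxRecords;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Key && ((Key) o).id == id;
    }

    @Override
    public int hashCode() {
      return id;
    }
  }

  public static void main(String[] args) throws Exception {
    Cache<Key, Long> cache = CacheBuilder.newBuilder().build();
    Long first = cache.get(new Key(1, 100L), () -> 100L);
    Long second = cache.get(new Key(1, 500L), () -> 500L); // cache hit: loader not called
    System.out.println(first + " " + second); // prints "100 100"
  }
}
{code}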
  
I suggest including maxNumRecords in MicrobatchSource.equals(), so that the reader is recreated whenever the rate controller computes a new maxNumRecords value:
{code:java}
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (!(o instanceof MicrobatchSource)) {
    return false;
  }
  MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
  if (sourceId != that.sourceId) {
    return false;
  }
  if (maxNumRecords != that.maxNumRecords) {
    return false;
  }
  return splitId == that.splitId;
}
{code}
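If equals() is extended this way, it is good practice to extend hashCode() as well, so that keys differing only in maxNumRecords do not all collide in the same hash bucket (the equals()/hashCode() contract itself remains satisfied either way). A minimal sketch of the matching update, assuming the same field names as above:
{code:java}
import java.util.Objects;

// Optional but recommended: include maxNumRecords in hashCode() so keys that
// differ only in the record limit hash to different buckets of the cache.
@Override
public int hashCode() {
  return Objects.hash(sourceId, splitId, maxNumRecords);
}
{code}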
 
  

> spark.streaming.backpressure does not work properly
> ---------------------------------------------------
>
>                 Key: BEAM-6809
>                 URL: https://issues.apache.org/jira/browse/BEAM-6809
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Ron Cai
>            Priority: Major
>


