[
https://issues.apache.org/jira/browse/BEAM-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ron Cai updated BEAM-6809:
--------------------------
Affects Version/s: 2.11.0
> spark.streaming.backpressure does not work properly
> ---------------------------------------------------
>
> Key: BEAM-6809
> URL: https://issues.apache.org/jira/browse/BEAM-6809
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.11.0
> Reporter: Ron Cai
> Priority: Major
>
> When the "spark.streaming.backpressure.enabled" configuration is turned on, the
> rate controller becomes active, and SourceDStream.computeReadMaxRecords()
> returns the rate-controlled maxNumRecords used when creating the
> MicrobatchSource instance for each batch.
>
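> For reference, a minimal sketch of turning backpressure on; the key is standard
> Spark configuration, while how the SparkConf reaches the job (spark-submit
> --conf, programmatic setup, etc.) depends on the deployment:
>
> {code:java}
> import org.apache.spark.SparkConf;
>
> public class EnableBackpressure {
>   public static void main(String[] args) {
>     // "spark.streaming.backpressure.enabled" is a standard Spark configuration
>     // key; enabling it makes Spark Streaming run a rate controller per stream.
>     SparkConf conf = new SparkConf()
>         .set("spark.streaming.backpressure.enabled", "true");
>     System.out.println(conf.get("spark.streaming.backpressure.enabled"));
>   }
> }
> {code}
>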
> Refer to the source file:
> beam/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
>
> {code:java}
> private long computeReadMaxRecords() {
>   if (boundMaxRecords > 0) {
>     LOG.info(
>         "Max records per batch has been set to {}, as configured in the PipelineOptions.",
>         boundMaxRecords);
>     return boundMaxRecords;
>   } else {
>     final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
>     if (rateControlledMax.isDefined()) {
>       LOG.info(
>           "Max records per batch has been set to {}, as advised by the rate controller.",
>           rateControlledMax.get());
>       return rateControlledMax.get();
>     } else {
>       LOG.info(
>           "Max records per batch has not been limited by neither configuration "
>               + "nor the rate controller, and will remain unlimited for the current batch "
>               + "({}).",
>           Long.MAX_VALUE);
>       return Long.MAX_VALUE;
>     }
>   }
> }
> {code}
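> For context, a plausible sketch of how rateControlledMaxRecords() could derive
> the cap from Spark's rate controller (the advised records per second, scaled to
> the read duration); field names such as boundReadDuration are assumptions for
> illustration, not a quote of the actual implementation:
>
> {code:java}
> // Sketch: turn the controller's advised rate (records/sec) into a per-batch cap.
> // rateController() comes from Spark's DStream machinery; boundReadDuration is
> // assumed to be the time available for reading within one batch.
> private scala.Option<Long> rateControlledMaxRecords() {
>   if (rateController().isDefined()) {
>     final long rateLimitPerSec = rateController().get().getLatestRate();
>     if (rateLimitPerSec > 0) {
>       return scala.Option.apply(rateLimitPerSec * (boundReadDuration.getMillis() / 1000));
>     }
>   }
>   return scala.Option.empty();
> }
> {code}
>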
>
> But MicrobatchSource keeps a cache of readers, and the key of that cache is the
> MicrobatchSource instance itself:
>
> {code:java}
> private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>>
> readerCache;
> @SuppressWarnings("unchecked")
> public Source.Reader<T> getOrCreateReader(
> final PipelineOptions options, final CheckpointMarkT checkpointMark) throws
> IOException {
> try {
> initReaderCache((long) readerCacheInterval);
> return (Source.Reader<T>) readerCache.get(this, new ReaderLoader(options,
> checkpointMark));
> } catch (final ExecutionException e) {
> throw new RuntimeException("Failed to get or create reader", e);
> }
> } {code}
> When comparing two instances of MicrobatchSource, equals() only checks the
> sourceId and the splitId:
> {code:java}
> @Override
> public boolean equals(Object o) {
>   if (this == o) {
>     return true;
>   }
>   if (!(o instanceof MicrobatchSource)) {
>     return false;
>   }
>   MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
>   if (sourceId != that.sourceId) {
>     return false;
>   }
>   return splitId == that.splitId;
> }
> {code}
> The reader cache expiration interval is defined in SourceDStream.java:
> {code:java}
> // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
> this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
> {code}
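>
> For context, the Cache type above suggests a Guava cache; a plausible sketch of
> what initReaderCache() builds, assuming access-based expiry (each lookup
> refreshes the timer, so an entry only expires once no batch has touched it for
> readerCacheInterval milliseconds):
>
> {code:java}
> // Sketch, assuming Guava's CacheBuilder with access-based expiry;
> // intervalMillis is the (long) readerCacheInterval passed to initReaderCache().
> readerCache =
>     CacheBuilder.newBuilder()
>         .expireAfterAccess(intervalMillis, TimeUnit.MILLISECONDS)
>         .build();
> {code}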
>
> This means that, because two MicrobatchSource instances compare as equal even
> when the value of maxNumRecords has changed, a cached reader only expires when
> the processing time of a batch exceeds 1.5 batch intervals. That makes the
> backpressure behavior erratic: the reader is only replaced when the batch
> processing time is greater than 1.5 * batchInterval.
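>
> For example, with a 2 s batch interval the cache interval is 3 s. Under
> access-based expiry every lookup refreshes the timer, so as long as consecutive
> batches start within 3 s of each other the cached reader, with its stale
> maxNumRecords, keeps being reused; the entry only dies once the job is already
> falling behind.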
>
> I suggest adding maxNumRecords to the equals() of MicrobatchSource, so that the
> reader is reset whenever the rate controller computes a new value of
> maxNumRecords:
> {code:java}
> @Override
> public boolean equals(Object o) {
>   if (this == o) {
>     return true;
>   }
>   if (!(o instanceof MicrobatchSource)) {
>     return false;
>   }
>   MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
>   if (sourceId != that.sourceId) {
>     return false;
>   }
>   if (maxNumRecords != that.maxNumRecords) {
>     return false;
>   }
>   return splitId == that.splitId;
> }
> {code}
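>
> If equals() gains a field, it would be conventional to keep hashCode()
> consistent with it, since equal objects must produce equal hash codes and the
> reader cache is hash-based. A sketch (java.util.Objects.hash is used for
> brevity; the existing implementation may combine the fields differently):
>
> {code:java}
> // Sketch: include maxNumRecords in hashCode() as well, so the
> // equals()/hashCode() contract still holds for the cache keys.
> @Override
> public int hashCode() {
>   return java.util.Objects.hash(sourceId, splitId, maxNumRecords);
> }
> {code}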
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)