Sidhavratha Kumar created SPARK-25239:
-----------------------------------------

             Summary: Spark Streaming for Kafka should allow uniform batch size 
per partition for streaming RDD
                 Key: SPARK-25239
                 URL: https://issues.apache.org/jira/browse/SPARK-25239
             Project: Spark
          Issue Type: Improvement
          Components: DStreams
    Affects Versions: 2.2.0, 2.1.0, 2.4.0
            Reporter: Sidhavratha Kumar

The current logic for determining maxMessagesPerPartition yields a non-uniform 
number of messages per partition, because each partition's share is 
proportional to its lag.

 
{code:java}
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
{code}
{code:java}
if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
}
{code}
 
This wastes resources: cores whose partitions have fewer messages finish 
early and sit idle until the other cores are done with their tasks.

Consider a topic t with 2 partitions:

||Topic||Partition||Start Offset||End Offset||Current Offset||
|t|0|0|10000|0|
|t|1|0|100|0|

with maxRatePerPartition = 1000 and a batch duration of 10 sec.

As per the calculation:

maxMessage for part-0 -> (10000/10100) * 1000 / (batchDuration = 10) = 99
maxMessage for part-1 -> (100/10100) * 1000 / (batchDuration = 10) = 1
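
The calculation can be sketched in Scala as follows. This is a minimal 
illustration that assumes the two quoted snippets are combined as shown; 
LagProportionalLimits, limits, and its parameters are hypothetical names, not 
Spark's API, and the estimated rate of 1000 records/sec is an assumed input. 
Because the quoted code multiplies the per-second limit by the batch duration, 
the absolute counts this sketch returns are larger than the figures above, but 
the 99:1 skew between the two partitions is the same.

{code:scala}
object LagProportionalLimits {
  // Hypothetical helper mirroring the quoted snippets; not Spark's actual API.
  // lags: remaining records per partition; rate: assumed estimated records/sec.
  def limits(
      lags: Map[Int, Long],
      rate: Double,
      maxRatePerPartition: Long,
      batchSeconds: Double): Map[Int, Long] = {
    val totalLag = lags.values.sum
    lags.map { case (partition, lag) =>
      // Share of the rate proportional to this partition's lag.
      val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
      // Cap the share at the configured per-partition maximum.
      val effective = Math.min(backpressureRate, maxRatePerPartition)
      // Convert the per-second limit into a per-batch message count.
      partition -> (batchSeconds * effective).toLong
    }
  }
}

object LagProportionalLimitsDemo extends App {
  val perBatch = LagProportionalLimits.limits(
    lags = Map(0 -> 10000L, 1 -> 100L),
    rate = 1000.0,
    maxRatePerPartition = 1000L,
    batchSeconds = 10.0)
  // Partition 0 receives 99 times as many messages as partition 1.
  println(perBatch)
}
{code}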

If the application is running on 2 cores, the core that processes the 1 
record of partition 1 will sit idle until the other core has processed the 99 
records of partition 0, and only then can the next RDD be picked up.

If we enforce a uniform batch size across partitions in each RDD, this waste 
of resources is avoided.
In the above case, we can set the batch size for each partition to max(batch 
size of all partitions), i.e. 99.

maxMessage for part-0 = 99
maxMessage for part-1 = 99

So we can process 98 more records of partition 1 in the same time without 
wasting any resources.
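
The proposal can be sketched as a small post-processing step over the 
per-partition limits. This is only an illustration under stated assumptions: 
UniformBatchSize and uniform are hypothetical names, and the input map stands 
in for whatever maxMessagesPerPartition would return.

{code:scala}
object UniformBatchSize {
  // Hypothetical helper: raise every partition's limit to the largest one.
  def uniform(limits: Map[Int, Long]): Map[Int, Long] = {
    if (limits.isEmpty) limits
    else {
      val largest = limits.values.max
      limits.map { case (partition, _) => partition -> largest }
    }
  }
}

object UniformBatchSizeDemo extends App {
  // Limits from the example above: 99 for part-0 and 1 for part-1.
  val adjusted = UniformBatchSize.uniform(Map(0 -> 99L, 1 -> 1L))
  println(adjusted) // both partitions now have a limit of 99
}
{code}

A partition with fewer remaining records than the uniform limit would simply 
read what it has, so the limit acts as an upper bound rather than a target.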


