This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 98cb0cd  [SPARK-33635][SS] Adjust the order of check in KafkaTokenUtil.needTokenUpdate to remedy perf regression
98cb0cd is described below

commit 98cb0cda4101111130a6c9bc244632acd02826a3
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Tue Jan 5 21:59:49 2021 -0800

    [SPARK-33635][SS] Adjust the order of check in KafkaTokenUtil.needTokenUpdate to remedy perf regression
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to adjust the order of the checks in KafkaTokenUtil.needTokenUpdate so that short-circuit evaluation applies in the non-delegation-token cases (insecure, and secured without a delegation token), which largely remedies the performance regression.
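    The reordering relies on Scala's `&&` evaluating its right operand only when the left one is true. The sketch below is a minimal, hypothetical model of the condition only, not the real method: `expensiveServiceEnabled` stands in for `HadoopDelegationTokenManager.isServiceEnabled` (assumed here, as the PR implies, to be the comparatively costly check), `JaasKey` stands in for `SaslConfigs.SASL_JAAS_CONFIG`, and the cluster config is simplified to `Option[String]`. It counts how often the costly check runs when there is no cluster config and no JAAS entry, i.e. the non-delegation-token case.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object ShortCircuitSketch {
  // Counts how many times the (assumed) costly check actually runs.
  var expensiveCalls = 0

  // Hypothetical stand-in for HadoopDelegationTokenManager.isServiceEnabled.
  def expensiveServiceEnabled(): Boolean = {
    expensiveCalls += 1
    true
  }

  // Hypothetical stand-in for SaslConfigs.SASL_JAAS_CONFIG.
  val JaasKey = "sasl.jaas.config"

  // Old order: the costly check is evaluated first, on every call.
  def needUpdateOld(params: JMap[String, Object], clusterConfig: Option[String]): Boolean =
    expensiveServiceEnabled() && clusterConfig.isDefined && params.containsKey(JaasKey)

  // New order: cheap checks first; && stops at the first false operand.
  def needUpdateNew(params: JMap[String, Object], clusterConfig: Option[String]): Boolean =
    clusterConfig.isDefined && params.containsKey(JaasKey) && expensiveServiceEnabled()

  def main(args: Array[String]): Unit = {
    // No cluster config and no JAAS entry: the non-delegation-token case.
    val params = new JHashMap[String, Object]()
    (1 to 1000).foreach(_ => needUpdateOld(params, None))
    println(s"old order: $expensiveCalls expensive calls")
    expensiveCalls = 0
    (1 to 1000).foreach(_ => needUpdateNew(params, None))
    println(s"new order: $expensiveCalls expensive calls")
  }
}
```

    Running `main` should report 1000 costly calls for the old order and 0 for the new one: once `clusterConfig.isDefined` is false, the new order never reaches the costly check, while both orders still return the same Boolean.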
    
    ### Why are the changes needed?
    
    There is a serious performance regression from Spark 2.4 to Spark 3.0 on the read path of the Kafka data source.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually ran a reproducer (https://github.com/codegorillauk/spark-kafka-read, modified to only count records instead of writing to a Kafka topic) and measured the elapsed time.
    
    > the branch applying the change, with added measurement
    
    https://github.com/HeartSaVioR/spark/commits/debug-SPARK-33635-v3.0.1
    
    > the branch adding only the measurement
    
    https://github.com/HeartSaVioR/spark/commits/debug-original-ver-SPARK-33635-v3.0.1
    
    > the result (before the fix)
    
    count: 10280000
    Took 41.634007047 secs
    
    21/01/06 13:16:07 INFO KafkaDataConsumer: debug ver. 17-original
    21/01/06 13:16:07 INFO KafkaDataConsumer: Total time taken to retrieve: 82118 ms
    
    > the result (after the fix)
    
    count: 10280000
    Took 7.964058475 secs
    
    21/01/06 13:08:22 INFO KafkaDataConsumer: debug ver. 17
    21/01/06 13:08:22 INFO KafkaDataConsumer: Total time taken to retrieve: 987 ms
    
    Closes #31056 from HeartSaVioR/SPARK-33635.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit fa9309001a47a2b87f7a735f964537886ed9bd4c)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 49109d3..370e727 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -295,8 +295,8 @@ private[spark] object KafkaTokenUtil extends Logging {
       sparkConf: SparkConf,
       params: ju.Map[String, Object],
       clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
-    if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
-        clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
+    if (clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
+        HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
       logDebug("Delegation token used by connector, checking if uses the latest token.")
       val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
       getTokenJaasParams(clusterConfig.get) != connectorJaasParams

