This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82138f4b430 [SPARK-43233][SS] Add logging for Kafka Batch Reading for 
topic partition, offset range and task ID
82138f4b430 is described below

commit 82138f4b430089766af4e457ba052f0d252f9f75
Author: Siying Dong <[email protected]>
AuthorDate: Tue Apr 25 09:33:03 2023 +0900

    [SPARK-43233][SS] Add logging for Kafka Batch Reading for topic partition, 
offset range and task ID
    
    ### What changes were proposed in this pull request?
    We add a log line when creating the batch reader that includes the task ID, topic, 
partition and offset range.
    The log line looks like the following:
    
    23/04/18 22:35:38 INFO KafkaBatchReaderFactory: Creating Kafka reader 
partitionId=1 
partition=StreamingDustTest-KafkaToKafkaTopic-4ccf8662-c3ca-4f3b-871e-1853c0e61765-source-2
 fromOffset=0 untilOffset=3 queryId=b5b806c3-ebf3-432e-a9a7-d882d474c0f5 
batchId=0 taskId=1
    
    ### Why are the changes needed?
    Right now, for Structured Streaming from Kafka, it's hard to find which 
task is handling which topic/partition and offset range.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Ran KafkaMicroBatchV2SourceSuite and verified that the logging output contains the 
information needed. Also ran a small cluster test and observed the logs.
    
    Closes #40905 from siying/kafka_logging.
    
    Lead-authored-by: Siying Dong <[email protected]>
    Co-authored-by: Ubuntu 
<[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../spark/sql/kafka010/KafkaBatchPartitionReader.scala      | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index b6d64c79b1d..508f5c7036b 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, 
StreamExecution}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
 /** A [[InputPartition]] for reading Kafka data in a batch based streaming 
query. */
@@ -34,9 +36,18 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends 
PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends 
PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val taskCtx = TaskContext.get()
+    val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+    logInfo(s"Creating Kafka reader 
topicPartition=${p.offsetRange.topicPartition} " +
+      s"fromOffset=${p.offsetRange.fromOffset} 
untilOffset=${p.offsetRange.untilOffset}, " +
+      s"for query queryId=$queryId batchId=$batchId 
taskId=${TaskContext.get().taskAttemptId()} " +
+      s"partitionId=${TaskContext.get().partitionId()}")
+
     KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, 
p.pollTimeoutMs,
       p.failOnDataLoss, p.includeHeaders)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to