This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 82138f4b430 [SPARK-43233][SS] Add logging for Kafka Batch Reading for
topic partition, offset range and task ID
82138f4b430 is described below
commit 82138f4b430089766af4e457ba052f0d252f9f75
Author: Siying Dong <[email protected]>
AuthorDate: Tue Apr 25 09:33:03 2023 +0900
[SPARK-43233][SS] Add logging for Kafka Batch Reading for topic partition,
offset range and task ID
### What changes were proposed in this pull request?
We add a log line when creating the batch reader, with the task ID, topic,
partition and offset range included.
The log line looks like the following:
23/04/18 22:35:38 INFO KafkaBatchReaderFactory: Creating Kafka reader
partitionId=1
partition=StreamingDustTest-KafkaToKafkaTopic-4ccf8662-c3ca-4f3b-871e-1853c0e61765-source-2
fromOffset=0 untilOffset=3 queryId=b5b806c3-ebf3-432e-a9a7-d882d474c0f5
batchId=0 taskId=1
### Why are the changes needed?
Right now, for structured streaming from Kafka, it's hard to find which
task is handling which topic/partition and offset range.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran KafkaMicroBatchV2SourceSuite and verified that the logging output contains
the needed information. Also ran a small cluster test and observed the logs.
Closes #40905 from siying/kafka_logging.
Lead-authored-by: Siying Dong <[email protected]>
Co-authored-by: Ubuntu
<[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/kafka010/KafkaBatchPartitionReader.scala | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index b6d64c79b1d..508f5c7036b 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution,
StreamExecution}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
/** A [[InputPartition]] for reading Kafka data in a batch based streaming
query. */
@@ -34,9 +36,18 @@ private[kafka010] case class KafkaBatchInputPartition(
failOnDataLoss: Boolean,
includeHeaders: Boolean) extends InputPartition
-private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory with Logging {
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+ val taskCtx = TaskContext.get()
+ val queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+ val batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+ logInfo(s"Creating Kafka reader
topicPartition=${p.offsetRange.topicPartition} " +
+ s"fromOffset=${p.offsetRange.fromOffset}
untilOffset=${p.offsetRange.untilOffset}, " +
+ s"for query queryId=$queryId batchId=$batchId
taskId=${TaskContext.get().taskAttemptId()} " +
+ s"partitionId=${TaskContext.get().partitionId()}")
+
KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams,
p.pollTimeoutMs,
p.failOnDataLoss, p.includeHeaders)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]