This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 705507f [SPARK-27494][SS] Null values don't work in Kafka source v2
705507f is described below
commit 705507facda11060f1a0beb04d1dd19bda5fc4f3
Author: uncleGen <[email protected]>
AuthorDate: Fri Apr 26 14:25:31 2019 +0800
[SPARK-27494][SS] Null values don't work in Kafka source v2
## What changes were proposed in this pull request?
Right now Kafka source v2 doesn't support null values:
org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow
handles a null record key but not a null record value, and it never clears the
reused row writer's null bits between records. This patch adds both.
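For readers who want the pattern in isolation: the `UnsafeRowWriter` is reused across records, so its null bits have to be zeroed on every call, and a null key or value has to be recorded with `setNullAt` rather than passed to `write`. Below is a minimal, self-contained sketch of that pattern. It is not the actual Spark class: the object name `NullSafeRowSketch` is made up, and the row layout is simplified to three fields (key, value, topic) instead of the converter's full schema.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical, simplified stand-in for KafkaRecordToUnsafeRowConverter:
// three fields only, just enough to illustrate the null handling this patch adds.
object NullSafeRowSketch {
  // The writer is reused for every record, exactly like in the real converter.
  private val rowWriter = new UnsafeRowWriter(3)

  def toRow(key: Array[Byte], value: Array[Byte], topic: String): UnsafeRow = {
    rowWriter.reset()
    // Clear null bits left over from the previous row; without this, a null
    // field in one record could leak into the rows built after it.
    rowWriter.zeroOutNullBytes()
    if (key == null) rowWriter.setNullAt(0) else rowWriter.write(0, key)
    // This is the case the patch fixes: a null value must be marked null
    // instead of being handed to write(), which assumes a non-null byte array.
    if (value == null) rowWriter.setNullAt(1) else rowWriter.write(1, value)
    rowWriter.write(2, UTF8String.fromString(topic))
    rowWriter.getRow
  }
}
```

The actual fix in KafkaRecordToUnsafeRowConverter (shown in the diff below) applies the same two changes: an added zeroOutNullBytes() call and a null check on record.value.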
## How was this patch tested?
Added new unit tests that read Kafka records with null keys and values in both micro-batch and continuous mode.
Closes #24441 from uncleGen/SPARK-27494.
Authored-by: uncleGen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../kafka010/KafkaRecordToUnsafeRowConverter.scala | 7 ++-
.../sql/kafka010/KafkaContinuousSourceSuite.scala | 4 ++
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 58 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a0e5818..649cb72 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 34cf335..da92019 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -988,6 +988,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
@@ -1461,6 +1465,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]