Repository: spark Updated Branches: refs/heads/branch-2.1 6696ad0e8 -> 5131b0a96
[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans ## What changes were proposed in this pull request? We didn't enforce analyzed plans in Spark 2.1 when writing out to Kafka. ## How was this patch tested? New unit test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Bill Chambers <[email protected]> Closes #17804 from anabranch/SPARK-20496-2. (cherry picked from commit 733b81b835f952ab96723c749461d6afc0c71974) Signed-off-by: Burak Yavuz <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5131b0a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5131b0a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5131b0a9 Branch: refs/heads/branch-2.1 Commit: 5131b0a963699b6910949ae2361fa740dafdb678 Parents: 6696ad0 Author: Bill Chambers <[email protected]> Authored: Fri Apr 28 10:18:31 2017 -0700 Committer: Burak Yavuz <[email protected]> Committed: Fri Apr 28 10:19:14 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/kafka010/KafkaWriter.scala | 4 ++-- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index a637d52..61936e3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { - val schema = queryExecution.logical.output + val schema = queryExecution.analyzed.output schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic == None) { throw new AnalysisException(s"topic option required when no " + @@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging { queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { - val schema = queryExecution.logical.output + val schema = queryExecution.analyzed.output validateQuery(queryExecution, kafkaParameters, topic) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { queryExecution.toRdd.foreachPartition { iter => http://git-wip-us.apache.org/repos/asf/spark/blob/5131b0a9/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 4905356..1e7f4f2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, DataType} @@ -107,6 +108,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext { s"save mode overwrite not allowed for kafka")) } + test("SPARK-20496: batch - enforce analyzed plans") { + val inputEvents = + spark.range(1, 1000) + .select(to_json(struct("*")) as 'value) + + val topic = newTopic() + testUtils.createTopic(topic) + // used to throw UnresolvedException + inputEvents.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", topic) + .save() + } + test("streaming - write to kafka with topic field") { val input = MemoryStream[String] val topic = newTopic() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
