Repository: spark Updated Branches: refs/heads/master ca04c3ff2 -> 8c6a9c90a
[SPARK-23279][SS] Avoid triggering distributed job for Console sink ## What changes were proposed in this pull request? Console sink will redistribute collected local data and trigger a distributed job in each batch, this is not necessary, so here change to local job. ## How was this patch tested? Existing UT and manual verification. Author: jerryshao <[email protected]> Closes #20447 from jerryshao/console-minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c6a9c90 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c6a9c90 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c6a9c90 Branch: refs/heads/master Commit: 8c6a9c90a36a938372f28ee8be72178192fbc313 Parents: ca04c3f Author: jerryshao <[email protected]> Authored: Wed Jan 31 13:59:21 2018 +0800 Committer: jerryshao <[email protected]> Committed: Wed Jan 31 13:59:21 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/execution/streaming/sources/ConsoleWriter.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8c6a9c90/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala index d46f4d7..c57bdc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming.sources +import scala.collection.JavaConverters._ + import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.sources.v2.DataSourceOptions @@ -61,7 +63,7 @@ class ConsoleWriter(schema: StructType, options: DataSourceOptions) println("-------------------------------------------") // scalastyle:off println spark - .createDataFrame(spark.sparkContext.parallelize(rows), schema) + .createDataFrame(rows.toList.asJava, schema) .show(numRowsToShow, isTruncated) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
