Repository: carbondata
Updated Branches:
  refs/heads/master 59eff88b0 -> e36257fd2
[CARBONDATA-1848] Carbondata streaming sink adapt spark 2.2

Carbondata streaming sink adapt spark 2.2

This closes #1611

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e36257fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e36257fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e36257fd

Branch: refs/heads/master
Commit: e36257fd2548c86a95743ccc1096991f7be67d06
Parents: 59eff88
Author: QiangCai <[email protected]>
Authored: Tue Dec 5 15:31:05 2017 +0800
Committer: Jacky Li <[email protected]>
Committed: Wed Dec 6 22:51:36 2017 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/examples/StreamExample.scala | 3 +--
 .../spark/carbondata/TestStreamingTableOperation.scala | 2 --
 .../streaming/CarbonStreamingQueryListener.scala | 12 +++++++++++-
 3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 77c20bd..b59e960 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -169,7 +168,7 @@ object StreamExample {
 .format("carbondata")
 .trigger(ProcessingTime("5 seconds"))
 .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
+ .option("dbName", "default")
 .option("tableName", "stream_table")
 .start()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index de9d61f..75dcbdf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -799,7 +799,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 .format("carbondata")
 .trigger(ProcessingTime(s"$intervalSecond seconds"))
 .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
 .option("bad_records_action", badRecordAction)
 .option("dbName", tableIdentifier.database.get)
 .option("tableName", tableIdentifier.table)
@@ -916,7 +915,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 .format("carbondata")
 .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
 .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
 .option("dbName", tableIdentifier.database.get)
 .option("tableName", tableIdentifier.table)
 .start()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index c2789f4..07ef8ca 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util
 import java.util.UUID
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQueryListener
@@ -33,7 +34,16 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
 private val cache = new util.HashMap[UUID, ICarbonLock]()
 override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
- val qry = spark.streams.get(event.id).asInstanceOf[StreamExecution]
+ val streamQuery = spark.streams.get(event.id)
+ val qry = if (SPARK_VERSION.startsWith("2.1")) {
+ // adapt spark 2.1
+ streamQuery.asInstanceOf[StreamExecution]
+ } else {
+ // adapt spark 2.2 and later version
+ val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
+ val method = clazz.getMethod("streamingQuery")
+ method.invoke(streamQuery).asInstanceOf[StreamExecution]
+ }
 if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) {
 LOGGER.info("Carbon streaming query started: " + event.id)
 val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
