Repository: carbondata Updated Branches: refs/heads/master 6cb6f8380 -> d4f9003af
[CARBONDATA-2255] Rename the streaming examples optimize streaming examples This closes #2064 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d4f9003a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d4f9003a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d4f9003a Branch: refs/heads/master Commit: d4f9003af3593d30799b573ce729de1918b8c800 Parents: 6cb6f83 Author: QiangCai <[email protected]> Authored: Wed Mar 14 17:42:39 2018 +0800 Committer: chenliang613 <[email protected]> Committed: Thu Mar 15 15:33:00 2018 +0800 ---------------------------------------------------------------------- .../CarbonBatchSparkStreamingExample.scala | 207 ------------------ .../CarbonStreamSparkStreamingExample.scala | 213 ------------------ .../CarbonStructuredStreamingExample.scala | 200 ----------------- ...CarbonStructuredStreamingWithRowParser.scala | 216 ------------------- .../examples/SparkStreamingExample.scala | 213 ++++++++++++++++++ .../StreamingUsingBatchLoadExample.scala | 207 ++++++++++++++++++ .../StreamingWithRowParserExample.scala | 216 +++++++++++++++++++ .../examples/StructuredStreamingExample.scala | 200 +++++++++++++++++ 8 files changed, 836 insertions(+), 836 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala deleted file mode 100644 index bcbf190..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala +++ /dev/null @@ 
-1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples - -import java.io.{File, PrintWriter} -import java.net.ServerSocket - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession} -import org.apache.spark.streaming.{Seconds, StreamingContext, Time} - -/** - * This example introduces how to use CarbonData batch load to integrate - * with Spark Streaming(it's DStream, not Spark Structured Streaming) - */ -// scalastyle:off println - -case class DStreamData(id: Int, name: String, city: String, salary: Float) - -object CarbonBatchSparkStreamingExample { - - def main(args: Array[String]): Unit = { - - // setup paths - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val checkpointPath = - s"$rootPath/examples/spark2/target/spark_streaming_cp_" + - System.currentTimeMillis().toString() - val streamTableName = s"dstream_batch_table" - - val spark = ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample", 4) - - val requireCreateTable = true - - if (requireCreateTable) { - // drop table if exists previously - spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") - // 
Create target carbon table and populate with initial data - // set AUTO_LOAD_MERGE to true to compact segment automatically - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( - | 'sort_columns'='name', - | 'dictionary_include'='city', - | 'AUTO_LOAD_MERGE'='true', - | 'COMPACTION_LEVEL_THRESHOLD'='4,10') - | """.stripMargin) - - val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) - // batch load - val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv" - spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE $streamTableName - | OPTIONS('HEADER'='true') - """.stripMargin) - - // streaming ingest - val serverSocket = new ServerSocket(7071) - val thread1 = writeSocket(serverSocket) - val thread2 = showTableCount(spark, streamTableName) - val ssc = startStreaming(spark, streamTableName, checkpointPath) - // wait for stop signal to stop Spark Streaming App - waitForStopSignal(ssc) - // it need to start Spark Streaming App in main thread - // otherwise it will encounter an not-serializable exception. 
- ssc.start() - ssc.awaitTermination() - thread1.interrupt() - thread2.interrupt() - serverSocket.close() - } - - spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false) - - spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false) - - // record(id = 100000001) comes from batch segment_0 - // record(id = 1) comes from stream segment_1 - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) - - // not filter - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id < 10 limit 100").show(100, truncate = false) - - // show segments - spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false) - - // drop table - spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") - - spark.stop() - System.out.println("streaming finished") - } - - def showTableCount(spark: SparkSession, tableName: String): Thread = { - val thread = new Thread() { - override def run(): Unit = { - for (_ <- 0 to 1000) { - spark.sql(s"select count(*) from $tableName").show(truncate = false) - spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false) - Thread.sleep(1000 * 5) - } - } - } - thread.start() - thread - } - - def waitForStopSignal(ssc: StreamingContext): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App - new ServerSocket(7072).accept() - // don't stop SparkContext here - ssc.stop(false, true) - } - } - thread.start() - thread - } - - def startStreaming(spark: SparkSession, tableName: String, - checkpointPath: String): StreamingContext = { - var ssc: StreamingContext = null - try { - // recommend: the batch interval must set larger, such as 30s, 1min. 
- ssc = new StreamingContext(spark.sparkContext, Seconds(15)) - ssc.checkpoint(checkpointPath) - - val readSocketDF = ssc.socketTextStream("localhost", 7071) - - val batchData = readSocketDF - .map(_.split(",")) - .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat)) - - batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => { - val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary") - println("at time: " + time.toString() + " the count of received data: " + df.count()) - df.write - .format("carbondata") - .option("tableName", tableName) - .mode(SaveMode.Append) - .save() - }} - } catch { - case ex: Exception => - ex.printStackTrace() - println("Done reading and writing streaming data") - } - ssc - } - - def writeSocket(serverSocket: ServerSocket): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // wait for client to connection request and accept - val clientSocket = serverSocket.accept() - val socketWriter = new PrintWriter(clientSocket.getOutputStream()) - var index = 0 - for (_ <- 1 to 1000) { - // write 5 records per iteration - for (_ <- 0 to 1000) { - index = index + 1 - socketWriter.println(index.toString + ",name_" + index - + ",city_" + index + "," + (index * 10000.00).toString + - ",school_" + index + ":school_" + index + index + "$" + index) - } - socketWriter.flush() - Thread.sleep(1000) - } - socketWriter.close() - System.out.println("Socket closed") - } - } - thread.start() - thread - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala deleted 
file mode 100644 index 856084b..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples - -import java.io.{File, PrintWriter} -import java.net.ServerSocket - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.CarbonEnv -import org.apache.spark.sql.CarbonSparkStreamingFactory -import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.SparkSession -import org.apache.spark.streaming.{Seconds, StreamingContext, Time} - -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.streaming.CarbonSparkStreamingListener -import org.apache.carbondata.streaming.parser.CarbonStreamParser - -/** - * This example introduces how to use Spark Streaming to write data - * to CarbonData stream table. - * - * NOTE: Current integration with Spark Streaming is an alpha feature. 
- */ -// scalastyle:off println -object CarbonStreamSparkStreamingExample { - - def main(args: Array[String]): Unit = { - - // setup paths - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - val checkpointPath = - s"$rootPath/examples/spark2/target/spark_streaming_cp_" + - System.currentTimeMillis().toString() - val streamTableName = s"dstream_stream_table" - - val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4) - - val requireCreateTable = true - - if (requireCreateTable) { - // drop table if exists previously - spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") - // Create target carbon table and populate with initial data - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( - | 'streaming'='true', - | 'sort_columns'='name', - | 'dictionary_include'='city') - | """.stripMargin) - val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) - // batch load - val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv" - spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE $streamTableName - | OPTIONS('HEADER'='true') - """.stripMargin) - - // streaming ingest - val serverSocket = new ServerSocket(7071) - val thread1 = writeSocket(serverSocket) - val thread2 = showTableCount(spark, streamTableName) - val ssc = startStreaming(spark, streamTableName, checkpointPath) - // add a Spark Streaming Listener to remove all lock for stream tables when stop app - ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener()) - // wait for stop signal to stop Spark Streaming App - waitForStopSignal(ssc) - // it need to start Spark Streaming App in main thread - // otherwise it will encounter an not-serializable exception. 
- ssc.start() - ssc.awaitTermination() - thread1.interrupt() - thread2.interrupt() - serverSocket.close() - } - - spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false) - - spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false) - - // record(id = 100000001) comes from batch segment_0 - // record(id = 1) comes from stream segment_1 - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) - - // not filter - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id < 10 limit 100").show(100, truncate = false) - - // show segments - spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false) - - spark.stop() - System.out.println("streaming finished") - } - - def showTableCount(spark: SparkSession, tableName: String): Thread = { - val thread = new Thread() { - override def run(): Unit = { - for (_ <- 0 to 1000) { - println(System.currentTimeMillis()) - spark.sql(s"select count(*) from $tableName").show(truncate = false) - spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false) - Thread.sleep(1000 * 5) - } - } - } - thread.start() - thread - } - - def waitForStopSignal(ssc: StreamingContext): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App - new ServerSocket(7072).accept() - // don't stop SparkContext here - ssc.stop(false, true) - } - } - thread.start() - thread - } - - def startStreaming(spark: SparkSession, tableName: String, - checkpointPath: String): StreamingContext = { - var ssc: StreamingContext = null - try { - // recommend: the batch interval must set larger, such as 30s, 1min. 
- ssc = new StreamingContext(spark.sparkContext, Seconds(30)) - ssc.checkpoint(checkpointPath) - - val readSocketDF = ssc.socketTextStream("localhost", 7071) - - val batchData = readSocketDF - .map(_.split(",")) - .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat)) - - println("init carbon table info") - batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => { - val df = spark.createDataFrame(rdd).toDF() - println(System.currentTimeMillis().toString() + - " at batch time: " + time.toString() + - " the count of received data: " + df.count()) - CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName) - .option(CarbonStreamParser.CARBON_STREAM_PARSER, - CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER) - .mode(SaveMode.Append) - .writeStreamData(df, time) - }} - } catch { - case ex: Exception => - ex.printStackTrace() - println("Done reading and writing streaming data") - } - ssc - } - - def writeSocket(serverSocket: ServerSocket): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // wait for client to connection request and accept - val clientSocket = serverSocket.accept() - val socketWriter = new PrintWriter(clientSocket.getOutputStream()) - var index = 0 - for (_ <- 1 to 1000) { - // write 5 records per iteration - for (_ <- 0 to 100) { - index = index + 1 - socketWriter.println(index.toString + ",name_" + index - + ",city_" + index + "," + (index * 10000.00).toString + - ",school_" + index + ":school_" + index + index + "$" + index) - } - socketWriter.flush() - Thread.sleep(2000) - } - socketWriter.close() - System.out.println("Socket closed") - } - } - thread.start() - thread - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala ---------------------------------------------------------------------- diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala deleted file mode 100644 index bc65b2f..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples - -import java.io.{File, PrintWriter} -import java.net.ServerSocket - -import org.apache.spark.sql.{CarbonEnv, SparkSession} -import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} - -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonTablePath - -// scalastyle:off println -object CarbonStructuredStreamingExample { - def main(args: Array[String]) { - - // setup paths - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - - val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingExample", 4) - val streamTableName = s"stream_table" - - val requireCreateTable = true - val useComplexDataType = false - - if (requireCreateTable) { - // drop table if exists previously - spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") - // Create target carbon table and populate with initial data - if (useComplexDataType) { - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT, - | file struct<school:array<string>, age:int> - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( - | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city') - | """.stripMargin) - } else { - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( - | 'streaming'='true', 'sort_columns'='name') - | """.stripMargin) - } - - val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) - // batch load - val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv" - spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE $streamTableName - | OPTIONS('HEADER'='true') - """.stripMargin) - - // streaming ingest - val serverSocket = new ServerSocket(7071) - val thread1 = 
startStreaming(spark, carbonTable) - val thread2 = writeSocket(serverSocket) - val thread3 = showTableCount(spark, streamTableName) - - System.out.println("type enter to interrupt streaming") - System.in.read() - thread1.interrupt() - thread2.interrupt() - thread3.interrupt() - serverSocket.close() - } - - spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false) - - spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false) - - // record(id = 100000001) comes from batch segment_0 - // record(id = 1) comes from stream segment_1 - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) - - // not filter - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id < 10 limit 100").show(100, truncate = false) - - if (useComplexDataType) { - // complex - spark.sql(s"select file.age, file.school " + - s"from ${ streamTableName } " + - s"where where file.age = 30 ").show(100, truncate = false) - } - - spark.stop() - System.out.println("streaming finished") - } - - def showTableCount(spark: SparkSession, tableName: String): Thread = { - val thread = new Thread() { - override def run(): Unit = { - for (_ <- 0 to 1000) { - spark.sql(s"select count(*) from $tableName").show(truncate = false) - Thread.sleep(1000 * 3) - } - } - } - thread.start() - thread - } - - def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = { - val thread = new Thread() { - override def run(): Unit = { - var qry: StreamingQuery = null - try { - val readSocketDF = spark.readStream - .format("socket") - .option("host", "localhost") - .option("port", 7071) - .load() - - // Write data from socket stream to carbondata file - qry = readSocketDF.writeStream - .format("carbondata") - .trigger(ProcessingTime("5 seconds")) - .option("checkpointLocation", - CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) - .option("dbName", "default") - 
.option("tableName", "stream_table") - .start() - - qry.awaitTermination() - } catch { - case ex: Exception => - ex.printStackTrace() - println("Done reading and writing streaming data") - } finally { - qry.stop() - } - } - } - thread.start() - thread - } - - def writeSocket(serverSocket: ServerSocket): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // wait for client to connection request and accept - val clientSocket = serverSocket.accept() - val socketWriter = new PrintWriter(clientSocket.getOutputStream()) - var index = 0 - for (_ <- 1 to 1000) { - // write 5 records per iteration - for (_ <- 0 to 1000) { - index = index + 1 - socketWriter.println(index.toString + ",name_" + index - + ",city_" + index + "," + (index * 10000.00).toString + - ",school_" + index + ":school_" + index + index + "$" + index) - } - socketWriter.flush() - Thread.sleep(1000) - } - socketWriter.close() - System.out.println("Socket closed") - } - } - thread.start() - thread - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala deleted file mode 100644 index 9ca0e07..0000000 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples - -import java.io.{File, PrintWriter} -import java.net.ServerSocket - -import org.apache.spark.sql.{CarbonEnv, SparkSession} -import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} - -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.streaming.parser.CarbonStreamParser - -case class FileElement(school: Array[String], age: Int) -case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement) - -// scalastyle:off println -object CarbonStructuredStreamingWithRowParser { - def main(args: Array[String]) { - - // setup paths - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath - - val spark = ExampleUtils.createCarbonSession("CarbonStructuredStreamingWithRowParser", 4) - val streamTableName = s"stream_table_with_row_parser" - - val requireCreateTable = true - val useComplexDataType = false - - if (requireCreateTable) { - // drop table if exists previously - spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }") - // Create target carbon table and populate with initial data - if (useComplexDataType) { - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT, - | file struct<school:array<string>, age:int> - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( 
- | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city') - | """.stripMargin) - } else { - spark.sql( - s""" - | CREATE TABLE ${ streamTableName }( - | id INT, - | name STRING, - | city STRING, - | salary FLOAT - | ) - | STORED BY 'carbondata' - | TBLPROPERTIES( - | 'streaming'='true', 'sort_columns'='name') - | """.stripMargin) - } - - val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark) - // batch load - val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv" - spark.sql( - s""" - | LOAD DATA LOCAL INPATH '$path' - | INTO TABLE $streamTableName - | OPTIONS('HEADER'='true') - """.stripMargin) - - // streaming ingest - val serverSocket = new ServerSocket(7071) - val thread1 = startStreaming(spark, carbonTable.getTablePath) - val thread2 = writeSocket(serverSocket) - val thread3 = showTableCount(spark, streamTableName) - - System.out.println("type enter to interrupt streaming") - System.in.read() - thread1.interrupt() - thread2.interrupt() - thread3.interrupt() - serverSocket.close() - } - - spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false) - - spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false) - - // record(id = 100000001) comes from batch segment_0 - // record(id = 1) comes from stream segment_1 - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false) - - // not filter - spark.sql(s"select * " + - s"from ${ streamTableName } " + - s"where id < 10 limit 100").show(100, truncate = false) - - if (useComplexDataType) { - // complex - spark.sql(s"select file.age, file.school " + - s"from ${ streamTableName } " + - s"where where file.age = 30 ").show(100, truncate = false) - } - - spark.stop() - System.out.println("streaming finished") - } - - def showTableCount(spark: SparkSession, tableName: String): Thread = { - val thread = new Thread() { - override def run(): 
Unit = { - for (_ <- 0 to 1000) { - spark.sql(s"select count(*) from $tableName").show(truncate = false) - Thread.sleep(1000 * 3) - } - } - } - thread.start() - thread - } - - def startStreaming(spark: SparkSession, tablePath: String): Thread = { - val thread = new Thread() { - override def run(): Unit = { - var qry: StreamingQuery = null - try { - import spark.implicits._ - val readSocketDF = spark.readStream - .format("socket") - .option("host", "localhost") - .option("port", 7071) - .load() - .as[String] - .map(_.split(",")) - .map { fields => { - val tmp = fields(4).split("\\$") - val file = FileElement(tmp(0).split(":"), tmp(1).toInt) - if (fields(0).toInt % 2 == 0) { - StreamData(fields(0).toInt, null, fields(2), fields(3).toFloat, file) - } else { - StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file) - } - } } - - // Write data from socket stream to carbondata file - qry = readSocketDF.writeStream - .format("carbondata") - .trigger(ProcessingTime("5 seconds")) - .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath)) - .option("dbName", "default") - .option("tableName", "stream_table_with_row_parser") - .option(CarbonStreamParser.CARBON_STREAM_PARSER, - CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER) - .start() - - qry.awaitTermination() - } catch { - case ex: Exception => - ex.printStackTrace() - println("Done reading and writing streaming data") - } finally { - qry.stop() - } - } - } - thread.start() - thread - } - - def writeSocket(serverSocket: ServerSocket): Thread = { - val thread = new Thread() { - override def run(): Unit = { - // wait for client to connection request and accept - val clientSocket = serverSocket.accept() - val socketWriter = new PrintWriter(clientSocket.getOutputStream()) - var index = 0 - for (_ <- 1 to 1000) { - // write 5 records per iteration - for (_ <- 0 to 1000) { - index = index + 1 - socketWriter.println(index.toString + ",name_" + index - + ",city_" + index + "," + 
(index * 10000.00).toString + - ",school_" + index + ":school_" + index + index + "$" + index) - } - socketWriter.flush() - Thread.sleep(1000) - } - socketWriter.close() - System.out.println("Socket closed") - } - } - thread.start() - thread - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala new file mode 100644 index 0000000..d819a3f --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example introduces how to use Spark Streaming to write data
+ * to CarbonData stream table.
+ *
+ * A helper thread serves generated rows on local port 7071, a DStream reads them and
+ * every batch is appended to the stream table. Connecting to local port 7072
+ * (for example with 'nc 127.0.0.1 7072') stops the streaming application.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object SparkStreamingExample {
+
+  /**
+   * Creates the stream table, batch-loads the sample CSV, runs the streaming ingest until
+   * the stop signal arrives and finally prints a few queries over the table.
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath +
+                            "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("SparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      // NOTE(review): the looked-up table is not used afterwards; the call is kept because
+      // its side effects are not visible from this file.
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      try {
+        val ssc = startStreaming(spark, streamTableName, checkpointPath)
+        // add a Spark Streaming Listener to remove all locks for stream tables when the app stops
+        ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+        // wait for stop signal to stop Spark Streaming App
+        waitForStopSignal(ssc)
+        // The Spark Streaming App needs to be started in the main thread,
+        // otherwise it will encounter a not-serializable exception.
+        ssc.start()
+        ssc.awaitTermination()
+      } finally {
+        // release the helper threads and the data socket even when streaming failed
+        thread1.interrupt()
+        thread2.interrupt()
+        serverSocket.close()
+      }
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter on id
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a thread that prints the current time, the row count and the segments of
+   * `tableName` every 5 seconds, for at most 1001 rounds.
+   *
+   * @return the started thread; interrupt it to end the monitoring
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            println(System.currentTimeMillis())
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+            Thread.sleep(1000 * 5)
+          }
+        } catch {
+          // main() interrupts this thread to end it; leave quietly instead of dying
+          // with an uncaught exception
+          case _: InterruptedException =>
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that waits for one connection on local port 7072 and then stops the
+   * streaming context gracefully, leaving the SparkContext running.
+   *
+   * @return the started thread
+   */
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        val stopSocket = new ServerSocket(7072)
+        try {
+          // the connection only serves as a signal, so it is closed right away
+          stopSocket.accept().close()
+        } finally {
+          stopSocket.close()
+        }
+        // don't stop SparkContext here
+        ssc.stop(stopSparkContext = false, stopGracefully = true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Builds (but does not start) the streaming context that reads comma separated rows from
+   * localhost:7071 and appends every batch to `tableName` in the "default" database.
+   *
+   * Setup failures are propagated to the caller instead of being swallowed, so the caller
+   * never receives a null or half-configured context.
+   *
+   * @param checkpointPath checkpoint directory of the streaming context
+   * @return the configured streaming context
+   */
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    // recommend: use a larger batch interval, such as 30s or 1min.
+    val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+    ssc.checkpoint(checkpointPath)
+
+    val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+    // only the first four fields of a row are used here
+    val batchData = readSocketDF
+      .map(_.split(","))
+      .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+    println("init carbon table info")
+    batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+      val df = spark.createDataFrame(rdd).toDF()
+      println(System.currentTimeMillis().toString() +
+              " at batch time: " + time.toString() +
+              " the count of received data: " + df.count())
+      CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+        .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+          CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+        .mode(SaveMode.Append)
+        .writeStreamData(df, time)
+    }}
+    ssc
+  }
+
+  /**
+   * Starts a thread that accepts one client on `serverSocket` and sends it generated rows:
+   * 1000 rounds of 101 rows each, with a 2 second pause after every round.
+   *
+   * @return the started thread; interrupt it to stop the data generation
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 101 records per iteration
+            for (_ <- 0 to 100) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(2000)
+          }
+        } catch {
+          // main() interrupts this thread to stop the data generation
+          case _: InterruptedException =>
+        } finally {
+          // close the writer and the connection on every exit path
+          socketWriter.close()
+          clientSocket.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
new file mode 100644
index 0000000..89883f8
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+/**
+ * This example introduces how to use CarbonData batch load to integrate
+ * with Spark Streaming (DStream based, not Spark Structured Streaming).
+ *
+ * A helper thread serves generated rows on local port 7071, a DStream reads them and
+ * every batch is appended to the table with a regular DataFrame write. Connecting to
+ * local port 7072 (for example with 'nc 127.0.0.1 7072') stops the streaming application.
+ */
+// scalastyle:off println
+
+/** One parsed input row: the first four comma separated fields of a socket line. */
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object StreamingUsingBatchLoadExample {
+
+  /**
+   * Creates the table, batch-loads the sample CSV, runs the streaming ingest until the
+   * stop signal arrives, prints a few queries over the table and drops it again.
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath +
+                            "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_batch_table"
+
+    val spark = ExampleUtils.createCarbonSession("StreamingUsingBatchLoadExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      // set AUTO_LOAD_MERGE to true to compact segment automatically
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
+           | 'AUTO_LOAD_MERGE'='true',
+           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+           | """.stripMargin)
+
+      // NOTE(review): the looked-up table is not used afterwards; the call is kept because
+      // its side effects are not visible from this file.
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      try {
+        val ssc = startStreaming(spark, streamTableName, checkpointPath)
+        // wait for stop signal to stop Spark Streaming App
+        waitForStopSignal(ssc)
+        // The Spark Streaming App needs to be started in the main thread,
+        // otherwise it will encounter a not-serializable exception.
+        ssc.start()
+        ssc.awaitTermination()
+      } finally {
+        // release the helper threads and the data socket even when streaming failed
+        thread1.interrupt()
+        thread2.interrupt()
+        serverSocket.close()
+      }
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter on id
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a thread that prints the row count and the segments of `tableName` every
+   * 5 seconds, for at most 1001 rounds.
+   *
+   * @return the started thread; interrupt it to end the monitoring
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+            Thread.sleep(1000 * 5)
+          }
+        } catch {
+          // main() interrupts this thread to end it; leave quietly instead of dying
+          // with an uncaught exception
+          case _: InterruptedException =>
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that waits for one connection on local port 7072 and then stops the
+   * streaming context gracefully, leaving the SparkContext running.
+   *
+   * @return the started thread
+   */
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        val stopSocket = new ServerSocket(7072)
+        try {
+          // the connection only serves as a signal, so it is closed right away
+          stopSocket.accept().close()
+        } finally {
+          stopSocket.close()
+        }
+        // don't stop SparkContext here
+        ssc.stop(stopSparkContext = false, stopGracefully = true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Builds (but does not start) the streaming context that reads comma separated rows from
+   * localhost:7071 and appends every batch to `tableName` through a DataFrame write in
+   * "carbondata" format.
+   *
+   * Setup failures are propagated to the caller instead of being swallowed, so the caller
+   * never receives a null or half-configured context.
+   *
+   * @param checkpointPath checkpoint directory of the streaming context
+   * @return the configured streaming context
+   */
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    // recommend: use a larger batch interval, such as 30s or 1min.
+    val ssc = new StreamingContext(spark.sparkContext, Seconds(15))
+    ssc.checkpoint(checkpointPath)
+
+    val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+    // only the first four fields of a row are used here
+    val batchData = readSocketDF
+      .map(_.split(","))
+      .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+    batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+      val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
+      println("at time: " + time.toString() + " the count of received data: " + df.count())
+      df.write
+        .format("carbondata")
+        .option("tableName", tableName)
+        .mode(SaveMode.Append)
+        .save()
+    }}
+    ssc
+  }
+
+  /**
+   * Starts a thread that accepts one client on `serverSocket` and sends it generated rows:
+   * 1000 rounds of 1001 rows each, with a 1 second pause after every round.
+   *
+   * @return the started thread; interrupt it to stop the data generation
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 1001 records per iteration
+            for (_ <- 0 to 1000) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(1000)
+          }
+        } catch {
+          // main() interrupts this thread to stop the data generation
+          case _: InterruptedException =>
+        } finally {
+          // close the writer and the connection on every exit path
+          socketWriter.close()
+          clientSocket.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala new file mode 100644 index 0000000..a07d504 --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/** The complex column of a row: the ':' separated school names and the age. */
+case class FileElement(school: Array[String], age: Int)
+
+/** One parsed input row, built from the five comma separated fields of a socket line. */
+case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
+
+/**
+ * This example uses Spark Structured Streaming with the CarbonData row parser: lines read
+ * from a local socket are parsed into [[StreamData]] rows and written to a stream table.
+ */
+// scalastyle:off println
+object StreamingWithRowParserExample {
+
+  /**
+   * Creates the stream table, batch-loads the sample CSV, runs the streaming ingest until
+   * enter is typed and finally prints a few queries over the table.
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath +
+                            "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StreamingWithRowParserExample", 4)
+    val streamTableName = s"stream_table_with_row_parser"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable.getTablePath, streamTableName)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter on id
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // filter on a field of the complex column
+      // (the statement used to contain a duplicated "where", which is invalid SQL)
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a thread that prints the row count of `tableName` every 3 seconds, for at most
+   * 1001 rounds.
+   *
+   * @return the started thread; interrupt it to end the monitoring
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            Thread.sleep(1000 * 3)
+          }
+        } catch {
+          // main() interrupts this thread to end it; leave quietly instead of dying
+          // with an uncaught exception
+          case _: InterruptedException =>
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that runs a structured streaming query: lines from localhost:7071 are
+   * parsed into [[StreamData]] rows and written to the carbondata table every 5 seconds
+   * using the row parser.
+   *
+   * @param tablePath path of the target table, used to derive the checkpoint location
+   * @param tableName name of the target table in the "default" database; defaults to the
+   *                  table name this example creates
+   * @return the started thread; interrupt it to stop the query
+   */
+  def startStreaming(spark: SparkSession, tablePath: String,
+      tableName: String = "stream_table_with_row_parser"): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // stays null until the query has been started successfully
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+            .as[String]
+            .map(_.split(","))
+            .map { fields => {
+              // fifth field has the form <school>:<school>$<age>
+              val tmp = fields(4).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              val id = fields(0).toInt
+              // rows with an even id are written with a null name
+              val name = if (id % 2 == 0) null else fields(1)
+              StreamData(id, name, fields(2), fields(3).toFloat, file)
+            } }
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
+            .option("dbName", "default")
+            .option("tableName", tableName)
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          // the query is still null when building or starting it failed
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that accepts one client on `serverSocket` and sends it generated rows:
+   * 1000 rounds of 1001 rows each, with a 1 second pause after every round.
+   *
+   * @return the started thread; interrupt it to stop the data generation
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 1001 records per iteration
+            for (_ <- 0 to 1000) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(1000)
+          }
+        } catch {
+          // main() interrupts this thread to stop the data generation
+          case _: InterruptedException =>
+        } finally {
+          // close the writer and the connection on every exit path
+          socketWriter.close()
+          clientSocket.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala new file mode 100644 index 0000000..1d4bedf --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * This example uses Spark Structured Streaming to write the lines read from a local
+ * socket to a CarbonData stream table.
+ */
+// scalastyle:off println
+object StructuredStreamingExample {
+
+  /**
+   * Creates the stream table, batch-loads the sample CSV, runs the streaming ingest until
+   * enter is typed and finally prints a few queries over the table.
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath +
+                            "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
+    val streamTableName = s"stream_table"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable, streamTableName)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter on id
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // filter on a field of the complex column
+      // (the statement used to contain a duplicated "where", which is invalid SQL)
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a thread that prints the row count of `tableName` every 3 seconds, for at most
+   * 1001 rounds.
+   *
+   * @return the started thread; interrupt it to end the monitoring
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            Thread.sleep(1000 * 3)
+          }
+        } catch {
+          // main() interrupts this thread to end it; leave quietly instead of dying
+          // with an uncaught exception
+          case _: InterruptedException =>
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that runs a structured streaming query: lines from localhost:7071 are
+   * written to the carbondata table every 5 seconds.
+   *
+   * @param carbonTable target table; its table path is used to derive the checkpoint location
+   * @param tableName name of the target table in the "default" database; defaults to the
+   *                  table name this example creates
+   * @return the started thread; interrupt it to stop the query
+   */
+  def startStreaming(spark: SparkSession, carbonTable: CarbonTable,
+      tableName: String = "stream_table"): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // stays null until the query has been started successfully
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation",
+              CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", "default")
+            .option("tableName", tableName)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          // the query is still null when building or starting it failed
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a thread that accepts one client on `serverSocket` and sends it generated rows:
+   * 1000 rounds of 1001 rows each, with a 1 second pause after every round.
+   *
+   * @return the started thread; interrupt it to stop the data generation
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 1001 records per iteration
+            for (_ <- 0 to 1000) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(1000)
+          }
+        } catch {
+          // main() interrupts this thread to stop the data generation
+          case _: InterruptedException =>
+        } finally {
+          // close the writer and the connection on every exit path
+          socketWriter.close()
+          clientSocket.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
