Repository: carbondata
Updated Branches:
  refs/heads/master 6cb6f8380 -> d4f9003af


[CARBONDATA-2255] Rename the streaming examples

optimize streaming examples

This closes #2064


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d4f9003a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d4f9003a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d4f9003a

Branch: refs/heads/master
Commit: d4f9003af3593d30799b573ce729de1918b8c800
Parents: 6cb6f83
Author: QiangCai <[email protected]>
Authored: Wed Mar 14 17:42:39 2018 +0800
Committer: chenliang613 <[email protected]>
Committed: Thu Mar 15 15:33:00 2018 +0800

----------------------------------------------------------------------
 .../CarbonBatchSparkStreamingExample.scala      | 207 ------------------
 .../CarbonStreamSparkStreamingExample.scala     | 213 ------------------
 .../CarbonStructuredStreamingExample.scala      | 200 -----------------
 ...CarbonStructuredStreamingWithRowParser.scala | 216 -------------------
 .../examples/SparkStreamingExample.scala        | 213 ++++++++++++++++++
 .../StreamingUsingBatchLoadExample.scala        | 207 ++++++++++++++++++
 .../StreamingWithRowParserExample.scala         | 216 +++++++++++++++++++
 .../examples/StructuredStreamingExample.scala   | 200 +++++++++++++++++
 8 files changed, 836 insertions(+), 836 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
deleted file mode 100644
index bcbf190..0000000
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-/**
- * This example introduces how to use CarbonData batch load to integrate
- * with Spark Streaming(it's DStream, not Spark Structured Streaming)
- */
-// scalastyle:off println
-
-case class DStreamData(id: Int, name: String, city: String, salary: Float)
-
-object CarbonBatchSparkStreamingExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val checkpointPath =
-      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
-      System.currentTimeMillis().toString()
-    val streamTableName = s"dstream_batch_table"
-
-    val spark = 
ExampleUtils.createCarbonSession("CarbonBatchSparkStreamingExample", 4)
-
-    val requireCreateTable = true
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      // set AUTO_LOAD_MERGE to true to compact segment automatically
-      spark.sql(
-        s"""
-           | CREATE TABLE ${ streamTableName }(
-           | id INT,
-           | name STRING,
-           | city STRING,
-           | salary FLOAT
-           | )
-           | STORED BY 'carbondata'
-           | TBLPROPERTIES(
-           | 'sort_columns'='name',
-           | 'dictionary_include'='city',
-           | 'AUTO_LOAD_MERGE'='true',
-           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
-           | """.stripMargin)
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      // batch load
-      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = writeSocket(serverSocket)
-      val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, checkpointPath)
-      // wait for stop signal to stop Spark Streaming App
-      waitForStopSignal(ssc)
-      // it needs to start the Spark Streaming App in the main thread,
-      // otherwise it will encounter a not-serializable exception.
-      ssc.start()
-      ssc.awaitTermination()
-      thread1.interrupt()
-      thread2.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
-
-    // not filter
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    // show segments
-    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
-    // drop table
-    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
-          Thread.sleep(1000 * 5)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def waitForStopSignal(ssc: StreamingContext): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
-        new ServerSocket(7072).accept()
-        // don't stop SparkContext here
-        ssc.stop(false, true)
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tableName: String,
-      checkpointPath: String): StreamingContext = {
-    var ssc: StreamingContext = null
-    try {
-      // recommend: the batch interval must set larger, such as 30s, 1min.
-      ssc = new StreamingContext(spark.sparkContext, Seconds(15))
-      ssc.checkpoint(checkpointPath)
-
-      val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
-      val batchData = readSocketDF
-        .map(_.split(","))
-        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat))
-
-      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-        val df = spark.createDataFrame(rdd).toDF("id", "name", "city", 
"salary")
-        println("at time: " + time.toString() + " the count of received data: 
" + df.count())
-        df.write
-          .format("carbondata")
-          .option("tableName", tableName)
-          .mode(SaveMode.Append)
-          .save()
-      }}
-    } catch {
-      case ex: Exception =>
-        ex.printStackTrace()
-        println("Done reading and writing streaming data")
-    }
-    ssc
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write a batch of records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 
10000.00).toString +
-                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
deleted file mode 100644
index 856084b..0000000
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.CarbonSparkStreamingFactory
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.CarbonSparkStreamingListener
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-/**
- * This example introduces how to use Spark Streaming to write data
- * to CarbonData stream table.
- *
- * NOTE: Current integration with Spark Streaming is an alpha feature.
- */
-// scalastyle:off println
-object CarbonStreamSparkStreamingExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-    val checkpointPath =
-      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
-      System.currentTimeMillis().toString()
-    val streamTableName = s"dstream_stream_table"
-
-    val spark = 
ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
-
-    val requireCreateTable = true
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      spark.sql(
-        s"""
-           | CREATE TABLE ${ streamTableName }(
-           | id INT,
-           | name STRING,
-           | city STRING,
-           | salary FLOAT
-           | )
-           | STORED BY 'carbondata'
-           | TBLPROPERTIES(
-           | 'streaming'='true',
-           | 'sort_columns'='name',
-           | 'dictionary_include'='city')
-           | """.stripMargin)
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      // batch load
-      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = writeSocket(serverSocket)
-      val thread2 = showTableCount(spark, streamTableName)
-      val ssc = startStreaming(spark, streamTableName, checkpointPath)
-      // add a Spark Streaming Listener to remove all lock for stream tables 
when stop app
-      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
-      // wait for stop signal to stop Spark Streaming App
-      waitForStopSignal(ssc)
-      // it needs to start the Spark Streaming App in the main thread,
-      // otherwise it will encounter a not-serializable exception.
-      ssc.start()
-      ssc.awaitTermination()
-      thread1.interrupt()
-      thread2.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
-
-    spark.sql(s"select * from ${ streamTableName } order by id 
desc").show(100, truncate = false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
-
-    // not filter
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    // show segments
-    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          println(System.currentTimeMillis())
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
-          Thread.sleep(1000 * 5)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def waitForStopSignal(ssc: StreamingContext): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
-        new ServerSocket(7072).accept()
-        // don't stop SparkContext here
-        ssc.stop(false, true)
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tableName: String,
-      checkpointPath: String): StreamingContext = {
-    var ssc: StreamingContext = null
-    try {
-      // recommend: the batch interval must set larger, such as 30s, 1min.
-      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
-      ssc.checkpoint(checkpointPath)
-
-      val readSocketDF = ssc.socketTextStream("localhost", 7071)
-
-      val batchData = readSocketDF
-        .map(_.split(","))
-        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat))
-
-      println("init carbon table info")
-      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-        val df = spark.createDataFrame(rdd).toDF()
-        println(System.currentTimeMillis().toString() +
-          " at batch time: " + time.toString() +
-          " the count of received data: " + df.count())
-        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, 
"default", tableName)
-          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
-          .mode(SaveMode.Append)
-          .writeStreamData(df, time)
-      }}
-    } catch {
-      case ex: Exception =>
-        ex.printStackTrace()
-        println("Done reading and writing streaming data")
-    }
-    ssc
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write a batch of records per iteration
-          for (_ <- 0 to 100) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 
10000.00).toString +
-                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(2000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
deleted file mode 100644
index bc65b2f..0000000
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingExample.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-// scalastyle:off println
-object CarbonStructuredStreamingExample {
-  def main(args: Array[String]) {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-
-    val spark = 
ExampleUtils.createCarbonSession("CarbonStructuredStreamingExample", 4)
-    val streamTableName = s"stream_table"
-
-    val requireCreateTable = true
-    val useComplexDataType = false
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      if (useComplexDataType) {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT,
-             | file struct<school:array<string>, age:int>
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name', 
'dictionary_include'='city')
-             | """.stripMargin)
-      } else {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name')
-             | """.stripMargin)
-      }
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      // batch load
-      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, carbonTable)
-      val thread2 = writeSocket(serverSocket)
-      val thread3 = showTableCount(spark, streamTableName)
-
-      System.out.println("type enter to interrupt streaming")
-      System.in.read()
-      thread1.interrupt()
-      thread2.interrupt()
-      thread3.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
-
-    // not filter
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    if (useComplexDataType) {
-      // complex
-      spark.sql(s"select file.age, file.school " +
-                s"from ${ streamTableName } " +
-                s"where where file.age = 30 ").show(100, truncate = false)
-    }
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          Thread.sleep(1000 * 3)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        var qry: StreamingQuery = null
-        try {
-          val readSocketDF = spark.readStream
-            .format("socket")
-            .option("host", "localhost")
-            .option("port", 7071)
-            .load()
-
-          // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
-            .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation",
-              
CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
-            .option("dbName", "default")
-            .option("tableName", "stream_table")
-            .start()
-
-          qry.awaitTermination()
-        } catch {
-          case ex: Exception =>
-            ex.printStackTrace()
-            println("Done reading and writing streaming data")
-        } finally {
-          qry.stop()
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write a batch of records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 
10000.00).toString +
-                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
deleted file mode 100644
index 9ca0e07..0000000
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.{File, PrintWriter}
-import java.net.ServerSocket
-
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-
-case class FileElement(school: Array[String], age: Int)
-case class StreamData(id: Int, name: String, city: String, salary: Float, 
file: FileElement)
-
-// scalastyle:off println
-object CarbonStructuredStreamingWithRowParser {
-  def main(args: Array[String]) {
-
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-
-    val spark = 
ExampleUtils.createCarbonSession("CarbonStructuredStreamingWithRowParser", 4)
-    val streamTableName = s"stream_table_with_row_parser"
-
-    val requireCreateTable = true
-    val useComplexDataType = false
-
-    if (requireCreateTable) {
-      // drop table if exists previously
-      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
-      // Create target carbon table and populate with initial data
-      if (useComplexDataType) {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT,
-             | file struct<school:array<string>, age:int>
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name', 
'dictionary_include'='city')
-             | """.stripMargin)
-      } else {
-        spark.sql(
-          s"""
-             | CREATE TABLE ${ streamTableName }(
-             | id INT,
-             | name STRING,
-             | city STRING,
-             | salary FLOAT
-             | )
-             | STORED BY 'carbondata'
-             | TBLPROPERTIES(
-             | 'streaming'='true', 'sort_columns'='name')
-             | """.stripMargin)
-      }
-
-      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
-      // batch load
-      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
-      spark.sql(
-        s"""
-           | LOAD DATA LOCAL INPATH '$path'
-           | INTO TABLE $streamTableName
-           | OPTIONS('HEADER'='true')
-         """.stripMargin)
-
-      // streaming ingest
-      val serverSocket = new ServerSocket(7071)
-      val thread1 = startStreaming(spark, carbonTable.getTablePath)
-      val thread2 = writeSocket(serverSocket)
-      val thread3 = showTableCount(spark, streamTableName)
-
-      System.out.println("type enter to interrupt streaming")
-      System.in.read()
-      thread1.interrupt()
-      thread2.interrupt()
-      thread3.interrupt()
-      serverSocket.close()
-    }
-
-    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
-
-    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
-
-    // record(id = 100000001) comes from batch segment_0
-    // record(id = 1) comes from stream segment_1
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
-
-    // not filter
-    spark.sql(s"select * " +
-              s"from ${ streamTableName } " +
-              s"where id < 10 limit 100").show(100, truncate = false)
-
-    if (useComplexDataType) {
-      // complex
-      spark.sql(s"select file.age, file.school " +
-                s"from ${ streamTableName } " +
-                s"where where file.age = 30 ").show(100, truncate = false)
-    }
-
-    spark.stop()
-    System.out.println("streaming finished")
-  }
-
-  def showTableCount(spark: SparkSession, tableName: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        for (_ <- 0 to 1000) {
-          spark.sql(s"select count(*) from $tableName").show(truncate = false)
-          Thread.sleep(1000 * 3)
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        var qry: StreamingQuery = null
-        try {
-          import spark.implicits._
-          val readSocketDF = spark.readStream
-            .format("socket")
-            .option("host", "localhost")
-            .option("port", 7071)
-            .load()
-            .as[String]
-            .map(_.split(","))
-            .map { fields => {
-              val tmp = fields(4).split("\\$")
-              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
-              if (fields(0).toInt % 2 == 0) {
-                StreamData(fields(0).toInt, null, fields(2), 
fields(3).toFloat, file)
-              } else {
-                StreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat, file)
-              }
-            } }
-
-          // Write data from socket stream to carbondata file
-          qry = readSocketDF.writeStream
-            .format("carbondata")
-            .trigger(ProcessingTime("5 seconds"))
-            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(tablePath))
-            .option("dbName", "default")
-            .option("tableName", "stream_table_with_row_parser")
-            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
-            .start()
-
-          qry.awaitTermination()
-        } catch {
-          case ex: Exception =>
-            ex.printStackTrace()
-            println("Done reading and writing streaming data")
-        } finally {
-          qry.stop()
-        }
-      }
-    }
-    thread.start()
-    thread
-  }
-
-  def writeSocket(serverSocket: ServerSocket): Thread = {
-    val thread = new Thread() {
-      override def run(): Unit = {
-        // wait for a client connection request and accept it
-        val clientSocket = serverSocket.accept()
-        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
-        var index = 0
-        for (_ <- 1 to 1000) {
-          // write a batch of records per iteration
-          for (_ <- 0 to 1000) {
-            index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 
10000.00).toString +
-                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
-          }
-          socketWriter.flush()
-          Thread.sleep(1000)
-        }
-        socketWriter.close()
-        System.out.println("Socket closed")
-      }
-    }
-    thread.start()
-    thread
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
new file mode 100644
index 0000000..d819a3f
--- /dev/null
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkStreamingExample.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example introduces how to use Spark Streaming to write data
+ * to CarbonData stream table.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object SparkStreamingExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("SparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
+      // batch load
+      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
+      // add a Spark Streaming Listener to remove all lock for stream tables 
when stop app
+      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // the Spark Streaming App needs to be started in the main thread,
+      // otherwise it will encounter a not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id 
desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
+
+    // query without filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          println(System.currentTimeMillis())
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommended: set a larger batch interval, such as 30s or 1min.
+      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat))
+
+      println("init carbon table info")
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF()
+        println(System.currentTimeMillis().toString() +
+          " at batch time: " + time.toString() +
+          " the count of received data: " + df.count())
+        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, 
"default", tableName)
+          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+          .mode(SaveMode.Append)
+          .writeStreamData(df, time)
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 101 records per iteration
+          for (_ <- 0 to 100) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 
10000.00).toString +
+                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(2000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
new file mode 100644
index 0000000..89883f8
--- /dev/null
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingUsingBatchLoadExample.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+/**
+ * This example introduces how to use CarbonData batch load to integrate
+ * with Spark Streaming(it's DStream, not Spark Structured Streaming)
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object StreamingUsingBatchLoadExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_batch_table"
+
+    val spark = 
ExampleUtils.createCarbonSession("StreamingUsingBatchLoadExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      // set AUTO_LOAD_MERGE to true to compact segment automatically
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city',
+           | 'AUTO_LOAD_MERGE'='true',
+           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+           | """.stripMargin)
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
+      // batch load
+      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, checkpointPath)
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // the Spark Streaming App needs to be started in the main thread,
+      // otherwise it will encounter a not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
+
+    // query without filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommended: set a larger batch interval, such as 30s or 1min.
+      ssc = new StreamingContext(spark.sparkContext, Seconds(15))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat))
+
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF("id", "name", "city", 
"salary")
+        println("at time: " + time.toString() + " the count of received data: 
" + df.count())
+        df.write
+          .format("carbondata")
+          .option("tableName", tableName)
+          .mode(SaveMode.Append)
+          .save()
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 
10000.00).toString +
+                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
new file mode 100644
index 0000000..a07d504
--- /dev/null
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class FileElement(school: Array[String], age: Int)
+case class StreamData(id: Int, name: String, city: String, salary: Float, 
file: FileElement)
+
+// scalastyle:off println
+object StreamingWithRowParserExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = 
ExampleUtils.createCarbonSession("StreamingWithRowParserExample", 4)
+    val streamTableName = s"stream_table_with_row_parser"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 
'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
+      // batch load
+      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable.getTablePath)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
+
+    // query without filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tablePath: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+            .as[String]
+            .map(_.split(","))
+            .map { fields => {
+              val tmp = fields(4).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              if (fields(0).toInt % 2 == 0) {
+                StreamData(fields(0).toInt, null, fields(2), 
fields(3).toFloat, file)
+              } else {
+                StreamData(fields(0).toInt, fields(1), fields(2), 
fields(3).toFloat, file)
+              }
+            } }
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(tablePath))
+            .option("dbName", "default")
+            .option("tableName", "stream_table_with_row_parser")
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          qry.stop()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 
10000.00).toString +
+                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d4f9003a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
new file mode 100644
index 0000000..1d4bedf
--- /dev/null
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+// scalastyle:off println
+object StructuredStreamingExample {
+  def main(args: Array[String]) {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 
4)
+    val streamTableName = s"stream_table"
+
+    val requireCreateTable = true
+    val useComplexDataType = false
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      if (useComplexDataType) {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT,
+             | file struct<school:array<string>, age:int>
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name', 
'dictionary_include'='city')
+             | """.stripMargin)
+      } else {
+        spark.sql(
+          s"""
+             | CREATE TABLE ${ streamTableName }(
+             | id INT,
+             | name STRING,
+             | city STRING,
+             | salary FLOAT
+             | )
+             | STORED BY 'carbondata'
+             | TBLPROPERTIES(
+             | 'streaming'='true', 'sort_columns'='name')
+             | """.stripMargin)
+      }
+
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
streamTableName)(spark)
+      // batch load
+      val path = 
s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = startStreaming(spark, carbonTable)
+      val thread2 = writeSocket(serverSocket)
+      val thread3 = showTableCount(spark, streamTableName)
+
+      System.out.println("type enter to interrupt streaming")
+      System.in.read()
+      thread1.interrupt()
+      thread2.interrupt()
+      thread3.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate 
= false)
+
+    spark.sql(s"select * from ${ streamTableName }").show(100, truncate = 
false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate 
= false)
+
+    // query without filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    if (useComplexDataType) {
+      // complex
+      spark.sql(s"select file.age, file.school " +
+                s"from ${ streamTableName } " +
+                s"where where file.age = 30 ").show(100, truncate = false)
+    }
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          Thread.sleep(1000 * 3)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, carbonTable: CarbonTable): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", 7071)
+            .load()
+
+          // Write data from socket stream to carbondata file
+          qry = readSocketDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime("5 seconds"))
+            .option("checkpointLocation",
+              
CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", "default")
+            .option("tableName", "stream_table")
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case ex: Exception =>
+            ex.printStackTrace()
+            println("Done reading and writing streaming data")
+        } finally {
+          qry.stop()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 1001 records per iteration
+          for (_ <- 0 to 1000) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 
10000.00).toString +
+                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(1000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

Reply via email to