Github user chenliang613 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1840#discussion_r162841010
--- Diff: examples/spark2/src/main/scala/org/apache/carbondata/examples/DStreamWithBatchTableExample.scala ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{CarbonEnv, SaveMode, SparkSession}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+
+/**
+ * This example shows how to use CarbonData batch load to integrate
+ * with Spark Streaming (DStream, not Spark Structured Streaming).
+ */
+// scalastyle:off println
+
+case class DStreamData(id: Int, name: String, city: String, salary: Float)
+
+object DStreamWithBatchTableExample {
+
+ def main(args: Array[String]): Unit = {
+
+ // setup paths
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ val storeLocation = s"$rootPath/examples/spark2/target/store"
+ val warehouse = s"$rootPath/examples/spark2/target/warehouse"
+ val metastoredb = s"$rootPath/examples/spark2/target"
+ val checkpointPath =
+ s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+ System.currentTimeMillis().toString()
+ val streamTableName = s"dstream_batch_table"
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+ import org.apache.spark.sql.CarbonSession._
+ val spark = SparkSession
+ .builder()
+ .master("local[4]")
+ .appName("DStreamWithBatchTableExample")
+ .config("spark.sql.warehouse.dir", warehouse)
+ .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+ spark.sparkContext.setLogLevel("WARN")
+
+ val requireCreateTable = true
+
+ if (requireCreateTable) {
+ // drop the table if it already exists
+ spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+ // Create target carbon table and populate with initial data
+ // set AUTO_LOAD_MERGE to true to compact segments automatically
+ spark.sql(
+ s"""
+ | CREATE TABLE ${ streamTableName }(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(
+ | 'sort_columns'='name',
+ | 'dictionary_include'='city',
+ | 'MAJOR_COMPACTION_SIZE'='64',
+ | 'AUTO_LOAD_MERGE'='true',
+ | 'COMPACTION_LEVEL_THRESHOLD'='4,10')
+ | """.stripMargin)
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ // batch load
+ val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+ spark.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE $streamTableName
+ | OPTIONS('HEADER'='true')
+ """.stripMargin)
+
+ // streaming ingest
+ val serverSocket = new ServerSocket(7071)
+ val thread1 = writeSocket(serverSocket)
+ val thread2 = showTableCount(spark, streamTableName)
+ val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+ // wait for stop signal to stop Spark Streaming App
+ waitForStopSignal(ssc)
+ // the Spark Streaming app needs to be started in the main thread,
+ // otherwise it will encounter a not-serializable exception.
+ ssc.start()
+ ssc.awaitTermination()
+ thread1.interrupt()
+ thread2.interrupt()
+ serverSocket.close()
+ }
+
+ spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+ spark.sql(s"select * from ${ streamTableName }").show(100, truncate = false)
+
+ // record(id = 100000001) comes from batch segment_0
+ // record(id = 1) comes from stream segment_1
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+ // query with a range filter
+ spark.sql(s"select * " +
+ s"from ${ streamTableName } " +
+ s"where id < 10 limit 100").show(100, truncate = false)
+
+ // show segments
+ spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+ spark.stop()
--- End diff --
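
The bodies of writeSocket, showTableCount, startStreaming and waitForStopSignal are in the part of the file that this hunk does not show. As context, here is a minimal, hypothetical sketch of the micro-batch ingest that startStreaming performs; the helper name, signature and batch interval are illustrative assumptions, it reuses the DStreamData case class defined in the example, and it assumes CarbonData's DataFrame writer (format "carbondata" with a "tableName" option):

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // hypothetical sketch, not the code under review
    def startStreamingSketch(spark: SparkSession, tableName: String,
        checkpointPath: String): StreamingContext = {
      val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
      ssc.checkpoint(checkpointPath)
      // rows arrive as "id,name,city,salary" lines from the socket writer on port 7071
      val lines = ssc.socketTextStream("localhost", 7071)
      lines.foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          import spark.implicits._
          val df = rdd.map(_.split(","))
            .map(f => DStreamData(f(0).toInt, f(1), f(2), f(3).toFloat))
            .toDF()
          // each micro-batch is appended to the table as a normal batch segment
          df.write
            .format("carbondata")
            .option("tableName", tableName)
            .mode(SaveMode.Append)
            .save()
        }
      }
      ssc
    }

Because every micro-batch lands as its own segment, the table sets AUTO_LOAD_MERGE to 'true' so segments are compacted automatically.
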
Add a DROP TABLE for the example table at the end, so the run cleans up after itself.
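
For example, the end of main could clean up like this (a minimal sketch of the suggested change, not committed code):

    // drop the example table so repeated runs start from a clean state
    spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")

    spark.stop()
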
---