Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2182#discussion_r183084661
--- Diff: docs/streaming-guide.md ---
@@ -26,64 +26,64 @@ Start spark-shell in new terminal, type :paste, then
copy and run the following
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
- import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
- import org.apache.carbondata.core.util.path.CarbonStorePath
-
- val warehouse = new File("./warehouse").getCanonicalPath
- val metastore = new File("./metastore").getCanonicalPath
-
- val spark = SparkSession
- .builder()
- .master("local")
- .appName("StreamExample")
- .config("spark.sql.warehouse.dir", warehouse)
- .getOrCreateCarbonSession(warehouse, metastore)
-
- spark.sparkContext.setLogLevel("ERROR")
-
- // drop table if exists previously
- spark.sql(s"DROP TABLE IF EXISTS carbon_table")
- // Create target carbon table and populate with initial data
- spark.sql(
- s"""
- | CREATE TABLE carbon_table (
- | col1 INT,
- | col2 STRING
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES('streaming'='true')""".stripMargin)
-
- val carbonTable = CarbonEnv.getCarbonTable(Some("default"),
"carbon_table")(spark)
- val tablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-
- // batch load
- var qry: StreamingQuery = null
- val readSocketDF = spark.readStream
- .format("socket")
- .option("host", "localhost")
- .option("port", 9099)
- .load()
-
- // Write data from socket stream to carbondata file
- qry = readSocketDF.writeStream
- .format("carbondata")
- .trigger(ProcessingTime("5 seconds"))
- .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("dbName", "default")
- .option("tableName", "carbon_table")
- .start()
-
- // start new thread to show data
- new Thread() {
- override def run(): Unit = {
- do {
- spark.sql("select * from carbon_table").show(false)
- Thread.sleep(10000)
- } while (true)
- }
- }.start()
-
- qry.awaitTermination()
+ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
--- End diff --
The indentation of this line is not correct.
---