[CARBONDATA-1616] Add CarbonData Streaming Ingestion Guide

Add CarbonData Streaming Ingestion Guide

This closes #1880


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cdff1932
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cdff1932
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cdff1932

Branch: refs/heads/branch-1.3
Commit: cdff193255418e56ab4a98c441eb6b809142c9a2
Parents: c8a3eb5
Author: QiangCai <[email protected]>
Authored: Thu Jan 4 11:52:07 2018 +0800
Committer: chenliang613 <[email protected]>
Committed: Thu Feb 1 10:59:36 2018 +0800

----------------------------------------------------------------------
 README.md               |   1 +
 docs/streaming-guide.md | 169 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdff1932/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 3b6792e..952392b 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,7 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
 * [Data Management on CarbonData](https://github.com/apache/carbondata/blob/master/docs/data-management-on-carbondata.md)
 * [Cluster Installation and Deployment](https://github.com/apache/carbondata/blob/master/docs/installation-guide.md)
 * [Configuring Carbondata](https://github.com/apache/carbondata/blob/master/docs/configuration-parameters.md)
+* [Streaming Ingestion](https://github.com/apache/carbondata/blob/master/docs/streaming-guide.md)
 * [FAQ](https://github.com/apache/carbondata/blob/master/docs/faq.md)
 * [Trouble Shooting](https://github.com/apache/carbondata/blob/master/docs/troubleshooting.md)
 * [Useful Tips](https://github.com/apache/carbondata/blob/master/docs/useful-tips-on-carbondata.md)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdff1932/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
new file mode 100644
index 0000000..201f8e0
--- /dev/null
+++ b/docs/streaming-guide.md
@@ -0,0 +1,169 @@
+# CarbonData Streaming Ingestion
+
+## Quick example
+Download and unzip spark-2.2.0-bin-hadoop2.7.tgz, and export $SPARK_HOME
+
+Package the carbon jar, and copy assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar to $SPARK_HOME/jars
+```shell
+mvn clean package -DskipTests -Pspark-2.2
+```
+
+Start a socket data server in a terminal
+```shell
+ nc -lk 9099
+```
+Type some CSV rows such as the following:
+```csv
+1,col1
+2,col2
+3,col3
+4,col4
+5,col5
+```
+
+Start spark-shell in a new terminal, type :paste, then copy and run the following code.
+```scala
+ import java.io.File
+ import org.apache.spark.sql.{CarbonEnv, SparkSession}
+ import org.apache.spark.sql.CarbonSession._
+ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+ import org.apache.carbondata.core.util.path.CarbonStorePath
+ 
+ val warehouse = new File("./warehouse").getCanonicalPath
+ val metastore = new File("./metastore").getCanonicalPath
+ 
+ val spark = SparkSession
+   .builder()
+   .master("local")
+   .appName("StreamExample")
+   .config("spark.sql.warehouse.dir", warehouse)
+   .getOrCreateCarbonSession(warehouse, metastore)
+
+ spark.sparkContext.setLogLevel("ERROR")
+
+ // drop table if exists previously
+ spark.sql(s"DROP TABLE IF EXISTS carbon_table")
+ // Create the target carbon table with streaming enabled
+ spark.sql(
+   s"""
+      | CREATE TABLE carbon_table (
+      | col1 INT,
+      | col2 STRING
+      | )
+      | STORED BY 'carbondata'
+      | TBLPROPERTIES('streaming'='true')""".stripMargin)
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
+ val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+ 
+ // Read streaming data from the socket server
+ var qry: StreamingQuery = null
+ val readSocketDF = spark.readStream
+   .format("socket")
+   .option("host", "localhost")
+   .option("port", 9099)
+   .load()
+
+ // Write data from socket stream to carbondata file
+ qry = readSocketDF.writeStream
+   .format("carbondata")
+   .trigger(ProcessingTime("5 seconds"))
+   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+   .option("dbName", "default")
+   .option("tableName", "carbon_table")
+   .start()
+
+ // start new thread to show data
+ new Thread() {
+   override def run(): Unit = {
+     do {
+       spark.sql("select * from carbon_table").show(false)
+       Thread.sleep(10000)
+     } while (true)
+   }
+ }.start()
+
+ qry.awaitTermination()
+```
+
+Continue typing rows into the data server, and spark-shell will show the new data in the table.
+
+## Create table with streaming property
+A streaming table is just a normal carbon table with the "streaming" table property. Users can create a
+streaming table using the following DDL.
+```sql
+ CREATE TABLE streaming_table (
+  col1 INT,
+  col2 STRING
+ )
+ STORED BY 'carbondata'
+ TBLPROPERTIES('streaming'='true')
+```
+
+ property name | default | description
+ ---|---|--- 
+ streaming | false | Whether to enable the streaming ingest feature for this table <br /> Value range: true, false
+ 
+ The "DESC FORMATTED" command shows the streaming property.
+ ```sql
+ DESC FORMATTED streaming_table
+ ```
+ 
+## Alter streaming property
+For an existing table, use the ALTER TABLE command to set the streaming property.
+```sql
+ALTER TABLE streaming_table SET TBLPROPERTIES('streaming'='true')
+```
+
+## Acquire streaming lock
+At the beginning of streaming ingestion, the system tries to acquire the table-level lock on the streaming.lock file. If the system is unable to acquire the lock for this table, it throws an InterruptedException.
+
+## Create streaming segment
+The input data of a stream is ingested into a segment of the CarbonData table whose status is "streaming". CarbonData calls it a streaming segment. The "tablestatus" file records the segment status and data size. The user can run "SHOW SEGMENTS FOR TABLE tableName" to check the segment status.
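+
+As a minimal sketch, assuming the `spark` CarbonSession from the quick example and the `streaming_table` created above, the same check can be run from spark-shell:
+```scala
+// Assumes `spark` is the CarbonSession created in the quick example.
+// Lists the segments of the table together with their status.
+spark.sql("SHOW SEGMENTS FOR TABLE streaming_table").show(false)
+```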
+
+After the streaming segment reaches the max size, CarbonData changes the segment status from "streaming" to "streaming finish", and creates a new "streaming" segment to continue ingesting streaming data.
+
+option | default | description
+--- | --- | ---
+carbon.streaming.segment.max.size | 1024000000 | Unit: byte <br />max size of streaming segment
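+
+A minimal sketch of overriding this option from Scala before ingestion starts. It assumes the option is read through `CarbonProperties` like other CarbonData options, and the value 512000000 is only an illustration:
+```scala
+import org.apache.carbondata.core.util.CarbonProperties
+
+// Assumption: the option is picked up from CarbonProperties when streaming data is written.
+// 512000000 bytes is an arbitrary example value, not a recommendation.
+CarbonProperties.getInstance()
+  .addProperty("carbon.streaming.segment.max.size", "512000000")
+```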
+
+segment status | description
+--- | ---
+streaming | The segment is running streaming ingestion
+streaming finish | The segment already finished streaming ingestion, <br /> it will be handed off to a segment in the columnar format
+
+## Change segment status
+Use the command below to change the status of a "streaming" segment to "streaming finish".
+```sql
+ALTER TABLE streaming_table FINISH STREAMING
+```
+
+## Handoff "streaming finish" segment to columnar segment
+Use the command below to manually hand off a "streaming finish" segment to a columnar format segment.
+```sql
+ALTER TABLE streaming_table COMPACT 'streaming'
+
+```
+
+## Auto handoff streaming segment
+Configure the property "carbon.streaming.auto.handoff.enabled" to hand off streaming segments automatically. If the value of this property is true, then after the streaming segment reaches the max size, CarbonData changes this segment to "streaming finish" status and triggers an automatic handoff of this segment to a columnar format segment in a new thread.
+
+property name | default | description
+--- | --- | ---
+carbon.streaming.auto.handoff.enabled | true | whether to auto trigger handoff operation
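+
+The manual path can be sketched as follows, assuming the `spark` CarbonSession from the quick example and that the property is read through `CarbonProperties`. Auto handoff is switched off, and the two commands from the sections above are then issued by hand:
+```scala
+import org.apache.carbondata.core.util.CarbonProperties
+
+// Assumption: the property is picked up from CarbonProperties at ingestion time.
+CarbonProperties.getInstance()
+  .addProperty("carbon.streaming.auto.handoff.enabled", "false")
+
+// Change the current "streaming" segment to "streaming finish" ...
+spark.sql("ALTER TABLE streaming_table FINISH STREAMING")
+// ... then hand the "streaming finish" segment off to a columnar format segment.
+spark.sql("ALTER TABLE streaming_table COMPACT 'streaming'")
+```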
+
+## Close streaming table
+Use the command below to hand off all streaming segments to columnar format segments and set the streaming property to false. The table then becomes a normal table.
+```sql
+ALTER TABLE streaming_table COMPACT 'close_streaming'
+
+```
+
+## Constraint
+1. Setting the streaming property from true to false is rejected.
+2. UPDATE/DELETE commands on a streaming table are rejected.
+3. Creating a pre-aggregation DataMap on a streaming table is rejected.
+4. Adding the streaming property to a table with a pre-aggregation DataMap is rejected.
+5. If the table has dictionary columns, concurrent data loading is not supported.
+6. Deleting a "streaming" segment is blocked while streaming ingestion is running.
+7. Dropping the streaming table is blocked while streaming ingestion is running.
