niuge01 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421921602



##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+  
+  A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon 
+  for subsequent analysis and queries. 
+
+  The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
+
+  The CarbonData flink integration module provides a set of Flink BulkWriter implementations 
+  (CarbonLocalWriter and CarbonS3Writer). The data is processed by Flink, and finally written into 
+  the stage directory of the target table by the CarbonXXXWriter. 
+
+  By default, the data in the table stage directory can not be queried immediately; it becomes 
+  queryable after the "INSERT INTO $tableName STAGE" command is executed.
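+
+  For example, assuming the target table is named test in the default database (the exact options 
+  supported may vary by CarbonData version), the command might look like:
+
+  ```sql
+  -- Load the data files currently in the stage directory of table 'test'
+  -- into the table, making them visible to subsequent queries.
+  INSERT INTO test STAGE;
+  ```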
+
+  Since the Flink data written to Carbon is endless, the user should execute the "insert from stage" 
+  command periodically, in order to ensure the visibility of data and to keep the amount of data 
+  processed by each "insert from stage" command under control.
+
+  The execution interval of the "insert from stage" command should take into account the data 
+  visibility requirements of the actual business and the Flink data traffic. When the data visibility 
+  requirements are high or the data traffic is large, the execution interval should be shortened 
+  accordingly.
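+
+  As an illustrative sketch (not part of the CarbonData API), this periodic execution could be driven 
+  by a plain JVM scheduler. The runInsertStage helper below is a hypothetical placeholder for whatever 
+  mechanism (for example, a JDBC call to the SQL engine) actually submits the command:
+
+  ```scala
+    import java.util.concurrent.{Executors, TimeUnit}
+
+    // Hypothetical helper: submit the command through your SQL engine,
+    // e.g. via a JDBC connection to a Spark thrift server.
+    def runInsertStage(tableName: String): Unit = {
+      // statement.execute(s"INSERT INTO $tableName STAGE")
+    }
+
+    val scheduler = Executors.newSingleThreadScheduledExecutor()
+    // Run "insert from stage" every 5 minutes; shorten the interval when
+    // data visibility requirements are high or the data traffic is large.
+    scheduler.scheduleAtFixedRate(
+      new Runnable { override def run(): Unit = runInsertStage("test") },
+      0, 5, TimeUnit.MINUTES
+    )
+  ```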
+
+##Usage description
+
+###Writing process
+
+  Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+  
+  Pseudo code and description: 
+  
+  ```scala
+    // Import dependencies.
+    import java.util.Properties
+    import org.apache.carbon.flink.CarbonWriterFactory
+    import org.apache.carbon.flink.ProxyFileSystem
+    import org.apache.carbondata.core.constants.CarbonCommonConstants
+    import org.apache.flink.api.common.restartstrategy.RestartStrategies
+    import org.apache.flink.core.fs.Path
+    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+   
+    // Specify database name.
+    val databaseName = "default"
+   
+    // Specify target table name.
+    val tableName = "test"
+    // Table path of the target table.
+    val tablePath = "/data/warehouse/test"
+    // Specify local temporary path.
+    val dataTempPath = "/data/temp/"
+   
+    val tableProperties = new Properties
+    // Set the table properties here.
+   
+    val writerProperties = new Properties
+    // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+   
+    val carbonProperties = new Properties
+    // Set the carbon properties here, such as date format, store location, etc.
+     
+    // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+    val writerFactory = CarbonWriterFactory.builder("Local").build(
+      databaseName,
+      tableName,
+      tablePath,
+      tableProperties,
+      writerProperties,
+      carbonProperties
+    )
+     
+    // Build a flink stream and run it.
+    // 1. Create a new Flink execution environment.

Review comment:
       OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
