Indhumathi27 commented on a change in pull request #3752:
URL: https://github.com/apache/carbondata/pull/3752#discussion_r421233356
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
Review comment:
Please add license header
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
Review comment:
```suggestion
## Usage scenarios
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink, and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink, and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the visibility of data
+ and the controllable amount of data processed during the execution of each insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data visibility requirements
+ of the actual business and the flink data traffic. When the data visibility requirements are high
+ or the data traffic is large, the execution interval should be appropriately shortened.
+
+##Usage description
Review comment:
```suggestion
## Usage description
```
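The quoted guide text asks users to run the insert from stage command at a regular interval. A minimal sketch of that idea follows, assuming the statement text `INSERT INTO $tableName STAGE` from the guide; `StageInsertScheduler`, `runSql` and `intervalSeconds` are hypothetical names introduced here for illustration and are not part of CarbonData.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object StageInsertScheduler {
  // Statement text taken from the guide; only the table name is interpolated.
  def insertFromStageSql(tableName: String): String =
    s"INSERT INTO $tableName STAGE"

  // `runSql` is a hypothetical hook around whatever SQL session is in use.
  // A shorter interval gives earlier visibility and smaller batches per run.
  def start(tableName: String, intervalSeconds: Long)(runSql: String => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        // Catch failures so that one failed run does not cancel later runs.
        try runSql(insertFromStageSql(tableName))
        catch { case e: Exception => println(s"insert from stage failed: ${e.getMessage}") }
    }
    scheduler.scheduleAtFixedRate(task, 0L, intervalSeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

The caller keeps the returned scheduler and calls `shutdown()` on it when the Flink job is stopped.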
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage description
+
+###Writing process
Review comment:
```suggestion
### Writing process
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold, access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location, etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported: 'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+   databaseName,
+   tableName,
+   tablePath,
+   tableProperties,
+   writerProperties,
+   carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism, checkpointing, restart strategy, etc.
+
+ // 2. Create flink data source, may be a kafka source, custom source, or others.
+ // The data type of source should be Array[AnyRef].
+ // Array length should equals to table column count, and values order in array should matches table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set flink stream target (write data to carbon with a write sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+   environment.execute
+ } catch {
+   case exception: Exception =>
+     // Handle execute exception here.
+ }
+ ```
+
+###Writer properties
Review comment:
```suggestion
### Writer properties
```
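The quoted pseudo code states that each source record should be an `Array[AnyRef]` whose length equals the table column count and whose values follow the table column order. A small sketch of how such a row could be assembled follows; `RowShape` and the column names `id`, `name` and `price` are hypothetical and only illustrate the ordering rule.

```scala
object RowShape {
  // Hypothetical column order of the target table.
  val columns: Seq[String] = Seq("id", "name", "price")

  // Build a row whose length equals the column count and whose values
  // follow the table column order; absent columns become null.
  def toRow(values: Map[String, AnyRef]): Array[AnyRef] =
    columns.map(column => values.getOrElse(column, null)).toArray
}
```

Calling `RowShape.toRow(Map("name" -> "a", "id" -> Integer.valueOf(1)))` yields a three-element array with `id` first and `null` in the `price` slot.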
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+###Writer properties
+
+####Local Writer
Review comment:
```suggestion
#### Local Writer
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+###Writer properties
+
+####Local Writer
+
+ | Property                             | Name                                 | Description                                                                                             |
+ |--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH   | carbon.writer.local.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+ | CarbonLocalProperty.COMMIT_THRESHOLD | carbon.writer.local.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
Review comment:
```suggestion
#### S3 Writer
```
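For context on the Local Writer table quoted above, the two properties could be filled in roughly as follows. This is a sketch that uses only the property names from the table as plain string keys; `LocalWriterProps` is a hypothetical helper, and passing the threshold as a number rendered to a string is an assumption.

```scala
import java.util.Properties

object LocalWriterProps {
  // Property names copied from the Local Writer table.
  val DataTempPath = "carbon.writer.local.data.temp.path"
  val CommitThreshold = "carbon.writer.local.commit.threshold"

  // Assumption: the commit threshold is a row count passed as a string.
  def build(tempPath: String, commitThreshold: Long): Properties = {
    val props = new Properties
    props.setProperty(DataTempPath, tempPath)
    props.setProperty(CommitThreshold, commitThreshold.toString)
    props
  }
}
```

The result would take the place of `writerProperties` in the pseudo code, for example `LocalWriterProps.build("/data/temp/", 1000L)`.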
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+####S3 Writer
+
+ | Property                          | Name                              | Description                                                                                             |
+ |-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonS3Property.ACCESS_KEY       | carbon.writer.s3.access.key       | Access key of s3 file system                                                                            |
+ | CarbonS3Property.SECRET_KEY       | carbon.writer.s3.secret.key       | Secret key of s3 file system                                                                            |
+ | CarbonS3Property.ENDPOINT         | carbon.writer.s3.endpoint         | Endpoint of s3 file system                                                                              |
+ | CarbonS3Property.DATA_TEMP_PATH   | carbon.writer.s3.data.temp.path   | Usually is a local path, data will write to temp path first, and mv to target data path finally.        |
+ | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold | While written data count reach the threshold, data writer will flush and move data to target data path. |
+
+###Insert from stage
Review comment:
```suggestion
### Insert from stage
```
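The S3 Writer table quoted above includes an access key and a secret key, which usually should not be hard-coded. One possible way to populate the properties from the environment is sketched below; `S3WriterProps` and the environment variable names `CARBON_S3_ACCESS_KEY`, `CARBON_S3_SECRET_KEY` and `CARBON_S3_ENDPOINT` are hypothetical, and only the property names come from the table.

```scala
import java.util.Properties

object S3WriterProps {
  // Hypothetical environment variable -> property name from the S3 Writer table.
  private val required = Seq(
    "CARBON_S3_ACCESS_KEY" -> "carbon.writer.s3.access.key",
    "CARBON_S3_SECRET_KEY" -> "carbon.writer.s3.secret.key",
    "CARBON_S3_ENDPOINT" -> "carbon.writer.s3.endpoint"
  )

  // Returns Left with the missing variable names instead of building
  // a partially filled Properties object.
  def fromEnv(env: Map[String, String], tempPath: String, commitThreshold: Long): Either[String, Properties] = {
    val missing = required.collect { case (name, _) if !env.contains(name) => name }
    if (missing.nonEmpty) Left(s"missing environment variables: ${missing.mkString(", ")}")
    else {
      val props = new Properties
      required.foreach { case (name, key) => props.setProperty(key, env(name)) }
      props.setProperty("carbon.writer.s3.data.temp.path", tempPath)
      props.setProperty("carbon.writer.s3.commit.threshold", commitThreshold.toString)
      Right(props)
    }
  }
}
```

In a job this would be called as `S3WriterProps.fromEnv(sys.env, "/data/temp/", 1000L)`, with the `Left` case handled before the writer factory is built.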
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
Review comment:
```suggestion
// 1. Create a new flink execution environment.
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+###Insert from stage
+
+ [Grammar Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
+
+##Usage Example Code
Review comment:
```suggestion
## Usage Example Code
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+ The CarbonData flink integration module is used connect Flink and Carbon in the above scenario.
Review comment:
```suggestion
The CarbonData flink integration module is used to connect Flink and Carbon in the above scenario.
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+ By default, those data in table stage directory, can not be immediately queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
Review comment:
```suggestion
after the `INSERT INTO $tableName STAGE` command is executed.
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink,
and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in
the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter
implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink,
and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately
queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the
visibility of data
+ and the controllable amount of data processed during the execution of each
insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data
visibility requirements
+ of the actual business and the flink data traffic. When the data visibility
requirements are high
+ or the data traffic is large, the execution interval should be appropriately
shortened.
+
+##Usage description
+
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold,
access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location,
etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported:
'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism,
checkpointing, restart strategy, etc.
+
+ // 2. Create flink data source, may be a kafka source, custom source, or
others.
+ // The data type of source should be Array[AnyRef].
+ // Array length should equals to table column count, and values order in
array should matches table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set flink stream target (write data to carbon with a write sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle execute exception here.
+ }
+ ```
+
+###Writer properties
+
+####Local Writer
+
+ | Property | Name
| Description
|
+
|--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path
| Usually is a local path, data will write to temp path first, and mv to
target data path finally. |
Review comment:
```suggestion
| CarbonLocalProperty.DATA_TEMP_PATH |
carbon.writer.local.data.temp.path | Usually is a local path, data will write
to temp path first, and move to target data path finally. |
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink,
and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in
the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter
implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink,
and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately
queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the
visibility of data
+ and the controllable amount of data processed during the execution of each
insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data
visibility requirements
+ of the actual business and the flink data traffic. When the data visibility
requirements are high
+ or the data traffic is large, the execution interval should be appropriately
shortened.
+
+##Usage description
+
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold,
access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location,
etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported:
'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism,
checkpointing, restart strategy, etc.
+
+ // 2. Create flink data source, may be a kafka source, custom source, or
others.
+ // The data type of source should be Array[AnyRef].
+ // Array length should equals to table column count, and values order in
array should matches table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set flink stream target (write data to carbon with a write sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle execute exception here.
+ }
+ ```
+
+###Writer properties
+
+####Local Writer
+
+ | Property | Name
| Description
|
+
|--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path
| Usually is a local path, data will write to temp path first, and mv to
target data path finally. |
+ | CarbonLocalProperty.COMMIT_THRESHOLD |
carbon.writer.local.commit.threshold | While written data count reach the
threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+ | Property | Name |
Description
|
+
|-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key |
Access key of s3 file system
|
+ | CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key |
Secret key of s3 file system
|
+ | CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint |
Endpoint of s3 file system
|
+ | CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path |
Usually is a local path, data will write to temp path first, and mv to target
data path finally. |
+ | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold |
While written data count reach the threshold, data writer will flush and move
data to target data path. |
+
+###Insert from stage
+
+ [Grammar
Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
Review comment:
```suggestion
Refer to the [Grammar
Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
for syntax.
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink,
and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in
the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter
implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink,
and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately
queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the
visibility of data
+ and the controllable amount of data processed during the execution of each
insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data
visibility requirements
+ of the actual business and the flink data traffic. When the data visibility
requirements are high
+ or the data traffic is large, the execution interval should be appropriately
shortened.
+
+##Usage description
+
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold,
access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location,
etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported:
'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism,
checkpointing, restart strategy, etc.
+
+ // 2. Create flink data source, may be a kafka source, custom source, or
others.
+ // The data type of source should be Array[AnyRef].
+ // Array length should equals to table column count, and values order in
array should matches table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set flink stream target (write data to carbon with a write sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle execute exception here.
+ }
+ ```
+
+###Writer properties
+
+####Local Writer
+
+ | Property | Name
| Description
|
+
|--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path
| Usually is a local path, data will write to temp path first, and mv to
target data path finally. |
+ | CarbonLocalProperty.COMMIT_THRESHOLD |
carbon.writer.local.commit.threshold | While written data count reach the
threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+ | Property | Name |
Description
|
+
|-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key |
Access key of s3 file system
|
+ | CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key |
Secret key of s3 file system
|
+ | CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint |
Endpoint of s3 file system
|
+ | CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path |
Usually is a local path, data will write to temp path first, and mv to target
data path finally. |
Review comment:
```suggestion
| CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path |
Usually is a local path, data will write to temp path first, and move to target
data path finally. |
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink,
and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in
the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter
implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink,
and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately
queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the
visibility of data
+ and the controllable amount of data processed during the execution of each
insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data
visibility requirements
+ of the actual business and the flink data traffic. When the data visibility
requirements are high
+ or the data traffic is large, the execution interval should be appropriately
shortened.
+
+##Usage description
+
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
Review comment:
```suggestion
Typical flink stream: `Source -> Process -> Output(Carbon Writer Sink)`
```
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
+
+ A typical scenario is that the data is cleaned and preprocessed by Flink,
and then written to Carbon,
+ for subsequent analysis and queries.
+
+ The CarbonData flink integration module is used connect Flink and Carbon in
the above scenario.
+
+ The CarbonData flink integration module provides a set of Flink BulkWriter
implementations
+ (CarbonLocalWriter and CarbonS3Writer). The data is processed by the Flink,
and finally written into
+ the stage directory of the target table by the CarbonXXXWriter.
+
+ By default, those data in table stage directory, can not be immediately
queried, those data can be queried
+ after the "INSERT INTO $tableName STAGE" command is executed.
+
+ Since the flink data written to carbon is endless, in order to ensure the
visibility of data
+ and the controllable amount of data processed during the execution of each
insert form stage command,
+ the user should execute the insert from stage command in a timely manner.
+
+ The execution interval of the insert form stage command should take the data
visibility requirements
+ of the actual business and the flink data traffic. When the data visibility
requirements are high
+ or the data traffic is large, the execution interval should be appropriately
shortened.
+
+##Usage description
+
+###Writing process
+
+ Typical flink stream: Source -> Process -> Output(Carbon Writer Sink)
+
+ Pseudo code and description:
+
+ ```scala
+ // Import dependencies.
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+
+ // Specify database name.
+ val databaseName = "default"
+
+ // Specify target table name.
+ val tableName = "test"
+ // Table path of the target table.
+ val tablePath = "/data/warehouse/test"
+ // Specify local temporary path.
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+ // Set the table properties here.
+
+ val writerProperties = new Properties
+ // Set the writer properties here, such as temp path, commit threshold,
access key, secret key, endpoint, etc.
+
+ val carbonProperties = new Properties
+ // Set the carbon properties here, such as date format, store location,
etc.
+
+ // Create carbon bulk writer factory. Two writer types are supported:
'Local' and 'S3'.
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ // Build a flink stream and run it.
+ // 1. New a flink execution environment.
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ // Set flink environment configuration here, such as parallelism,
checkpointing, restart strategy, etc.
+
+ // 2. Create flink data source, may be a kafka source, custom source, or
others.
+ // The data type of source should be Array[AnyRef].
+ // Array length should equals to table column count, and values order in
array should matches table column order.
+ val source = ...
+ // 3. Create flink stream and set source.
+ val stream = environment.addSource(source)
+ // 4. Add other flink operators here.
+ // ...
+ // 5. Set flink stream target (write data to carbon with a write sink).
+ stream.addSink(StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build)
+ // 6. Run flink stream.
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // Handle execute exception here.
+ }
+ ```
+
+###Writer properties
+
+####Local Writer
+
+ | Property | Name
| Description
|
+
|--------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonLocalProperty.DATA_TEMP_PATH | carbon.writer.local.data.temp.path
| Usually is a local path, data will write to temp path first, and mv to
target data path finally. |
+ | CarbonLocalProperty.COMMIT_THRESHOLD |
carbon.writer.local.commit.threshold | While written data count reach the
threshold, data writer will flush and move data to target data path. |
+
+####S3 Writer
+
+ | Property | Name |
Description
|
+
|-----------------------------------|-----------------------------------|---------------------------------------------------------------------------------------------------------|
+ | CarbonS3Property.ACCESS_KEY | carbon.writer.s3.access.key |
Access key of s3 file system
|
+ | CarbonS3Property.SECRET_KEY | carbon.writer.s3.secret.key |
Secret key of s3 file system
|
+ | CarbonS3Property.ENDPOINT | carbon.writer.s3.endpoint |
Endpoint of s3 file system
|
+ | CarbonS3Property.DATA_TEMP_PATH | carbon.writer.s3.data.temp.path |
Usually is a local path, data will write to temp path first, and mv to target
data path finally. |
+ | CarbonS3Property.COMMIT_THRESHOLD | carbon.writer.s3.commit.threshold |
While written data count reach the threshold, data writer will flush and move
data to target data path. |
+
+###Insert from stage
+
+ [Grammar
Description](./dml-of-carbondata.md#insert-data-into-carbondata-table-from-stage-input-files)
+
+##Usage Example Code
+
+ Writing flink data to local carbon table.
+
+ ```scala
+ import java.util.Properties
+ import org.apache.carbon.flink.CarbonLocalProperty
+ import org.apache.carbon.flink.CarbonWriterFactory
+ import org.apache.carbon.flink.ProxyFileSystem
+ import org.apache.carbondata.core.constants.CarbonCommonConstants
+ import org.apache.flink.api.common.restartstrategy.RestartStrategies
+ import org.apache.flink.core.fs.Path
+ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+ import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+ import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+ val databaseName = "default"
+ val tableName = "test"
+ val tablePath = "/data/warehouse/test"
+ val dataTempPath = "/data/temp/"
+
+ val tableProperties = new Properties
+
+ val writerProperties = new Properties
+ writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH,
dataTempPath)
+
+ val carbonProperties = new Properties
+
carbonProperties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+
carbonProperties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
"1024")
+
+ val writerFactory = CarbonWriterFactory.builder("Local").build(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(1)
+ environment.enableCheckpointing(2000L)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ // Define a custom source.
+ val source = new SourceFunction[Array[AnyRef]]() {
+ override
+ def run(sourceContext: SourceFunction.SourceContext[Array[AnyRef]]):
Unit = {
+ // Array length should equals to table column count, and values order
in array should matches table column order.
+ val data = new Array[AnyRef](3)
+ data(0) = "value1"
+ data(1) = "value2"
+ data(2) = "value3"
+ sourceContext.collect(data)
+ }
+
+ override
+ def cancel(): Unit = {
+ // do something.
+ }
+ }
+
+ val stream = environment.addSource(source)
+ val streamSink = StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), writerFactory).build
+
+ stream.addSink(streamSink)
+
+ try {
+ environment.execute
+ } catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
Review comment:
Please add the insert from stage command to the example.
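For instance, something along these lines could follow the Flink job in the example. This is only a sketch: `InsertFromStage`, `statement`, and `run` are hypothetical names made up for illustration and are not part of CarbonData. The `execute` callback is an assumption that stands in for whatever runs SQL against Carbon (for example `sql => spark.sql(sql)` on a Carbon-enabled session), so the snippet itself has no Spark dependency.

```scala
// Hypothetical helper (not a CarbonData API): builds and submits the
// insert from stage command for the table the Flink job writes to.
object InsertFromStage {

  // Builds the statement text in the form the guide quotes:
  // INSERT INTO $tableName STAGE
  def statement(tableName: String): String =
    s"INSERT INTO $tableName STAGE"

  // Hands the statement to a caller-supplied executor.
  // Assumption: in a real job this would be something like `sql => spark.sql(sql)`.
  def run(execute: String => Unit, tableName: String): Unit =
    execute(statement(tableName))
}

object InsertFromStageExample {
  def main(args: Array[String]): Unit = {
    // println stands in for a real SQL executor here.
    InsertFromStage.run(sql => println(sql), "test")
  }
}
```

The guide could then note that this call is repeated at whatever interval the data visibility requirements and the Flink data traffic call for.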
##########
File path: docs/flink-integration-guide.md
##########
@@ -0,0 +1,193 @@
+##Usage scenarios
Review comment:
Consider adding a title: `# Carbon Flink Integration guide`