pratyakshsharma commented on a change in pull request #4243:
URL: https://github.com/apache/carbondata/pull/4243#discussion_r769867806



##########
File path: docs/scd-and-cdc-guide.md
##########
@@ -131,4 +131,88 @@ clauses can have at most one UPDATE and one DELETE action, These clauses have th
 
 * Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using APIs.
 * Please refer example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using sql.
-* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs.
\ No newline at end of file
+* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs.
+
+### Streamer Tool
+
+The Carbondata streamer tool incrementally captures change events from sources such as kafka or DFS and merges them into a target carbondata table. To capture changes from a primary database such as mysql, one therefore needs to integrate with an external solution like Debezium or Maxwell to move the change events to kafka. The tool currently requires incoming data to be in avro format and the incoming schema to evolve in a backward-compatible way.
+
+The high-level architecture of the overall pipeline is shown below -
+
+![Carbondata streamer tool pipeline](../docs/images/carbondata-streamer-tool-pipeline.png?raw=true)
+
+#### Configs
+
+The streamer tool exposes the following configs so that users can cater to their CDC use cases -
+
+| Parameter | Default Value | Description |
+|-----------|---------------|-------------|
+| carbon.streamer.target.database | (none) | The name of the database that contains the target table into which the incoming data is merged. If not given by the user, the system takes the current database of the spark session. |
+| carbon.streamer.target.table | (none) | The target carbondata table where the data has to be merged. If this is not configured by user, the operation will fail. |
+| carbon.streamer.source.type | kafka | Streamer tool currently supports two types of data sources. One can ingest data from either kafka or DFS into target carbondata table using streamer tool. |
+| carbon.streamer.dfs.input.path | (none) | An absolute path on a given file system from where data needs to be read to ingest into the target carbondata table. Mandatory if the ingestion source type is DFS. |
+| schema.registry.url | (none) | Streamer tool supports 2 different ways to supply schema of incoming data. Schemas can be supplied using avro files (file based schema provider) or using schema registry. This property defines the url to connect to in case schema registry is used as the schema source. |
+| carbon.streamer.input.kafka.topic | (none) | This is a mandatory property to be set in case kafka is chosen as the source of data. This property defines the topics from where streamer tool will consume the data. |
+| bootstrap.servers | (none) | This is another mandatory property in case kafka is chosen as the source of data. This defines the end points for kafka brokers. |
+| auto.offset.reset | earliest | Streamer tool maintains checkpoints to keep track of the incoming messages which are already consumed. In case of first ingestion using kafka source, this property defines the offset from where ingestion will start. This property can take only 2 valid values - `latest` and `earliest`. |
+| key.deserializer | `org.apache.kafka.common.serialization.StringDeserializer` | Any message in kafka is ultimately a key value pair in the form of serialized bytes. This property defines the deserializer to deserialize the key of a message. |
+| value.deserializer | `io.confluent.kafka.serializers.KafkaAvroDeserializer` | This property defines the class which will be used for deserializing the values present in kafka topic. |
+| enable.auto.commit | false | Kafka maintains an internal topic for storing offsets corresponding to the consumer groups. This property determines if kafka should actually go forward and commit the offsets consumed in this internal topic. We recommend keeping it false since we use spark streaming checkpointing to take care of the same. |
+| group.id | (none) | Streamer tool is ultimately a consumer for kafka. This property determines the consumer group id streamer tool belongs to. |
+| carbon.streamer.input.payload.format | avro | This determines the format of the incoming messages from source. Currently only avro is supported. We have plans to extend this support to json as well in near future. Avro is the most preferred format for CDC use cases since it helps in making the message size very compact and has good support for schema evolution use cases as well. |
+| carbon.streamer.schema.provider | SchemaRegistry | As discussed earlier, streamer tool supports 2 ways of supplying schema for incoming messages - schema registry and avro files. Confluent schema registry is the preferred way when using avro as the input format. |
+| carbon.streamer.source.schema.path | (none) | This property defines the absolute path where files containing schemas for incoming messages are present. |
+| carbon.streamer.merge.operation.type | upsert | This defines the operation that needs to be performed on the incoming batch of data while writing it to target data set. |
+| carbon.streamer.merge.operation.field | (none) | This property defines the field in incoming schema which contains the type of operation performed at source. For example, Debezium includes a field called `op` when reading change events from primary database. Do not confuse this property with `carbon.streamer.merge.operation.type`, which defines the operation to be performed on the incoming batch of data. However, this property is needed so that streamer tool is able to identify rows deleted at source when the operation type is `upsert`. |
+| carbon.streamer.record.key.field | (none) | This defines the record key for a particular incoming record. This is used by the streamer tool for performing deduplication. In case this is not defined, operation will fail. |
+| carbon.streamer.batch.interval | 10 | Minimum interval between 2 consecutive ingestions in continuous mode. Should be specified in seconds. |
+| carbon.streamer.source.ordering.field | (none) | Name of the field from source schema whose value can be used for picking the latest updates for a particular record in the incoming batch in case of multiple updates for the same record key. Useful if the write operation type is UPDATE or UPSERT. This will be used only if `carbon.streamer.upsert.deduplicate` is enabled. |
+| carbon.streamer.insert.deduplicate | false | This property specifies if the incoming batch needs to be deduplicated in case of INSERT operation type. If set to true, the incoming batch will be deduplicated against the existing data in the target carbondata table. |
+| carbon.streamer.upsert.deduplicate | true | This property specifies if the incoming batch needs to be deduplicated (when multiple updates for the same record key are present in the incoming batch) in case of UPSERT/UPDATE operation type. If set to true, the user needs to provide proper value for the source ordering field as well. |
+| carbon.streamer.meta.columns | (none) | Generally when performing CDC operations on primary databases, a few metadata columns are added along with the actual columns for bookkeeping purposes. This property enables users to list down all such metadata fields (comma separated) which should not be merged with the target carbondata table. |
+| carbon.enable.schema.enforcement | true | This flag decides if table schema needs to change as per the incoming batch schema. If set to true, incoming schema will be validated with existing table schema. If the schema has evolved, the incoming batch cannot be ingested and job will simply fail. |
+
+#### Commands
+
+1. For kafka source - 
+
+```
+bin/spark-submit --class org.apache.carbondata.streamer.CarbonDataStreamer \
+--master spark://root1-ThinkPad-T490s:7077 \
+jars/apache-carbondata-2.3.0-SNAPSHOT-bin-spark2.4.5-hadoop2.7.2.jar \

Review comment:
       my bad, missed it completely. Thank you for pointing this out. 
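
For anyone reading the quoted config table, the mandatory-property rules it describes can be sketched roughly as below. This is only an illustration in Python, not the streamer tool's actual validation code: the function name `validate_streamer_config`, the plain-dict input, and the case-insensitive handling of values are all assumptions; only the property names and the rules themselves come from the table.

```python
def validate_streamer_config(conf):
    """Return a list of problems for a dict of streamer properties.

    Hypothetical helper: it mirrors the rules stated in the config table,
    not the real checks performed by the streamer tool.
    """
    problems = []

    def missing(key):
        # Treat absent, empty, and whitespace-only values as "not set".
        return not str(conf.get(key, "")).strip()

    # The table says the operation fails without these two properties.
    for key in ("carbon.streamer.target.table",
                "carbon.streamer.record.key.field"):
        if missing(key):
            problems.append(f"{key} is required")

    # Default source type is kafka; lower-casing is an assumption.
    source_type = str(conf.get("carbon.streamer.source.type", "kafka")).lower()
    if source_type == "kafka":
        for key in ("carbon.streamer.input.kafka.topic", "bootstrap.servers"):
            if missing(key):
                problems.append(f"{key} is required for a kafka source")
        if conf.get("auto.offset.reset", "earliest") not in ("earliest", "latest"):
            problems.append("auto.offset.reset must be 'earliest' or 'latest'")
    elif source_type == "dfs":
        if missing("carbon.streamer.dfs.input.path"):
            problems.append(
                "carbon.streamer.dfs.input.path is required for a DFS source")
    else:
        problems.append(f"unsupported source type: {source_type}")

    # Upsert deduplication (default true) needs a source ordering field.
    operation = str(
        conf.get("carbon.streamer.merge.operation.type", "upsert")).lower()
    dedup = str(
        conf.get("carbon.streamer.upsert.deduplicate", "true")).lower() == "true"
    if (operation in ("upsert", "update") and dedup
            and missing("carbon.streamer.source.ordering.field")):
        problems.append(
            "carbon.streamer.source.ordering.field is required "
            "when upsert deduplication is enabled")

    return problems
```

An empty list means the property map satisfies the rules listed in the table; the example values a caller passes in (table names, topics, broker addresses) are entirely up to the user.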





