pratyakshsharma commented on a change in pull request #4235:
URL: https://github.com/apache/carbondata/pull/4235#discussion_r734958004



##########
File path: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2681,4 +2681,220 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = 
"false";
 
+  
//////////////////////////////////////////////////////////////////////////////////////////
+  // CDC streamer configs start here
+  
//////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * The database name where the target table is present to merge the incoming 
data. If not given by
+   * user, system will take the current database in the spark session.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DATABASE_NAME = 
"carbon.streamer.target.database";
+
+  /**
+   * The target carbondata table where the data has to be merged. If this is 
not configured by user,
+   * the operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_TABLE_NAME = 
"carbon.streamer.target.table";
+
+  /**
+   * Source type to ingest data from. It can be kafka or DFS
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_TYPE = 
"carbon.streamer.source.type";
+
+  public static final String CARBON_STREAMER_SOURCE_TYPE_DEFAULT = "kafka";
+
+  /**
+   * An absolute path on a given file system from where data needs to be read 
to ingest into the
+   * target carbondata table. Mandatory if the ingestion source type is DFS.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DFS_INPUT_PATH = 
"carbon.streamer.dfs.input.path";
+
+  /**
+   * Schema registry url in case schema registry is selected as schema 
provider.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SCHEMA_REGISTRY_URL = 
"schema.registry.url";
+
+  // **************** kafka properties constants *********************
+  /**
+   * Kafka topics to consume data from. Mandatory if Kafka is selected as the 
ingestion source.
+   * If multiple topics are present, the value of the property can be 
comma-separated topic names.
+   * If not present in case of kafka source, operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_KAFKA_INPUT_TOPIC =
+      "carbon.streamer.input.kafka.topic";
+
+  /**
+   * Kafka brokers to connect to in case Kafka is selected as an ingestion 
source. If not present in
+   * case of kafka source, operation will fail.
+   */
+  @CarbonProperty
+  public static final String KAFKA_BROKERS = "bootstrap.servers";
+
+  /**
+   * Kafka offset to fall back to in case no checkpoint is available for 
starting ingestion.
+   * Valid values - Latest and Earliest.
+   */
+  @CarbonProperty
+  public static final String KAFKA_INITIAL_OFFSET_TYPE = "auto.offset.reset";
+
+  public static final String CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT 
= "earliest";
+
+  /**
+   * Key deserializer for kafka. Mandatory for Kafka source.
+   */
+  @CarbonProperty
+  public static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
+
+  // TODO: check how to take this value, class name or one wrapper above the 
deserializer
+  public static final String KAFKA_KEY_DESERIALIZER_DEFAULT =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+
+  /**
+   * Value deserializer for Kafka. Mandatory for Kafka source
+   */
+  @CarbonProperty
+  public static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
+
+  public static final String KAFKA_VALUE_DESERIALIZER_DEFAULT =
+      "io.confluent.kafka.serializers.KafkaAvroDeserializer";
+
+  public static final String AVRO_SCHEMA = 
"carbon.streamer.avro.schema.deserialize";
+
+  /**
+   * Auto commit to kafka. If enabled, kafka will blindly commit the offsets 
to the offset topic whether
+   * the respective operation has failed or not. So by default we will keep it 
false.
+   */
+  public static final String KAFKA_ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+  public static final String KAFKA_ENABLE_AUTO_COMMIT_DEFAULT = "false";
+
+  /**
+   * This property is required if the consumer uses either the group 
management functionality by
+   * using subscribe(topic) or the Kafka-based offset management strategy.
+   */
+  @CarbonProperty
+  public static final String KAFKA_GROUP_ID = "group.id";
+
+  // ***************************************************************
+
+  /**
+   * Format of the incoming data/payload.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT =
+      "carbon.streamer.input.payload.format";
+
+  public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT_DEFAULT = 
"avro";
+
+  /**
+   * Schema provider for the incoming batch of data. Currently, 2 types of 
schema providers are
+   * supported - FileBasedProvider and SchemaRegistryProvider
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SCHEMA_PROVIDER = 
"carbon.streamer.schema.provider";
+
+  public static final String CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT = 
"SchemaRegistry";
+
+  public static final String CARBON_STREAMER_FILE_SCHEMA_PROVIDER = 
"FileSchema";
+
+  /**
+   * Absolute Path to file containing the schema of incoming data. Mandatory 
if file-based schema
+   * provider is selected.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_SCHEMA_PATH =
+      "carbon.streamer.source.schema.path";
+
+  /**
+   * Different merge operations are supported - INSERT, UPDATE, DELETE, UPSERT
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE =
+      "carbon.streamer.merge.operation.type";
+
+  public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE_DEFAULT = 
"upsert";
+
+  /**
+   * Name of the field in source schema reflecting the IUD operation types on 
source data rows.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_MERGE_OPERATION_FIELD =
+      "carbon.streamer.merge.operation.field";
+
+  /**
+   * Name of the field from source schema whose value can be used for picking 
the latest updates for
+   * a particular record in the incoming batch in case of duplicate record 
keys. Useful if the
+   * write operation type is UPDATE or UPSERT. This will be used only if
+   * carbon.streamer.upsert.deduplicate is enabled.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD =
+      "carbon.streamer.source.ordering.field";
+
+  public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT = 
"";
+
+  /**
+   * Join key/record key for a particular record. Will be used for 
deduplication of the incoming
+   * batch. If not present operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_KEY_FIELD = 
"carbon.streamer.merge.operation.field";
+
+  /**
+   * This property specifies if the incoming batch needs to be deduplicated in 
case of INSERT
+   * operation type. If set to true, the incoming batch will be deduplicated 
against the existing
+   * data in the target carbondata table.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_INSERT_DEDUPLICATE =
+      "carbon.streamer.insert.deduplicate";
+
+  public static final String CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT = 
"false";
+
+  /**
+   * This property specifies if the incoming batch needs to be deduplicated 
(when multiple updates
+   * for the same record key are present in the incoming batch) in case of 
UPSERT/UPDATE operation
+   * type. If set to true, the user needs to provide proper value for the 
source ordering field as
+   * well.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE =
+      "carbon.streamer.upsert.deduplicate";
+
+  public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT = 
"true";
+
+  /**
+   * Minimum batch interval time between 2 consecutive ingestions in continuous 
mode. Should be
+   * specified in seconds.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_BATCH_INTERVAL = 
"carbon.streamer.batch.interval";
+
+  // TODO: check and then decide the default
+  public static final String CARBON_STREAMER_BATCH_INTERVAL_DEFAULT = "10";

Review comment:
       How are we planning to decide the default value for this field?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to