pratyakshsharma commented on a change in pull request #4235:
URL: https://github.com/apache/carbondata/pull/4235#discussion_r737215696
########## File path: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ##########
@@ -2681,4 +2681,220 @@ private CarbonCommonConstants() {
   public static final String CARBON_CDC_MINMAX_PRUNING_ENABLED_DEFAULT = "false";
+  //////////////////////////////////////////////////////////////////////////////////////////
+  // CDC streamer configs start here
+  //////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * The database name where the target table is present to merge the incoming data. If not
+   * given by the user, the system will take the current database of the Spark session.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DATABASE_NAME = "carbon.streamer.target.database";
+
+  /**
+   * The target carbondata table into which the data has to be merged. If this is not configured
+   * by the user, the operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_TABLE_NAME = "carbon.streamer.target.table";
+
+  /**
+   * Source type to ingest data from. It can be Kafka or DFS.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_TYPE = "carbon.streamer.source.type";
+
+  public static final String CARBON_STREAMER_SOURCE_TYPE_DEFAULT = "kafka";
+
+  /**
+   * An absolute path on a given file system from which data needs to be read to ingest into the
+   * target carbondata table. Mandatory if the ingestion source type is DFS.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DFS_INPUT_PATH = "carbon.streamer.dfs.input.path";
+
+  /**
+   * Schema registry URL, in case the schema registry is selected as the schema provider.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SCHEMA_REGISTRY_URL = "schema.registry.url";
+
+  // **************** kafka properties constants *********************
+  /**
+   * Kafka topics to consume data from. Mandatory if Kafka is selected as the ingestion source.
+   * If multiple topics are present, the value of this property can be a comma-separated list of
+   * topic names. If not present in case of a Kafka source, the operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_KAFKA_INPUT_TOPIC =
+      "carbon.streamer.input.kafka.topic";
+
+  /**
+   * Kafka brokers to connect to in case Kafka is selected as the ingestion source. If not
+   * present in case of a Kafka source, the operation will fail.
+   */
+  @CarbonProperty
+  public static final String KAFKA_BROKERS = "bootstrap.servers";
+
+  /**
+   * Kafka offset to fall back to in case no checkpoint is available for starting ingestion.
+   * Valid values: earliest and latest.
+   */
+  @CarbonProperty
+  public static final String KAFKA_INITIAL_OFFSET_TYPE = "auto.offset.reset";
+
+  public static final String CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT = "earliest";
+
+  /**
+   * Key deserializer for Kafka. Mandatory for a Kafka source.
+   */
+  @CarbonProperty
+  public static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
+
+  // TODO: check how to take this value, class name or one wrapper above the deserializer
+  public static final String KAFKA_KEY_DESERIALIZER_DEFAULT =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+
+  /**
+   * Value deserializer for Kafka. Mandatory for a Kafka source.
+   */
+  @CarbonProperty
+  public static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
+
+  public static final String KAFKA_VALUE_DESERIALIZER_DEFAULT =
+      "io.confluent.kafka.serializers.KafkaAvroDeserializer";
+
+  public static final String AVRO_SCHEMA = "carbon.streamer.avro.schema.deserialize";
+
+  /**
+   * Auto commit to Kafka. If enabled, Kafka will blindly commit the offsets to the offsets
+   * topic whether the respective operation failed or not, so by default we keep it false.
+   */
+  public static final String KAFKA_ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+  public static final String KAFKA_ENABLE_AUTO_COMMIT_DEFAULT = "false";
+
+  /**
+   * This property is required if the consumer uses either the group management functionality by
+   * using subscribe(topic) or the Kafka-based offset management strategy.

Review comment:
       The existing comment is not very clear, I feel, and as a best practice it is always recommended to assign a proper, unique value to the group id when consuming from Kafka. So we can remove the existing comment and update it with the above.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
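[Editor's note] To make the intent of these keys concrete, here is a minimal, self-contained sketch (not from the PR) of populating them before launching an ingestion job. The class name, main method, and all concrete values (broker address, topic names, group id) are illustrative assumptions; only the property keys and the defaults quoted in the diff above come from the source.

```java
import java.util.Properties;

/**
 * Hypothetical example: wires up the CDC streamer configuration keys
 * introduced in the diff above. Not part of the PR.
 */
public class StreamerConfigExample {

  public static void main(String[] args) {
    Properties props = new Properties();

    // Target table configs (carbon.streamer.* keys from the diff above).
    props.setProperty("carbon.streamer.target.database", "default");   // illustrative value
    props.setProperty("carbon.streamer.target.table", "target_table"); // illustrative value
    props.setProperty("carbon.streamer.source.type", "kafka");         // default per the diff

    // Kafka consumer configs; as the diff shows, these reuse the
    // standard Kafka consumer property keys.
    props.setProperty("bootstrap.servers", "localhost:9092");          // illustrative value
    props.setProperty("carbon.streamer.input.kafka.topic", "cdc_topic_1,cdc_topic_2");
    props.setProperty("auto.offset.reset", "earliest");                // default per the diff
    props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer",
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");

    // Keep auto commit off so offsets are committed only after a
    // successful merge, as the Javadoc in the diff explains.
    props.setProperty("enable.auto.commit", "false");

    // Per the review comment: always assign a proper, unique group id
    // when consuming from Kafka. This value is an assumption.
    props.setProperty("group.id", "carbon-cdc-streamer-target_table");

    System.out.println(props);
  }
}
```

Keeping `enable.auto.commit=false` together with a stable, unique `group.id` matches the reviewer's recommendation: the streamer controls when offsets are committed, and a unique group id keeps its consumed offsets isolated from other consumers on the same topics.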