This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 18840af  [CARBONDATA-4305] Support Carbondata Streamer tool for 
incremental fetch and merge from kafka and DFS Sources
18840af is described below

commit 18840af9c1f7154b58e3c397dfc5a4440674bcee
Author: akashrn5 <[email protected]>
AuthorDate: Wed Sep 1 19:24:24 2021 +0530

    [CARBONDATA-4305] Support Carbondata Streamer tool for incremental fetch 
and merge from kafka and DFS Sources
    
    Why is this PR needed?
    In the current Carbondata CDC solution, if any user wants to integrate it 
with a streaming source then he
    need to write a separate spark application to capture changes which is an 
overhead. We should be able to
    incrementally capture the data changes from primary databases and should be 
able to incrementally ingest
    the same in the data lake so that the overall latency decreases. The former 
is taken care of using
    log-based CDC systems like Maxwell and Debezium. Here is a solution for the 
second aspect using Apache Carbondata.
    
    What changes were proposed in this PR?
    Carbondata streamer tool is a spark streaming application which enables 
users to incrementally ingest data
    from various sources, like Kafka(standard pipeline would be like MYSQL => 
debezium => (kafka + Schema registry) => Carbondata Streamer tool)
    and DFS into their data lakes. The tool comes with out-of-the-box support 
for almost all types of schema
    evolution use cases. With the streamer tool only add column support is 
given with drop column and
    other schema changes capability in line in the upcoming days. Please refer 
to design document for
    more details about usage and working of the tool.
    
    This closes #4235
---
 .../core/constants/CarbonCommonConstants.java      | 162 +++++++++++-
 .../carbondata/core/util/CarbonProperties.java     |   4 +
 integration/spark/pom.xml                          |  47 ++++
 .../apache/carbondata/streamer/AvroDFSSource.scala |  58 +++++
 .../carbondata/streamer/AvroKafkaSource.scala      |  64 +++++
 .../apache/carbondata/streamer/CarbonDStream.scala |  56 ++++
 .../carbondata/streamer/CarbonDataStreamer.scala   | 135 ++++++++++
 .../streamer/CarbonDataStreamerException.scala     |  33 +++
 .../carbondata/streamer/CarbonStreamerConfig.scala | 282 +++++++++++++++++++++
 .../apache/carbondata/streamer/SchemaSource.scala  | 106 ++++++++
 .../org/apache/carbondata/streamer/Source.scala    | 235 +++++++++++++++++
 .../apache/carbondata/streamer/SourceFactory.scala |  54 ++++
 .../mutation/merge/CarbonMergeDataSetCommand.scala |   5 +
 .../spark/testsuite/merge/MergeTestCase.scala      |  68 +++++
 pom.xml                                            |   4 +
 15 files changed, 1312 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f24108a..919fc90 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2684,6 +2684,150 @@ public final class CarbonCommonConstants {
   
//////////////////////////////////////////////////////////////////////////////////////////
   // CDC streamer configs start here
   
//////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * The database name where the target table is present to merge the incoming 
data. If not given by
+   * user, system will take the current database in the spark session.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DATABASE_NAME = 
"carbon.streamer.target.database";
+
+  /**
+   * The target carbondata table where the data has to be merged. If this is 
not configured by user,
+   * the operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_TABLE_NAME = 
"carbon.streamer.target.table";
+
+  /**
+   * Source type to ingest data from. It can be kafka or DFS
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_TYPE = 
"carbon.streamer.source.type";
+
+  public static final String CARBON_STREAMER_SOURCE_TYPE_DEFAULT = "kafka";
+
+  /**
+   * An absolute path on a given file system from where data needs to be read 
to ingest into the
+   * target carbondata table. Mandatory if the ingestion source type is DFS.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_DFS_INPUT_PATH = 
"carbon.streamer.dfs.input.path";
+
+  /**
+   * Schema registry url in case schema registry is selected as schema 
provider.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SCHEMA_REGISTRY_URL = 
"schema.registry.url";
+
+  // **************** kafka properties constants *********************
+  /**
+   * Kafka topics to consume data from. Mandatory if Kafka is selected as the 
ingestion source.
+   * If multiple topic are present, the value of the property can be comma 
separated topic names.
+   * If not present in case of kafka source, operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_KAFKA_INPUT_TOPIC =
+      "carbon.streamer.input.kafka.topic";
+
+  /**
+   * Kafka brokers to connect to in case Kafka is selected as an ingestion 
source. If not present in
+   * case of kafka source, operation will fail.
+   */
+  @CarbonProperty
+  public static final String KAFKA_BROKERS = "bootstrap.servers";
+
+  /**
+   * Kafka offset to fall back to in case no checkpoint is available for 
starting ingestion.
+   * Valid values - Latest and Earliest.
+   */
+  @CarbonProperty
+  public static final String KAFKA_INITIAL_OFFSET_TYPE = "auto.offset.reset";
+
+  public static final String CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT 
= "earliest";
+
+  /**
+   * Key deserializer for kafka. Mandatory for Kafka source.
+   */
+  @CarbonProperty
+  public static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
+
+  // TODO: check how to take this value, class name or one wrapper above the 
deserializer
+  public static final String KAFKA_KEY_DESERIALIZER_DEFAULT =
+      "org.apache.kafka.common.serialization.StringDeserializer";
+
+  /**
+   * Value deserializer for Kafka. Mandatory for Kafka source
+   */
+  @CarbonProperty
+  public static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
+
+  public static final String KAFKA_VALUE_DESERIALIZER_DEFAULT =
+      "io.confluent.kafka.serializers.KafkaAvroDeserializer";
+
+  public static final String AVRO_SCHEMA = 
"carbon.streamer.avro.schema.deserialize";
+
+  /**
+   * Auto commit to kafka. If enabled, kafka will blindly commit the offsets 
to offset topic whether
+   * the respective operation is failed or not. So default we will keep it 
false. Since Spark
+   * Streaming checkpoint is enabled, it will take care committing the 
consumed offsets and it will
+   * be taken care for failure scenarios also.
+   */
+  public static final String KAFKA_ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+  public static final String KAFKA_ENABLE_AUTO_COMMIT_DEFAULT = "false";
+
+  /**
+   * The carbondata streamer tool is a consumer for kafka ingestion. So this 
property will assign
+   * group id for streamer tool in case of kafka ingestion.
+   */
+  @CarbonProperty
+  public static final String KAFKA_GROUP_ID = "group.id";
+
+  // ***************************************************************
+
+  /**
+   * Format of the incoming data/payload.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT =
+      "carbon.streamer.input.payload.format";
+
+  public static final String CARBON_STREAMER_INPUT_PAYLOAD_FORMAT_DEFAULT = 
"avro";
+
+  /**
+   * Schema provider for the incoming batch of data. Currently, 2 types of 
schema providers are
+   * supported - FileBasedProvider and SchemaRegistryProvider
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SCHEMA_PROVIDER = 
"carbon.streamer.schema.provider";
+
+  public static final String CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT = 
"SchemaRegistry";
+
+  public static final String CARBON_STREAMER_FILE_SCHEMA_PROVIDER = 
"FileSchema";
+
+  /**
+   * Path to file/folder containing the schema of incoming data. Mandatory if 
file-based schema
+   * provider is selected.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_SOURCE_SCHEMA_PATH =
+      "carbon.streamer.source.schema.path";
+
+  /**
+   * Different merge operations are supported - INSERT, UPDATE, DELETE, UPSERT
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE =
+      "carbon.streamer.merge.operation.type";
+
+  public static final String CARBON_STREAMER_MERGE_OPERATION_TYPE_DEFAULT = 
"upsert";
+
+  /**
+   * Name of the field in source schema reflecting the IUD operation types on 
source data rows.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_MERGE_OPERATION_FIELD =
+      "carbon.streamer.merge.operation.field";
 
   /**
    * Name of the field from source schema whose value can be used for picking 
the latest updates for
@@ -2697,6 +2841,13 @@ public final class CarbonCommonConstants {
   public static final String CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT = 
"";
 
   /**
+   * Join key/record key for a particular record. Will be used for 
deduplication of the incoming
+   * batch. If not present operation will fail.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_KEY_FIELD = 
"carbon.streamer.record.key.field";
+
+  /**
    * This property specifies if the incoming batch needs to be deduplicated in 
case of INSERT
    * operation type. If set to true, the incoming batch will be deduplicated 
against the existing
    * data in the target carbondata table.
@@ -2718,8 +2869,17 @@ public final class CarbonCommonConstants {
   public static final String CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT = 
"true";
 
   /**
+   * Minimum batch interval time between 2 continuous ingestion in continuous 
mode. Should be
+   * specified in seconds.
+   */
+  @CarbonProperty
+  public static final String CARBON_STREAMER_BATCH_INTERVAL = 
"carbon.streamer.batch.interval";
+
+  public static final String CARBON_STREAMER_BATCH_INTERVAL_DEFAULT = "10";
+
+  /**
    * The metadata columns coming from the source stream data, which should not 
be included in the
-   * target data.
+   * target data. The value should be comma separated column names.
    */
   @CarbonProperty public static final String CARBON_STREAMER_META_COLUMNS =
       "carbon.streamer.meta.columns";
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 975d41a..8d8105a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -119,6 +119,10 @@ public final class CarbonProperties {
     return INSTANCE;
   }
 
+  public Properties getAllPropertiesInstance() {
+    return carbonProperties;
+  }
+
   /**
    * This method is to validate only a specific key added to carbonProperties 
using addProperty
    *
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 08ba960..4c2b812 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -108,6 +108,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+      <version>1.72</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-cli</artifactId>
       <version>${project.version}</version>
@@ -176,6 +181,48 @@
     <!-- spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.twitter</groupId>
+          <artifactId>chill-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.3.4</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>common-config</artifactId>
+      <version>5.3.4</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>common-utils</artifactId>
+      <version>5.3.4</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry-client</artifactId>
+      <version>5.3.4</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
       <exclusions>
         <exclusion>
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroDFSSource.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroDFSSource.scala
new file mode 100644
index 0000000..b9bf99d
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroDFSSource.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.mapred.AvroKey
+import org.apache.avro.mapreduce.AvroKeyInputFormat
+import org.apache.hadoop.io.NullWritable
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This class handles of preparing the Dstream and merging the data onto 
target carbondata table
+ * for the DFS Source containing avro data.
+ * @param carbonTable target carbondata table.
+ */
+class AvroDFSSource(carbonTable: CarbonTable) extends Source with Serializable 
{
+
+  override
+  def getStream(
+      ssc: StreamingContext,
+      sparkSession: SparkSession): CarbonDStream = {
+    val dfsFilePath = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_DFS_INPUT_PATH)
+    // here set the reader schema in the hadoop conf so that the 
AvroKeyInputFormat will read
+    // using the reader schema and populate the default values for the columns 
where data is not
+    // present. This will help to apply the schema changes to target 
carbondata table.
+    val value = ssc.fileStream[AvroKey[Any], NullWritable, 
AvroKeyInputFormat[Any]](FileFactory
+      .getUpdatedFilePath(dfsFilePath))
+      .map(rec => rec._1.datum().asInstanceOf[GenericRecord])
+    CarbonDStream(value.asInstanceOf[DStream[Any]])
+  }
+
+  override
+  def prepareDFAndMerge(inputStream: CarbonDStream): Unit = {
+    prepareDSForAvroSourceAndMerge(inputStream, carbonTable)
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroKafkaSource.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroKafkaSource.scala
new file mode 100644
index 0000000..e630b2c
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroKafkaSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.spark.streaming.kafka010.KafkaUtils
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This class handles of preparing the Dstream and merging the data onto 
target carbondata table
+ * for the kafka Source containing avro data.
+ * @param carbonTable target carbondata table.
+ */
+class AvroKafkaSource(carbonTable: CarbonTable) extends Source with 
Serializable {
+
+  override
+  def getStream(
+      ssc: StreamingContext,
+      sparkSession: SparkSession): CarbonDStream = {
+    // separate out the non carbon properties and prepare the kafka param
+    val kafkaParams = CarbonProperties.getInstance()
+      .getAllPropertiesInstance
+      .asScala
+      .filter { prop => !prop._1.startsWith("carbon") }
+    kafkaParams.put(CarbonCommonConstants.AVRO_SCHEMA, schema.toString())
+    val topics = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_KAFKA_INPUT_TOPIC)
+      .split(CarbonCommonConstants.COMMA)
+    val value = KafkaUtils
+      .createDirectStream(ssc, PreferConsistent, Subscribe[String, 
String](topics, kafkaParams))
+      .map(obj => obj.value().asInstanceOf[GenericRecord])
+    CarbonDStream(value.asInstanceOf[DStream[Any]])
+  }
+
+  override
+  def prepareDFAndMerge(inputStream: CarbonDStream): Unit = {
+    prepareDSForAvroSourceAndMerge(inputStream, carbonTable)
+  }
+
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDStream.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDStream.scala
new file mode 100644
index 0000000..b281811
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDStream.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.execution.command.mutation.merge.MergeOperationType
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Wrapper class to hold the spark's DStream object as Dstream can be of 
different types based on
+ * the different input sources like text, avro, kafka etc.
+ * @param inputDStream Spark's DStream object
+ */
+case class CarbonDStream(inputDStream: DStream[Any]) extends Serializable {
+
+  /**
+   * Performs the merge operation onto target carbondata table based on the 
operation type.
+   * @param targetDsOri target dataset of carbondata table.
+   * @param srcDS source dataset prepared from different sources like kafka, 
avro, json etc.
+   * @param keyColumn the join column based on which merge is performed.
+   * @param mergeOperationType Merge operation type to perform, can be UPSERT, 
UPDATE, INSERT and
+   *                           DELETE.
+   */
+  def performMergeOperation(
+      targetDsOri: Dataset[Row],
+      srcDS: Dataset[Row],
+      keyColumn: String,
+      mergeOperationType: String): Unit = {
+    MergeOperationType.withName(mergeOperationType.toUpperCase) match {
+      case MergeOperationType.UPSERT =>
+        targetDsOri.upsert(srcDS, keyColumn).execute()
+      case MergeOperationType.UPDATE =>
+        targetDsOri.update(srcDS, keyColumn).execute()
+      case MergeOperationType.DELETE =>
+        targetDsOri.delete(srcDS, keyColumn).execute()
+      case MergeOperationType.INSERT =>
+        targetDsOri.insert(srcDS, keyColumn).execute()
+    }
+  }
+
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamer.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamer.scala
new file mode 100644
index 0000000..5ae1502
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamer.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import com.beust.jcommander.JCommander
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Carbondata streamer, which is a spark streaming application to pull data 
from different
+ * sources and merge onto target cabondata table.
+ */
+object CarbonDataStreamer {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def createConfig(streamerConfig: CarbonStreamerConfig,
+      args: Array[String]): Unit = {
+    JCommander.newBuilder().addObject(streamerConfig).build().parse(args: _*)
+  }
+
+  /**
+   * This method creates streaming context for the first time if no checkpoint 
directory present for
+   * the table.
+   *
+   * @param sparkSession          Spark Session.
+   * @param targetCarbonDataTable target carbondata table to merge.
+   * @return Spark StreamingContext
+   */
+  def createStreamingContext(sparkSession: SparkSession,
+      targetCarbonDataTable: CarbonTable): StreamingContext = {
+    val batchDuration = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL,
+        CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL_DEFAULT).toLong
+    val ssc = new StreamingContext(sparkSession.sparkContext, 
Seconds(batchDuration))
+
+    // get the source Dstream based on source type
+    val sourceType = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE,
+        CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE_DEFAULT)
+    val sourceCarbonDStream = SourceFactory.apply(sourceType,
+      ssc,
+      sparkSession,
+      targetCarbonDataTable)
+    // Perform merge on source stream
+    SourceFactory.source.prepareDFAndMerge(sourceCarbonDStream)
+    // set the checkpoint directory for spark streaming
+    
ssc.checkpoint(CarbonTablePath.getStreamingCheckpointDir(targetCarbonDataTable.getTablePath))
+    ssc
+  }
+
+  def main(args: Array[String]): Unit = {
+    // parse the incoming arguments and prepare the configurations
+    val streamerConfigs = new CarbonStreamerConfig()
+    createConfig(streamerConfigs, args)
+    streamerConfigs.setConfigsToCarbonProperty(streamerConfigs)
+
+    val spark = SparkSession
+      .builder()
+      .master(streamerConfigs.sparkMaster)
+      .appName("CarbonData Streamer tool")
+      .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.streaming.driver.writeAheadLog.allowBatching", "true")
+      .config("spark.streaming.driver.writeAheadLog.batchingTimeout", 15000)
+      .enableHiveSupport()
+      .getOrCreate()
+    CarbonEnv.getInstance(spark)
+
+    SparkSession.setActiveSession(spark)
+    SparkSession.setDefaultSession(spark)
+
+    val targetTableName = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_TABLE_NAME)
+
+    var databaseName = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_DATABASE_NAME)
+    databaseName = if (databaseName.equalsIgnoreCase("")) {
+      spark.sessionState.catalog.getCurrentDatabase
+    } else {
+      databaseName
+    }
+
+    // if the target table is non-carbondata table, throw exception
+    if (!CarbonPlanHelper.isCarbonTable(TableIdentifier(targetTableName, 
Some(databaseName)))) {
+      throw new UnsupportedOperationException("The merge operation using 
CarbonData Streamer tool" +
+                                              " for non carbondata table is 
not supported.")
+    }
+
+    val targetCarbonDataTable = CarbonEnv.getCarbonTable(Some(databaseName), 
targetTableName)(spark)
+    val dbAndTb = targetCarbonDataTable.getQualifiedName
+    val segmentProperties = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
+    if (!(segmentProperties.equals("") || segmentProperties.trim.equals("*"))) 
{
+      throw new CarbonDataStreamerException(
+        s"carbon.input.segments.$dbAndTb should not be set for table during 
merge operation. " +
+        s"Please reset the property to carbon.input.segments.dbAndTb=*")
+    }
+
+    val ssc = 
StreamingContext.getOrCreate(CarbonTablePath.getStreamingCheckpointDir(
+      targetCarbonDataTable.getTablePath),
+      () => createStreamingContext(spark, targetCarbonDataTable))
+
+    try {
+      ssc.start()
+      ssc.awaitTermination()
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("streaming failed. Stopping the streaming application 
gracefully.", ex)
+        ssc.stop(stopSparkContext = true, stopGracefully = true)
+    }
+
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamerException.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamerException.scala
new file mode 100644
index 0000000..680b4c3
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamerException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streamer
+
+/**
+ * Exception class for any streamer tool failures.
+ */
+class CarbonDataStreamerException(message: String, exception: Throwable)
+  extends Exception(message, exception) {
+
+  def this(exception: Throwable) {
+    this("", exception)
+  }
+
+  def this(msg: String) {
+    this(msg, null)
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonStreamerConfig.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonStreamerConfig.scala
new file mode 100644
index 0000000..3bd4826
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonStreamerConfig.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import com.beust.jcommander.Parameter
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * The config class to parse the program arguments, validate and prepare the 
required configuration.
+ */
+class CarbonStreamerConfig() extends Serializable {
+
+  @Parameter(names = Array("--master"), description = "Spark master")
+  var sparkMaster: String = "local[2]"
+
+  @Parameter(names = Array("--target-table"),
+    description = "The target carbondata table where the data has to be 
merged. If this is not " +
+                  "configured by user, the operation will fail.",
+    required = true)
+  var targetTableName: String = ""
+
+  @Parameter(names = Array("--database-name"),
+    description = "The database name where the target table is present to 
merge the incoming data" +
+                  ". If not given by user, system will take the current 
database in the spark " +
+                  "session.",
+    required = false)
+  var databaseName: String = ""
+
+  @Parameter(names = Array("--source-type"),
+    description = "Source type to ingest data from. It can be kafka or DFS",
+    required = false)
+  var sourceType: String = 
CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE_DEFAULT
+
+  @Parameter(names = Array("--dfs-source-input-path"),
+    description = "An absolute path on a given file system from where data 
needs to be read to " +
+                  "ingest into the target carbondata table. Mandatory if the 
ingestion source " +
+                  "type is DFS.",
+    required = false)
+  var dfsSourceInputPth: String = ""
+
+  // ----------- kafka properties ----------------------
+  @Parameter(names = Array("--input-kafka-topic"),
+    description = "Kafka topics to consume data from. Mandatory if Kafka is 
selected as the " +
+                  "ingestion source. If multiple topic are present, the varue 
of the property can" +
+                  " be comma separated topic names. If not present in case of 
kafka source, " +
+                  "operation will fail.",
+    required = false)
+  var inputKafkaTopic: String = ""
+
+  @Parameter(names = Array("--brokers"),
+    description = "Kafka brokers to connect to in case Kafka is selected as an 
ingestion source. " +
+                  "If not present in case of kafka source, operation will 
fail.",
+    required = false)
+  var kafkaBrokerList: String = ""
+
+  @Parameter(names = Array("--kafka-initial-offset-type"),
+    description = "Kafka offset to fall back to in case no checkpoint is 
available for starting " +
+                  "ingestion. Valid values - Latest and Earliest.",
+    required = false)
+  var kafkaInitialOffsetType: String = CarbonCommonConstants
+    .CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT
+
+  @Parameter(names = Array("--key-deserializer"),
+    description = "Key deserializer for kafka. Mandatory for Kafka source.",
+    required = false)
+  var keyDeserializer: String = 
CarbonCommonConstants.KAFKA_KEY_DESERIALIZER_DEFAULT
+
+  @Parameter(names = Array("--value-deserializer"),
+    description = "value deserializer for kafka. Mandatory for Kafka source.",
+    required = false)
+  var valueDeserializer: String = 
CarbonCommonConstants.KAFKA_VALUE_DESERIALIZER_DEFAULT
+
+  @Parameter(names = Array("--schema-registry-url"),
+    description = "Schema registry url, in case schema registry is selected as 
schema provider.",
+    required = false)
+  var schemaRegistryURL: String = ""
+
+  @Parameter(names = Array("--group-id"),
+    description = "This property is required if the consumer uses either the 
group management " +
+                  "functionality by using subscribe(topic) or the Kafka-based 
offset management " +
+                  "strategy.",
+    required = false)
+  var groupId: String = ""
+
+  // -------------------------------------------------------------------- //
+
+  @Parameter(names = Array("--input-payload-format"),
+    description = "Format of the incoming data/payload.",
+    required = false)
+  var inputPayloadFormat: String = CarbonCommonConstants
+    .CARBON_STREAMER_INPUT_PAYLOAD_FORMAT_DEFAULT
+
+  @Parameter(names = Array("--schema-provider-type"),
+    description = "Schema provider for the incoming batch of data. Currently, 
2 types of schema " +
+                  "providers are supported - FileBasedProvider and 
SchemaRegistryProvider",
+    required = false)
+  var schemaProviderType: String = 
CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT
+
+  @Parameter(names = Array("--source-schema-file-path"),
+    description = "Absolute Path to file containing the schema of incoming 
data. Mandatory if " +
+                  "file-based schema provider is selected.",
+    required = false)
+  var sourceSchemaFilePath: String = ""
+
+  @Parameter(names = Array("--merge-operation-type"),
+    description = "Different merge operations are supported - INSERT, UPDATE, 
DELETE, UPSERT",
+    required = false)
+  var mergeOperationType: String = CarbonCommonConstants
+    .CARBON_STREAMER_MERGE_OPERATION_TYPE_DEFAULT
+
+  @Parameter(names = Array("--delete-operation-field"),
+    description = "Name of the field in source schema reflecting the IUD 
operation types on " +
+                  "source data rows.",
+    required = false)
+  var deleteOperationField: String = ""
+
+  @Parameter(names = Array("--delete-field-value"),
+    description = "Name of the field in source schema reflecting the IUD 
operation types on " +
+                  "source data rows.",
+    required = false)
+  var deleteFieldValue: String = ""
+
+  @Parameter(names = Array("--source-ordering-field"),
+    description = "Name of the field from source schema whose value can be 
used for picking the " +
+                  "latest updates for a particular record in the incoming 
batch in case of " +
+                  "duplicates record keys. Useful if the write operation type 
is UPDATE or UPSERT" +
+                  ". This will be used only if 
carbon.streamer.upsert.deduplicate is enabled.",
+    required = true)
+  var sourceOrderingField: String = CarbonCommonConstants
+    .CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT
+
+  @Parameter(names = Array("--record-key-field"),
+    description = "Join key/record key for a particular record. Will be used 
for deduplication of" +
+                  " the incoming batch. If not present operation will fail.",
+    required = true)
+  var keyColumn: String = ""
+
+  @Parameter(names = Array("--deduplicate"),
+    description = "This property specifies if the incoming batch needs to be 
deduplicated in case" +
+                  " of INSERT operation type. If set to true, the incoming 
batch will be " +
+                  "deduplicated against the existing data in the target 
carbondata table.",
+    required = false)
+  var deduplicateEnabled: String = 
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT
+
+  @Parameter(names = Array("--combine-before-upsert"),
+    description = "This property specifies if the incoming batch needs to be 
deduplicated (when " +
+                  "multiple updates for the same record key are present in the 
incoming batch) in" +
+                  " case of UPSERT/UPDATE operation type. If set to true, the 
user needs to " +
+                  "provide proper value for the source ordering field as 
well.",
+    required = false)
+  var isCombineBeforeUpsert: String = CarbonCommonConstants
+    .CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT
+
+  @Parameter(names = Array("--min-batch-interval"),
+    description = "Minimum batch interval time between 2 continuous ingestion 
in continuous mode." +
+                  " Should be specified in seconds.",
+    required = false)
+  var batchInterval: String = 
CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL_DEFAULT
+
+  @Parameter(names = Array("--meta-columns"),
+    description = "Metadata columns added in source dataset. Please mention 
all the metadata" +
+                  " columns as comma separated values which should not be 
written to the " +
+                  "final carbondata table",
+    required = false)
+  var metaColumnsAdded: String = ""
+
+  /**
+   * This method set the configuration to carbonproperties which are passed as 
a arguments while
+   * starting the streamer application
+   */
+  def setConfigsToCarbonProperty(streamerConfig: CarbonStreamerConfig): Unit = 
{
+    val carbonPropertiesInstance = CarbonProperties.getInstance()
+
+    if (streamerConfig.targetTableName.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Target carbondata table is not configured. Please configure and 
retry.")
+    }
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_TABLE_NAME,
+      streamerConfig.targetTableName)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_DATABASE_NAME,
+      streamerConfig.databaseName)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE,
+      streamerConfig.sourceType)
+    if (sourceType.equalsIgnoreCase(SourceFactory.DFS.toString) &&
+        dfsSourceInputPth.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "The DFS source path to read and ingest data onto target carbondata 
table is must in case" +
+        " of DFS source type.")
+    }
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_DFS_INPUT_PATH,
+      streamerConfig.dfsSourceInputPth)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER,
+      streamerConfig.schemaProviderType)
+    if (schemaProviderType.equalsIgnoreCase(CarbonCommonConstants
+      .CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT) &&
+        streamerConfig.schemaRegistryURL.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Schema registry URL is must when the schema provider is set as 
SchemaRegistry. Please " +
+        "configure and retry.")
+    } else if (schemaProviderType.equalsIgnoreCase(CarbonCommonConstants
+      .CARBON_STREAMER_FILE_SCHEMA_PROVIDER) &&
+               streamerConfig.sourceSchemaFilePath.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Schema file path is must when the schema provider is set as 
FileSchema. Please " +
+        "configure and retry.")
+    }
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_REGISTRY_URL,
+      streamerConfig.schemaRegistryURL)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_SCHEMA_PATH,
+      streamerConfig.sourceSchemaFilePath)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_INPUT_PAYLOAD_FORMAT,
+      streamerConfig.inputPayloadFormat)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_MERGE_OPERATION_TYPE,
+      streamerConfig.mergeOperationType)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants
+      .CARBON_STREAMER_MERGE_OPERATION_FIELD, 
streamerConfig.deleteOperationField)
+    if ((deleteOperationField.isEmpty && deleteFieldValue.nonEmpty) ||
+        (deleteOperationField.nonEmpty && deleteFieldValue.isEmpty)) {
+      throw new CarbonDataStreamerException(
+        "Either both the values of --delete-operation-field and 
--delete-field-value should not " +
+        "be configured or both must be configured. Please configure and 
retry.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants
+      .CARBON_STREAMER_SOURCE_ORDERING_FIELD, 
streamerConfig.sourceOrderingField)
+    if (streamerConfig.keyColumn.isEmpty) {
+      throw new CarbonDataStreamerException(
+        "The key column is must for the merge operation. Please configure and 
retry.")
+    }
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_KEY_FIELD,
+      streamerConfig.keyColumn)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+      streamerConfig.deduplicateEnabled)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+      streamerConfig.isCombineBeforeUpsert)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_INITIAL_OFFSET_TYPE,
+      streamerConfig.kafkaInitialOffsetType)
+    if (sourceType.equalsIgnoreCase(SourceFactory.KAFKA.toString) &&
+        streamerConfig.inputKafkaTopic.isEmpty) {
+      throw new CarbonDataStreamerException(
+        "Kafka topics is must to consume and ingest data onto target 
carbondata table, in case" +
+        " of KAFKA source type.")
+    }
+    if (sourceType.equalsIgnoreCase(SourceFactory.KAFKA.toString) &&
+        streamerConfig.kafkaBrokerList.isEmpty) {
+      throw new CarbonDataStreamerException(
+        "Kafka broker list is must to consume and ingest data onto target 
carbondata table," +
+        "in case of KAFKA source type.")
+    }
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_KAFKA_INPUT_TOPIC,
+      streamerConfig.inputKafkaTopic)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_BROKERS,
+      streamerConfig.kafkaBrokerList)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_KEY_DESERIALIZER,
+      streamerConfig.keyDeserializer)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_VALUE_DESERIALIZER,
+      streamerConfig.valueDeserializer)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_ENABLE_AUTO_COMMIT,
+      CarbonCommonConstants.KAFKA_ENABLE_AUTO_COMMIT_DEFAULT)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.KAFKA_GROUP_ID,
+      streamerConfig.groupId)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL,
+      streamerConfig.batchInterval)
+    
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS,
+      streamerConfig.metaColumnsAdded)
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/SchemaSource.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/SchemaSource.scala
new file mode 100644
index 0000000..bb83384
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/SchemaSource.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import java.io.FileInputStream
+import java.net.URL
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.avro.Schema
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+/**
+ * The Schema Source class to read the schema files.
+ */
+abstract class SchemaSource {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def getSchema: Schema
+
+}
+
+/**
+ * Reads schema from the Schema Registry when the schema provider type is 
SchemaRegistry.
+ */
+case class SchemaRegistry() extends SchemaSource {
+  override
+  def getSchema: Schema = {
+    var schemaRegistryURL = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_REGISTRY_URL)
+    val topicName = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_KAFKA_INPUT_TOPIC)
+    val topics = topicName.split(CarbonCommonConstants.COMMA).map(_.trim)
+    val topicToConsider = if (topics.length > 0) {
+      topics(0)
+    } else {
+      topicName
+    }
+    schemaRegistryURL = 
s"$schemaRegistryURL/subjects/$topicToConsider-value/versions/latest"
+    val registry = new URL(schemaRegistryURL)
+    val connection = registry.openConnection
+    val mapper = new ObjectMapper
+    val node = mapper.readTree(connection.getInputStream)
+    if (!node.elements().hasNext) {
+      throw new CarbonDataStreamerException(
+        "The Schema registry URL is not valid, please check and retry.")
+    }
+    new Schema.Parser().parse(node.get("schema").asText)
+  }
+}
+
+/**
+ * Reads schema from the directory or filepath provider by user when the 
schema provider type is
+ * FileSchema.
+ */
+case class FileSchema() extends SchemaSource {
+  override
+  def getSchema: Schema = {
+    val schemaPath = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_SCHEMA_PATH)
+    LOGGER.info(s"Reading the schema file from the path: $schemaPath")
+    val updatedSchemaFilePath = if 
(FileFactory.getCarbonFile(schemaPath).isDirectory) {
+      val files = FileFactory.getCarbonFile(schemaPath)
+        .listFiles(new CarbonFileFilter {
+          override def accept(file: CarbonFile): Boolean = {
+            !file.isDirectory && file.getName.endsWith(".avsc")
+          }
+        })
+      (files max Ordering[Long].on { file: CarbonFile => 
file.getLastModifiedTime }).getAbsolutePath
+    } else {
+      schemaPath
+    }
+    var inputStream: FileInputStream = null
+    val jsonSchema = try {
+      inputStream = new FileInputStream(updatedSchemaFilePath)
+      val mapper = new ObjectMapper
+      mapper.readTree(inputStream)
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("Read schema failed in File based Schema provider, ", ex)
+        throw ex
+    } finally {
+      CarbonUtil.closeStream(inputStream)
+    }
+    new Schema.Parser().parse(jsonSchema.asInstanceOf[JsonNode].toString)
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/Source.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/Source.scala
new file mode 100644
index 0000000..6103f57
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/Source.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import 
org.apache.spark.sql.execution.command.mutation.merge.CarbonMergeDataSetUtil
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.util.SparkStreamingUtil
+
+/**
+ * Abstract class Source which will be extended based on the source types of 
KAFKA, DFS etc
+ */
+abstract class Source {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  // avro schema, which is basically the read schema for the incoming data 
from sources like
+  // Kafka, DFS etc
+  protected var schema: Schema = _
+
+  protected lazy val schemaSource: SchemaSource = try {
+    schemaProviderClass match {
+      case "SchemaRegistry" | "org.apache.carbondata.streamer.SchemaRegistry" 
=> SchemaRegistry
+        .getClass
+        .getClassLoader
+        .loadClass("org.apache.carbondata.streamer.SchemaRegistry")
+        .newInstance()
+        .asInstanceOf[SchemaRegistry]
+      case "FileSchema" | "org.apache.carbondata.streamer.FileSchema" => 
FileSchema
+        .getClass
+        .getClassLoader
+        .loadClass("org.apache.carbondata.streamer.FileSchema")
+        .newInstance()
+        .asInstanceOf[FileSchema]
+      case _ => throw new UnsupportedOperationException(
+        "Schema provider other than SchemaRegistry and FileSchema are not 
supported. Please " +
+        "configure the proper value.")
+    }
+  } catch {
+    case ex: ClassNotFoundException =>
+      LOGGER.error("Schema provider class is configured wrongly. Please 
configure and retry.", ex)
+      throw new CarbonDataStreamerException(
+        "Schema provider class is configured wrongly. Please configure and 
retry.",
+        ex)
+  }
+
+  // join key column
+  protected val keyColumn: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_STREAMER_KEY_FIELD)
+
+  protected val mergeOperationType: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_STREAMER_MERGE_OPERATION_TYPE)
+
+  val schemaProviderClass: String = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER,
+      CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT)
+
+  /**
+   * This method will load the class based on the schema source provider 
configured by user and
+   * initializes the read schema.
+   */
+  def loadSchemaBasedOnConfiguredClass(): Unit = {
+    schema = schemaSource.getSchema
+  }
+
+  /**
+   * This method returns the Row object for each incoming GenericRecord.
+   * @param record incoming generic record read from kafka or DFS.
+   * @param sqlType the reader schema to convert to Row.
+   * @return Spark Row
+   */
+  def genericRecordToRow(record: GenericRecord, sqlType: StructType): Row = {
+    val values: scala.collection.mutable.Buffer[Object] = 
scala.collection.mutable.Buffer.empty
+    record.getSchema.getFields.asScala.foreach { field =>
+      var value = record.get(field.name())
+      // if the field type is union, assuming the first type will be null type.
+      val fieldType = if (field.schema().getType.equals(Type.UNION)) {
+        val fieldTypesInUnion = field.schema().getTypes
+        if (fieldTypesInUnion.get(0).getType.equals(Type.NULL)) {
+          fieldTypesInUnion.get(1).getType
+        } else {
+          fieldTypesInUnion.get(0).getType
+        }
+      } else {
+        field.schema().getType
+      }
+      fieldType match {
+        case Type.STRING if value != null =>
+          // Avro returns Utf8s for strings, which Spark SQL doesn't know how 
to use.
+          value = value.toString
+        case Type.BYTES =>
+          // Avro returns binary as a ByteBuffer, but Spark SQL wants a byte[].
+          value = value.asInstanceOf[ByteBuffer].array()
+        case _ =>
+      }
+      values += value
+    }
+    new GenericRowWithSchema(values.toArray, sqlType)
+  }
+
+  /**
+   * This method prepares the dataset for the avro source and calls to perform 
the specified
+   * merge operation.
+   * @param inputStream The wrapper object which contains the spark's DStream 
to read the data.
+   * @param carbonTable target carbondata table object.
+   */
+  def prepareDSForAvroSourceAndMerge(
+      inputStream: CarbonDStream,
+      carbonTable: CarbonTable): Unit = {
+    inputStream.inputDStream.asInstanceOf[DStream[GenericRecord]].foreachRDD { 
rdd =>
+      // get updated schema for each rdd to capture any schema changes
+      val schema = schemaSource.getSchema
+      val schemaString = schema.toString()
+      val sparkDataTypes = 
SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
+      // Get the singleton instance of SparkSession
+      val spark = 
SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
+      val rowRDD = rdd.mapPartitions { iterator =>
+        // avro schema can have the default values in the schema, but the 
spark Avro Deserializer do
+        // not take care to prepare the internal row based on this. So here 
get the fields where the
+        // default value is mentioned for the fields and also get the 
corresponding index. Based on
+        // this index in the internal row object update with the default value 
if the value at that
+        // specific index in the internal row is null.
+        val stringToAvroSchema = new Schema.Parser().parse(schemaString)
+        val filteredSchemaFields = stringToAvroSchema.getFields
+          .asScala
+          .zipWithIndex
+          .filter { case (field, _) =>
+            field.defaultVal() != null
+          }.map { case (field, i) =>
+          (field.name(), field.defaultVal(), i)
+        }
+        val encoder = RowEncoder.apply(sparkDataTypes).resolveAndBind()
+        new Iterator[Row] {
+          override def hasNext: Boolean = {
+            iterator.hasNext
+          }
+
+          override def next(): Row = {
+            if (!hasNext) {
+              throw new NoSuchElementException("next on empty iterator")
+            }
+            val record = iterator.next()
+            val avroWriteSchema = record.getSchema
+            var sparkAvroDeserializer = new AvroDeserializer(avroWriteSchema, 
sparkDataTypes)
+            val internalRow = 
sparkAvroDeserializer.deserialize(record).asInstanceOf[InternalRow]
+            // update with the default values if the value is null
+            if (avroWriteSchema.getFields.size() != 
sparkDataTypes.fields.length) {
+              val missingFieldsWithReadSchema = sparkDataTypes.fields
+                .map(_.name.toLowerCase())
+                
.diff(avroWriteSchema.getFields.asScala.map(_.name.toLowerCase()))
+              filteredSchemaFields.foreach { case (fieldName, defaultValue, i) 
=>
+                if 
(missingFieldsWithReadSchema.contains(fieldName.toLowerCase()) &&
+                    internalRow.isNullAt(i)) {
+                  internalRow.update(i, defaultValue)
+                }
+              }
+            }
+            val row = 
SparkStreamingUtil.convertInternalRowToRow(encoder)(internalRow)
+            sparkAvroDeserializer = null
+            row
+          }
+        }
+      }
+      // TODO: check without alias and remove alias
+      val targetDs = spark
+        .sql(s"select * from 
${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
+        .as("A")
+      val sourceDS = spark.createDataFrame(rowRDD, sparkDataTypes).as("B")
+      CarbonMergeDataSetUtil.handleSchemaEvolutionForCarbonStreamer(targetDs, 
sourceDS, spark)
+      val updatedTargetDs = spark
+        .sql(s"select * from 
${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
+        .as("A")
+      val updatedCarbonTable = 
CarbonEnv.getCarbonTable(Some(carbonTable.getDatabaseName),
+        carbonTable.getTableName)(spark)
+      val tableCols =
+        updatedCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).
+          
filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
+      inputStream.performMergeOperation(updatedTargetDs,
+        sourceDS.select(tableCols.map(col): _*).as("B"),
+        keyColumn,
+        mergeOperationType)
+    }
+  }
+
+  /**
+   * This method prepares the wrapper object containing the DStream. The 
DStream object prepared
+   * based on the input source type of Kafka or DFS.
+   * @param ssc Spark streaming context to prepare the DStream.
+   * @param sparkSession Spark Session.
+   * @return Wrapper object of CarbonDStream containing DStream.
+   */
+  def getStream(
+      ssc: StreamingContext,
+      sparkSession: SparkSession): CarbonDStream
+
+  /**
+   * This prepared the Dataset with the stream provided and call to perform 
the specified merge
+   * operation.
+   * @param inputStream Input CarbonDStream.
+   */
+  def prepareDFAndMerge(inputStream: CarbonDStream)
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/streamer/SourceFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/SourceFactory.scala
new file mode 100644
index 0000000..3cb472e
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/streamer/SourceFactory.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Factory class to decide the Source class based on the Source Type.
+ */
+object SourceFactory extends Enumeration {
+
+  type source = Value
+
+  val KAFKA : SourceFactory.Value = Value("KAFKA")
+  val DFS : SourceFactory.Value = Value("DFS")
+
+  var source: Source = _
+
+  def apply(
+      sourceType: String,
+      ssc: StreamingContext,
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable): CarbonDStream = {
+    SourceFactory.withName(sourceType.toUpperCase) match {
+      case KAFKA =>
+        source = new AvroKafkaSource(carbonTable)
+        source.loadSchemaBasedOnConfiguredClass()
+        source.getStream(ssc, sparkSession)
+      case DFS =>
+        source = new AvroDFSSource(carbonTable)
+        source.loadSchemaBasedOnConfiguredClass()
+        source.getStream(ssc, sparkSession)
+      case other => throw new CarbonDataStreamerException(s"The source type 
$other is not yet " +
+                                                          s"supported")
+    }
+  }
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index c16e901..53f5c26 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -85,6 +85,11 @@ case class CarbonMergeDataSetCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val relations = 
CarbonSparkUtil.collectCarbonRelation(targetDsOri.logicalPlan)
     val st = System.currentTimeMillis()
+    // if the input data is empty, return to avoid unnecessary operations. It 
can happen in
+    // streaming cases where new data is not pushed to streams.
+    if (srcDS.rdd.isEmpty()) {
+      return Seq()
+    }
     val targetDsAliasName = targetDsOri.logicalPlan match {
       case alias: SubqueryAlias =>
         alias.alias
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 4201097..75870c4 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -24,6 +24,7 @@ import java.time.LocalDateTime
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import com.beust.jcommander.ParameterException
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -39,6 +40,7 @@ import 
org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streamer.{CarbonDataStreamer, 
CarbonDataStreamerException}
 
 /**
  * Test Class for carbon merge api
@@ -1423,6 +1425,72 @@ class MergeTestCase extends QueryTest with 
BeforeAndAfterAll {
     
CarbonProperties.getInstance().addProperty("carbon.enable.auto.load.merge", 
"false")
   }
 
+  test("test the validations of configurations for dfs source") {
+    var args = "--record-key-field name --source-ordering-field age 
--source-type dfs"
+    val ex = intercept[ParameterException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    ex.getMessage
+      .equalsIgnoreCase("The following option is required: [--target-table]")
+    args = args.concat(" --target-table test")
+    var exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    assert(exception.getMessage
+      .equalsIgnoreCase(
+        "The DFS source path to read and ingest data onto target carbondata 
table is must in case" +
+        " of DFS source type."))
+
+    args = args.concat(" --dfs-source-input-path /tmp/path 
--schema-provider-type FileSchema")
+    exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    assert(exception.getMessage
+      .equalsIgnoreCase(
+        "Schema file path is must when the schema provider is set as 
FileSchema. Please configure" +
+        " and retry."))
+  }
+
+  test("test validations for kafka source and schema registry") {
+    // default schema provider is schema registry
+    var args = "--target-table test --record-key-field name 
--source-ordering-field age " +
+               "--source-type kafka"
+    var exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    exception.getMessage
+      .equalsIgnoreCase(
+        "Schema registry URL is must when the schema provider is set as 
SchemaRegistry. Please " +
+        "configure and retry.")
+
+    args = args.concat(" --schema-registry-url http://localhost:8081";)
+    exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    exception.getMessage
+      .equalsIgnoreCase(
+        "Kafka topics is must to consume and ingest data onto target 
carbondata table, in case of" +
+        " KAFKA source type.")
+
+    args = args.concat(" --input-kafka-topic person")
+    exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    exception.getMessage
+      .equalsIgnoreCase(
+        "Kafka broker list is must to consume and ingest data onto target 
carbondata table,in " +
+        "case of KAFKA source type.")
+
+    args = args.concat(" --delete-field-value del")
+    exception = intercept[CarbonDataStreamerException] {
+      CarbonDataStreamer.main(args.split(" "))
+    }
+    exception.getMessage
+      .equalsIgnoreCase(
+        "Either both the values of --delete-operation-field and 
--delete-field-value should not " +
+        "be configured or both must be configured. Please configure and 
retry.")
+  }
+
   private def getDeleteDeltaFileCount(tableName: String, segment: String): Int 
= {
     val table = CarbonEnv.getCarbonTable(None, 
tableName)(sqlContext.sparkSession)
     var path = CarbonTablePath
diff --git a/pom.xml b/pom.xml
index da2fb41..f30df4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,10 @@
         <enabled>true</enabled>
       </releases>
     </repository>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
   </repositories>
 
   <dependencyManagement>

Reply via email to