Indhumathi27 commented on a change in pull request #4235:
URL: https://github.com/apache/carbondata/pull/4235#discussion_r735270044



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDStream.scala
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.CarbonSession._
+import org.apache.spark.sql.execution.command.mutation.merge.MergeOperationType
+import org.apache.spark.streaming.dstream.DStream
+
+/**
+ * Wrapper class to hold Spark's DStream object, as the DStream can be of different types based on
+ * the different input sources like text, Avro, Kafka etc.
+ * @param sparkSession Spark Session
+ * @param inputDStream Spark's DStream object
+ */
+case class CarbonDStream(
+    @transient sparkSession: SparkSession,

Review comment:
       Looks like `sparkSession` is not used anywhere; please remove it if it is not needed.
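
       For illustration, a minimal sketch (not from the PR) of how the wrapper could look if the unused session is dropped; the DStream element type is assumed here since the diff does not show it:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch: keep only the DStream the wrapper actually uses.
// DStream[Any] is an assumption; the real element type is not visible in this diff.
case class CarbonDStream(@transient inputDStream: DStream[Any]) extends Serializable
```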

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/streamer/AvroKafkaSource.scala
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+import org.apache.spark.streaming.kafka010.KafkaUtils
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This class handles preparing the DStream and merging the data onto the target carbondata table
+ * for the Kafka source containing Avro data.
+ * @param carbonTable target carbondata table.
+ */
+class AvroKafkaSource(carbonTable: CarbonTable) extends Source with Serializable {
+
+  override
+  def getStream(
+      ssc: StreamingContext,
+      sparkSession: SparkSession): CarbonDStream = {
+    // separate out the non carbon properties and prepare the kafka param
+    val kafkaParams = CarbonProperties.getInstance()
+      .getAllPropertiesInstance
+      .asScala
+      .filter { prop => !prop._1.startsWith("carbon") }

Review comment:
       There are some carbon properties which start with the spark keyword, and those will also be added to kafkaParams by this check. Maybe you can add new kafka-carbon properties prefixed with a kafka keyword so that they can be identified as kafka params.
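
       As a rough sketch of that suggestion (the "kafka." prefix below is only an assumed naming convention, not an existing CarbonData property namespace), the Kafka params could be selected by prefix and the prefix stripped before they are handed to the consumer:

```scala
import scala.collection.JavaConverters._

import org.apache.carbondata.core.util.CarbonProperties

// Sketch only: select properties carrying an assumed "kafka." prefix and strip it,
// so spark-prefixed carbon properties never leak into the Kafka consumer config.
val kafkaPrefix = "kafka."
val kafkaParams = CarbonProperties.getInstance()
  .getAllPropertiesInstance
  .asScala
  .collect { case (key, value) if key.startsWith(kafkaPrefix) =>
    key.stripPrefix(kafkaPrefix) -> value
  }
  .toMap
```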

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonDataStreamer.scala
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import com.beust.jcommander.JCommander
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.strategy.CarbonPlanHelper
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Carbondata streamer, which is a Spark streaming application to pull data from different
+ * sources and merge it onto the target carbondata table.
+ */
+object CarbonDataStreamer {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def createConfig(streamerConfig: CarbonStreamerConfig,
+      args: Array[String]): Unit = {
+    JCommander.newBuilder().addObject(streamerConfig).build().parse(args: _*)
+  }
+
+  /**
+   * This method creates the streaming context for the first time if no checkpoint directory is
+   * present for the table.
+   *
+   * @param sparkSession          Spark Session.
+   * @param targetCarbonDataTable target carbondata table to merge.
+   * @return Spark StreamingContext
+   */
+  def createStreamingContext(sparkSession: SparkSession,
+      targetCarbonDataTable: CarbonTable): StreamingContext = {
+    val batchDuration = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL,
+        CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL_DEFAULT).toLong
+    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchDuration))
+
+    // get the source Dstream based on source type
+    val sourceType = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE,
+        CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE_DEFAULT)
+    val sourceCarbonDStream = SourceFactory.apply(sourceType,
+      ssc,
+      sparkSession,
+      targetCarbonDataTable)
+    // Perform merge on source stream
+    SourceFactory.source.prepareDFAndMerge(sourceCarbonDStream)
+    // set the checkpoint directory for spark streaming
+    ssc.checkpoint(CarbonTablePath.getStreamingCheckpointDir(targetCarbonDataTable.getTablePath))
+    ssc
+  }
+
+  def main(args: Array[String]): Unit = {
+    // parse the incoming arguments and prepare the configurations
+    val streamerConfigs = new CarbonStreamerConfig()
+    createConfig(streamerConfigs, args)
+    streamerConfigs.setConfigsToCarbonProperty(streamerConfigs)
+
+    val spark = SparkSession
+      .builder()
+      .master(streamerConfigs.sparkMaster)
+      .appName("CarbonData Streamer tool")
+      .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.streaming.driver.writeAheadLog.allowBatching", "true")
+      .config("spark.streaming.driver.writeAheadLog.batchingTimeout", 15000)
+      .enableHiveSupport()
+      .getOrCreate()
+    CarbonEnv.getInstance(spark)
+
+    SparkSession.setActiveSession(spark)
+    SparkSession.setDefaultSession(spark)
+
+    val targetTableName = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_TABLE_NAME)
+
+    var databaseName = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_DATABASE_NAME)
+    databaseName = if (databaseName.equalsIgnoreCase("")) {
+      spark.sessionState.catalog.getCurrentDatabase
+    } else {
+      databaseName
+    }
+
+    // if the target table is non-carbondata table, throw exception
+    if (!CarbonPlanHelper.isCarbonTable(TableIdentifier(targetTableName, Some(databaseName)))) {
+      throw new UnsupportedOperationException("The merge operation using CarbonData Streamer tool" +
+                                              " for non carbondata table is not supported.")
+    }
+
+    val targetCarbonDataTable = CarbonEnv.getCarbonTable(Some(databaseName), targetTableName)(spark)
+    val dbAndTb = databaseName + CarbonCommonConstants.POINT + targetTableName

Review comment:
       Can directly use `targetCarbonDataTable.getQualifiedName` here.
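
       Assuming CarbonTable exposes such an accessor (the exact method name below is taken from the review comment and should be verified), the manual concatenation could become a single call:

```scala
// Sketch of the suggestion: use the table's own qualified name ("<database>.<table>")
// instead of rebuilding it from databaseName and targetTableName by hand.
val dbAndTb = targetCarbonDataTable.getQualifiedName
```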

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/streamer/CarbonStreamerConfig.scala
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.streamer
+
+import com.beust.jcommander.Parameter
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * The config class to parse the program arguments, validate and prepare the required configuration.
+ */
+class CarbonStreamerConfig() extends Serializable {
+
+  @Parameter(names = Array("--master"), description = "Spark master")
+  var sparkMaster: String = "local[2]"
+
+  @Parameter(names = Array("--target-table"),
+    description = "The target carbondata table where the data has to be 
merged. If this is not " +
+                  "configured by user, the operation will fail.",
+    required = true)
+  var targetTableName: String = ""
+
+  @Parameter(names = Array("--database-name"),
+    description = "The database name where the target table is present to 
merge the incoming data" +
+                  ". If not given by user, system will take the current 
database in the spark " +
+                  "session.",
+    required = false)
+  var databaseName: String = ""
+
+  @Parameter(names = Array("--source-type"),
+    description = "Source type to ingest data from. It can be kafka or DFS",
+    required = false)
+  var sourceType: String = CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE_DEFAULT
+
+  @Parameter(names = Array("--dfs-source-input-path"),
+    description = "An absolute path on a given file system from where data 
needs to be read to " +
+                  "ingest into the target carbondata table. Mandatory if the 
ingestion source " +
+                  "type is DFS.",
+    required = false)
+  var dfsSourceInputPth: String = ""
+
+  // ----------- kafka properties ----------------------
+  @Parameter(names = Array("--input-kafka-topic"),
+    description = "Kafka topics to consume data from. Mandatory if Kafka is 
selected as the " +
+                  "ingestion source. If multiple topic are present, the varue 
of the property can" +
+                  " be comma separated topic names. If not present in case of 
kafka source, " +
+                  "operation will fail.",
+    required = false)
+  var inputKafkaTopic: String = ""
+
+  @Parameter(names = Array("--brokers"),
+    description = "Kafka brokers to connect to in case Kafka is selected as an 
ingestion source. " +
+                  "If not present in case of kafka source, operation will 
fail.",
+    required = false)
+  var kafkaBrokerList: String = ""
+
+  @Parameter(names = Array("--kafka-initial-offset-type"),
+    description = "Kafka offset to fall back to in case no checkpoint is 
available for starting " +
+                  "ingestion. Valid values - Latest and Earliest.",
+    required = false)
+  var kafkaInitialOffsetType: String = CarbonCommonConstants
+    .CARBON_STREAMER_KAFKA_INITIAL_OFFSET_TYPE_DEFAULT
+
+  @Parameter(names = Array("--key-deserializer"),
+    description = "Key deserializer for kafka. Mandatory for Kafka source.",
+    required = false)
+  var keyDeserializer: String = CarbonCommonConstants.KAFKA_KEY_DESERIALIZER_DEFAULT
+
+  @Parameter(names = Array("--value-deserializer"),
+    description = "value deserializer for kafka. Mandatory for Kafka source.",
+    required = false)
+  var valueDeserializer: String = CarbonCommonConstants.KAFKA_VALUE_DESERIALIZER_DEFAULT
+
+  @Parameter(names = Array("--schema-registry-url"),
+    description = "Schema registry url, in case schema registry is selected as 
schema provider.",
+    required = false)
+  var schemaRegistryURL: String = ""
+
+  @Parameter(names = Array("--group-id"),
+    description = "This property is required if the consumer uses either the 
group management " +
+                  "functionality by using subscribe(topic) or the Kafka-based 
offset management " +
+                  "strategy.",
+    required = false)
+  var groupId: String = ""
+
+  // -------------------------------------------------------------------- //
+
+  @Parameter(names = Array("--input-payload-format"),
+    description = "Format of the incoming data/payload.",
+    required = false)
+  var inputPayloadFormat: String = CarbonCommonConstants
+    .CARBON_STREAMER_INPUT_PAYLOAD_FORMAT_DEFAULT
+
+  @Parameter(names = Array("--schema-provider-type"),
+    description = "Schema provider for the incoming batch of data. Currently, 
2 types of schema " +
+                  "providers are supported - FileBasedProvider and 
SchemaRegistryProvider",
+    required = false)
+  var schemaProviderType: String = CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT
+
+  @Parameter(names = Array("--source-schema-file-path"),
+    description = "Absolute Path to file containing the schema of incoming 
data. Mandatory if " +
+                  "file-based schema provider is selected.",
+    required = false)
+  var sourceSchemaFilePath: String = ""
+
+  @Parameter(names = Array("--merge-operation-type"),
+    description = "Different merge operations are supported - INSERT, UPDATE, 
DELETE, UPSERT",
+    required = false)
+  var mergeOperationType: String = CarbonCommonConstants
+    .CARBON_STREAMER_MERGE_OPERATION_TYPE_DEFAULT
+
+  @Parameter(names = Array("--delete-operation-field"),
+    description = "Name of the field in source schema reflecting the IUD 
operation types on " +
+                  "source data rows.",
+    required = false)
+  var deleteOperationField: String = ""
+
+  @Parameter(names = Array("--delete-field-value"),
+    description = "Name of the field in source schema reflecting the IUD 
operation types on " +
+                  "source data rows.",
+    required = false)
+  var deleteFieldValue: String = ""
+
+  @Parameter(names = Array("--source-ordering-field"),
+    description = "Name of the field from source schema whose value can be 
used for picking the " +
+                  "latest updates for a particular record in the incoming 
batch in case of " +
+                  "duplicates record keys. Useful if the write operation type 
is UPDATE or UPSERT" +
+                  ". This will be used only if 
carbon.streamer.upsert.deduplicate is enabled.",
+    required = true)
+  var sourceOrderingField: String = CarbonCommonConstants
+    .CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT
+
+  @Parameter(names = Array("--record-key-field"),
+    description = "Join key/record key for a particular record. Will be used 
for deduplication of" +
+                  " the incoming batch. If not present operation will fail.",
+    required = true)
+  var keyColumn: String = ""
+
+  @Parameter(names = Array("--deduplicate"),
+    description = "This property specifies if the incoming batch needs to be 
deduplicated in case" +
+                  " of INSERT operation type. If set to true, the incoming 
batch will be " +
+                  "deduplicated against the existing data in the target 
carbondata table.",
+    required = false)
+  var deduplicateEnabled: String = CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT
+
+  @Parameter(names = Array("--combine-before-upsert"),
+    description = "This property specifies if the incoming batch needs to be 
deduplicated (when " +
+                  "multiple updates for the same record key are present in the 
incoming batch) in" +
+                  " case of UPSERT/UPDATE operation type. If set to true, the 
user needs to " +
+                  "provide proper value for the source ordering field as 
well.",
+    required = false)
+  var isCombineBeforeUpsert: String = CarbonCommonConstants
+    .CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT
+
+  @Parameter(names = Array("--min-batch-interval"),
+    description = "Minimum batch interval time between 2 continuous ingestion 
in continuous mode." +
+                  " Should be specified in seconds.",
+    required = false)
+  var batchInterval: String = CarbonCommonConstants.CARBON_STREAMER_BATCH_INTERVAL_DEFAULT
+
+  @Parameter(names = Array("--meta-columns"),
+    description = "Metadata columns added in source dataset. Please mention 
all the metadata" +
+                  " columns as comma separated values which should not be 
written to the " +
+                  "final carbondata table",
+    required = false)
+  var metaColumnsAdded: String = ""
+
+  /**
+   * This method sets the configurations to carbon properties which are passed as arguments while
+   * starting the streamer application.
+   */
+  def setConfigsToCarbonProperty(streamerConfig: CarbonStreamerConfig): Unit = {
+    val carbonPropertiesInstance = CarbonProperties.getInstance()
+
+    if (streamerConfig.targetTableName.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Target carbondata table is not configured. Please configure and 
retry.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_TABLE_NAME,
+      streamerConfig.targetTableName)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_DATABASE_NAME,
+      streamerConfig.databaseName)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_TYPE,
+      streamerConfig.sourceType)
+    if (sourceType.equalsIgnoreCase(SourceFactory.DFS.toString) &&
+        dfsSourceInputPth.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "The DFS source path to read and ingest data onto target carbondata 
table is must in case" +
+        " of DFS source type.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_DFS_INPUT_PATH,
+      streamerConfig.dfsSourceInputPth)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_PROVIDER,
+      streamerConfig.schemaProviderType)
+    if (schemaProviderType.equalsIgnoreCase(CarbonCommonConstants
+      .CARBON_STREAMER_SCHEMA_PROVIDER_DEFAULT) &&
+        streamerConfig.schemaRegistryURL.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Schema registry URL is must when the schema provider is set as 
SchemaRegistry. Please " +
+        "configure and retry.")
+    } else if (schemaProviderType.equalsIgnoreCase(CarbonCommonConstants
+      .CARBON_STREAMER_FILE_SCHEMA_PROVIDER) &&
+               streamerConfig.sourceSchemaFilePath.equalsIgnoreCase("")) {
+      throw new CarbonDataStreamerException(
+        "Schema file path is must when the schema provider is set as 
FileSchema. Please " +
+        "configure and retry.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SCHEMA_REGISTRY_URL,
+      streamerConfig.schemaRegistryURL)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_SOURCE_SCHEMA_PATH,
+      streamerConfig.sourceSchemaFilePath)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_INPUT_PAYLOAD_FORMAT,
+      streamerConfig.inputPayloadFormat)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_MERGE_OPERATION_TYPE,
+      streamerConfig.mergeOperationType)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants
+      .CARBON_STREAMER_MERGE_OPERATION_FIELD, streamerConfig.deleteOperationField)
+    if ((deleteOperationField.isEmpty && deleteFieldValue.nonEmpty) ||
+        (deleteOperationField.nonEmpty && deleteFieldValue.isEmpty)) {
+      throw new CarbonDataStreamerException(
+        "Either both the values of --delete-operation-field and 
--delete-field-value should not " +
+        "be configured or both must be configured. Please configure and 
retry.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants
+      .CARBON_STREAMER_SOURCE_ORDERING_FIELD, streamerConfig.sourceOrderingField)
+    if (streamerConfig.keyColumn.isEmpty) {
+      throw new CarbonDataStreamerException(
+        "The key column is must for the merge operation. Please configure and 
retry.")
+    }
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_KEY_FIELD,
+      streamerConfig.keyColumn)
+    carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+      streamerConfig.deduplicateEnabled.toString)

Review comment:
        `streamerConfig.deduplicateEnabled` and `streamerConfig.isCombineBeforeUpsert` are already of type String; there is no need to call toString on them.
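
        A sketch of the simplified call (constant and field names copied from the diff above); the same change would apply to the upsert-deduplicate property that follows it:

```scala
// deduplicateEnabled is already declared as String in CarbonStreamerConfig,
// so the value can be passed straight through without toString.
carbonPropertiesInstance.addProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
  streamerConfig.deduplicateEnabled)
```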




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
