Indhumathi27 commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r736168790



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +475,365 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    verifyBackwardsCompatibility(targetDs, srcDs)
+
+    val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val tgtSchemaInLowerCase = targetSchema.fields.map(_.name.toLowerCase)
+      val additionalSourceFields = lowerCaseSrcSchemaFields
+        .filterNot(srcField => {
+          tgtSchemaInLowerCase.contains(srcField)
+        })
+      if (additionalSourceFields.nonEmpty) {
+        LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                    s"target schema: ${ additionalSourceFields.mkString(",") 
}")
+      }
+    }
+  }
+
+  def verifyCaseSensitiveFieldNames(
+      lowerCaseSrcSchemaFields: Array[String]
+  ): Unit = {
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = lowerCaseSrcSchemaFields.groupBy(a => identity(a)).map 
{
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      val errorMsg = s"source schema has similar fields which differ only in 
case sensitivity: " +
+                     s"${ similarFields.mkString(",") }"
+      LOGGER.error(errorMsg)
+      throw new CarbonSchemaException(errorMsg)
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) {
+        srcDs.drop(metaCols: _*)
+      } else {
+        srcDs
+      }
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name))
+      // check if some field is missing in source schema
+      if (sourceField.isEmpty) {
+        val errorMsg = s"source schema does not contain field: ${ 
tgtField.name }"
+        LOGGER.error(errorMsg)
+        throw new CarbonSchemaException(errorMsg)
+      }
+
+      // check if data type got modified for some column
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        val errorMsg = s"source schema has different data type " +
+                       s"for field: ${ tgtField.name }"
+        LOGGER.error(errorMsg + s", source type: ${ sourceField.get.dataType 
}, " +
+                     s"target type: ${ tgtField.dataType }")
+        throw new CarbonSchemaException(errorMsg)
+      }
+    })
+
+    val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+    verifyCaseSensitiveFieldNames(lowerCaseSrcSchemaFields)
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    /*
+    If the method is called from CarbonStreamer, we need to ensure the schema 
is evolved in
+    backwards compatible way. In phase 1, only addition of columns is 
supported, hence this check is
+    needed to ensure data integrity.
+    The existing IUD flow supports full schema evolution, hence this check is 
not needed for
+     existing flows.
+     */
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .filterNot(field => targetSchema.fields.map(_.name).contains(field.name))
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        sourceSchema.fields.filter(f => addedColumns.contains(f)).toSeq,
+        sparkSession,
+        targetCarbonTable)
+    }
+
+    // check if any column got deleted from source
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) {
+      partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList
+    } else {
+      List[String]()
+    }
+    val srcSchemaFieldsInLowerCase = 
sourceSchema.fields.map(_.name.toLowerCase)
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        srcSchemaFieldsInLowerCase.contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, 
sparkSession, targetCarbonTable)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {
+      val sourceField = sourceSchema.fields.find(f => 
f.name.equalsIgnoreCase(tgtField.name))
+      if (sourceField.isDefined) 
!sourceField.get.dataType.equals(tgtField.dataType) else false
+    })
+
+    if (modifiedColumns.nonEmpty) {
+      handleDataTypeChangeScenario(targetDs,
+        modifiedColumns.toList,
+        sparkSession,
+        targetCarbonTable)
+    }
+  }
+
+  /**
+   * This method calls CarbonAlterTableAddColumnCommand for adding new columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToAdd new columns to be added
+   * @param sparkSession SparkSession
+   */
+  def handleAddColumnScenario(targetDs: Dataset[Row], colsToAdd: 
Seq[StructField],
+      sparkSession: SparkSession,
+      targetCarbonTable: CarbonTable): Unit = {
+    val alterTableAddColsCmd = DDLHelper.prepareAlterTableAddColsCommand(
+      Option(targetCarbonTable.getDatabaseName),
+      colsToAdd,
+      targetCarbonTable.getTableName.toLowerCase)
+    alterTableAddColsCmd.run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableDropColumnCommand for deleting columns
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param colsToDrop columns to be dropped from carbondata table
+   * @param sparkSession SparkSession
+   */
+  def handleDeleteColumnScenario(targetDs: Dataset[Row], colsToDrop: 
List[String],
+      sparkSession: SparkSession,
+      targetCarbonTable: CarbonTable): Unit = {
+    val alterTableDropColumnModel = AlterTableDropColumnModel(
+      
CarbonParserUtil.convertDbNameToLowerCase(Option(targetCarbonTable.getDatabaseName)),
+      targetCarbonTable.getTableName.toLowerCase,
+      colsToDrop.map(_.toLowerCase))
+    
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel).run(sparkSession)
+  }
+
+  /**
+   * This method calls CarbonAlterTableColRenameDataTypeChangeCommand for 
handling data type changes
+   * @param targetDs target dataset whose schema needs to be modified
+   * @param modifiedCols columns with data type changes
+   * @param sparkSession SparkSession
+   */
+  def handleDataTypeChangeScenario(targetDs: Dataset[Row], modifiedCols: 
List[StructField],
+      sparkSession: SparkSession,
+      targetCarbonTable: CarbonTable): Unit = {
+    // need to call the command one by one for each modified column
+    modifiedCols.foreach(col => {
+      val alterTableColRenameDataTypeChangeCommand = DDLHelper
+        .prepareAlterTableColRenameDataTypeChangeCommand(
+        col,
+        Option(targetCarbonTable.getDatabaseName.toLowerCase),
+        targetCarbonTable.getTableName.toLowerCase,
+        col.name.toLowerCase,
+        isColumnRename = false,
+        Option.empty)
+      alterTableColRenameDataTypeChangeCommand.run(sparkSession)
+    })
+  }
+
+  def deduplicateBeforeWriting(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      targetCarbonTable: CarbonTable): Dataset[Row] = {
+    val properties = CarbonProperties.getInstance()
+    val filterDupes = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_INSERT_DEDUPLICATE_DEFAULT).toBoolean
+    val combineBeforeUpsert = properties
+      .getProperty(CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE,
+        
CarbonCommonConstants.CARBON_STREAMER_UPSERT_DEDUPLICATE_DEFAULT).toBoolean
+    var dedupedDataset: Dataset[Row] = srcDs
+    if (combineBeforeUpsert) {
+      dedupedDataset = deduplicateAgainstIncomingDataset(srcDs, sparkSession, 
srcAlias, keyColumn,
+        orderingField, targetCarbonTable)
+    }
+    if (filterDupes) {
+      dedupedDataset = deduplicateAgainstExistingDataset(dedupedDataset, 
targetDs,
+        srcAlias, targetAlias, keyColumn)
+    }
+    dedupedDataset
+  }
+
+  def deduplicateAgainstIncomingDataset(
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      srcAlias: String,
+      keyColumn: String,
+      orderingField: String,
+      table: CarbonTable): Dataset[Row] = {
+    if 
(orderingField.equals(CarbonCommonConstants.CARBON_STREAMER_SOURCE_ORDERING_FIELD_DEFAULT))
 {
+      return srcDs
+    }
+    val schema = srcDs.schema
+    val carbonKeyColumn = table.getColumnByName(keyColumn)
+    val keyColumnDataType = getCarbonDataType(keyColumn, srcDs)
+    val orderingFieldDataType = getCarbonDataType(orderingField, srcDs)
+    val isPrimitiveAndNotDate = 
DataTypeUtil.isPrimitiveColumn(orderingFieldDataType) &&
+                                (orderingFieldDataType != DataTypes.DATE)
+    val comparator = Comparator.getComparator(orderingFieldDataType)
+    val rdd = srcDs.rdd
+    val dedupedRDD: RDD[Row] = rdd.map { row =>
+      val index = row.fieldIndex(keyColumn)
+      val rowKey = getRowKey(row, index, carbonKeyColumn, 
isPrimitiveAndNotDate, keyColumnDataType)
+      (rowKey, row)
+    }.reduceByKey{(row1, row2) =>
+      var orderingValue1 = row1.getAs(orderingField).asInstanceOf[Any]
+      var orderingValue2 = row2.getAs(orderingField).asInstanceOf[Any]
+      if (orderingValue1 == null) {
+        row2
+      } else if (orderingValue2 == null) {
+        row1
+      } else {
+        if (orderingFieldDataType.equals(DataTypes.STRING)) {
+          orderingValue1 = orderingValue1.toString
+            .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+          orderingValue2 = orderingValue2.toString
+            .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+        }
+        if (comparator.compare(orderingValue1, orderingValue2) >= 0) {
+          row1
+        } else {
+          row2
+        }
+      }
+    }.map(_._2)
+    sparkSession.createDataFrame(dedupedRDD, schema).alias(srcAlias)
+  }
+
+  def getRowKey(
+      row: Row,
+      index: Integer,
+      carbonKeyColumn: CarbonColumn,
+      isPrimitiveAndNotDate: Boolean,
+      keyColumnDataType: CarbonDataType
+  ): AnyRef = {
+    if (!row.isNullAt(index)) {
+      row.getAs(index).toString
+    } else {
+      val value: Long = 0
+      if (carbonKeyColumn.isDimension) {
+        if (isPrimitiveAndNotDate) {
+          CarbonCommonConstants.EMPTY_BYTE_ARRAY
+        } else {
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
+        }
+      } else {
+        val nullValueForMeasure = keyColumnDataType match {
+          case DataTypes.BOOLEAN | DataTypes.BYTE => value.toByte
+          case DataTypes.SHORT => value.toShort
+          case DataTypes.INT => value.toInt
+          case DataTypes.DOUBLE => 0d
+          case DataTypes.FLOAT => 0f
+          case DataTypes.LONG | DataTypes.TIMESTAMP => value
+          case _ => value
+        }
+        CarbonUtil.getValueAsBytes(keyColumnDataType, nullValueForMeasure)
+      }
+    }
+  }
+
+  def getCarbonDataType(
+      fieldName: String,
+      srcDs: Dataset[Row]
+  ): CarbonDataType = {
+    val schema = srcDs.schema
+    val dataType = schema.fields.find(f => 
f.name.equalsIgnoreCase(fieldName)).get.dataType
+    CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)
+  }
+
+  def deduplicateAgainstExistingDataset(
+      srcDs: Dataset[Row],
+      targetDs: Dataset[Row],
+      srcAlias: String,
+      targetAlias: String,
+      keyColumn: String
+  ) : Dataset[Row] = {

Review comment:
       It looks like the CarbonData code style format is not followed in some places. 
Please check and format the newly added code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to