pratyakshsharma commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r734955345



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +475,351 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, 
simply ignore them.
+   * If some columns are missing in source schema as compared to target 
schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, 
exception is thrown.
+   * If source schema has multiple columns whose names differ only in case 
sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: 
Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will 
be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    verifyBackwardsCompatibility(targetDs, srcDs)
+
+    val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = lowerCaseSrcSchemaFields
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      if (additionalSourceFields.nonEmpty) {
+        LOGGER.warn(s"source schema contains additional fields which are not 
present in " +
+                    s"target schema: ${ additionalSourceFields.mkString(",") 
}")
+      }
+    }
+
+    // check if source schema has fields whose names only differ in case 
sensitivity
+    val similarFields = lowerCaseSrcSchemaFields.groupBy(a => identity(a)).map 
{
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)
+    if (similarFields.nonEmpty) {
+      val errorMsg = s"source schema has similar fields which differ only in 
case sensitivity: " +
+                     s"${ similarFields.mkString(",") }"
+      LOGGER.error(errorMsg)
+      throw new CarbonSchemaException(errorMsg)
+    }
+  }
+
+  /**
+   * This method takes care of handling schema evolution scenarios for 
CarbonStreamer class.
+   * Currently only addition of columns is supported.
+   * @param targetDs target dataset whose schema needs to be modified, if 
applicable
+   * @param srcDs incoming dataset
+   * @param sparkSession SparkSession
+   */
+  def handleSchemaEvolutionForCarbonStreamer(targetDs: Dataset[Row], srcDs: 
Dataset[Row],
+      sparkSession: SparkSession): Unit = {
+    // read the property here
+    val isSchemaEnforcementEnabled = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT,
+        
CarbonCommonConstants.CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT).toBoolean
+    if (isSchemaEnforcementEnabled) {
+      verifySourceAndTargetSchemas(targetDs, srcDs)
+    } else {
+      // These meta columns should be removed before actually writing the data
+      val metaColumnsString = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_STREAMER_META_COLUMNS, "")
+      val metaCols = metaColumnsString.split(",").map(_.trim)
+      val srcDsWithoutMeta = if (metaCols.length > 0) {
+        srcDs.drop(metaCols: _*)
+      } else {
+        srcDs
+      }
+      handleSchemaEvolution(targetDs, srcDsWithoutMeta, sparkSession, 
isStreamerInvolved = true)
+    }
+  }
+
+  def verifyBackwardsCompatibility(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row]): Unit = {
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    targetSchema.fields.foreach(tgtField => {
+      val sourceField = sourceSchema.fields
+        .find(f => f.name.equalsIgnoreCase(tgtField.name.toLowerCase))
+      // check if some field is missing in source schema
+      if (sourceField.isEmpty) {
+        val errorMsg = s"source schema does not contain field: ${ 
tgtField.name }"
+        LOGGER.error(errorMsg)
+        throw new CarbonSchemaException(errorMsg)
+      }
+
+      // check if data type got modified for some column
+      if (!sourceField.get.dataType.equals(tgtField.dataType)) {
+        val errorMsg = s"source schema has different data type " +
+                       s"for field: ${ tgtField.name }"
+        LOGGER.error(errorMsg + s", source type: ${ sourceField.get.dataType 
}, " +
+                     s"target type: ${ tgtField.dataType }")
+        throw new CarbonSchemaException(errorMsg)
+      }
+    })
+  }
+
+  /**
+   * The method takes care of following schema evolution cases:
+   * Addition of a new column in source schema which is not present in target
+   * Deletion of a column in source schema which is present in target
+   * Data type changes for an existing column.
+   * The method does not take care of column renames and table renames
+   * @param targetDs existing target dataset
+   * @param srcDs incoming source dataset
+   * @return new target schema to write the incoming batch with
+   */
+  def handleSchemaEvolution(
+      targetDs: Dataset[Row],
+      srcDs: Dataset[Row],
+      sparkSession: SparkSession,
+      isStreamerInvolved: Boolean = false): Unit = {
+
+    /*
+    If the method is called from CarbonStreamer, we need to ensure the schema 
is evolved in
+    backwards compatible way. In phase 1, only addition of columns is 
supported, hence this check is
+    needed to ensure data integrity.
+    The existing IUD flow supports full schema evolution, hence this check is 
not needed for
+     existing flows.
+     */
+    if (isStreamerInvolved) {
+      verifyBackwardsCompatibility(targetDs, srcDs)
+    }
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    // check if any column got added in source
+    val addedColumns = sourceSchema.fields
+      .filterNot(field => targetSchema.fields.map(_.name).contains(field.name))
+    if (addedColumns.nonEmpty) {
+      handleAddColumnScenario(targetDs,
+        sourceSchema.fields.filter(f => addedColumns.contains(f)).toSeq, 
sparkSession)
+    }
+
+    // check if any column got deleted from source
+    val relations = CarbonSparkUtil.collectCarbonRelation(targetDs.logicalPlan)
+    val targetCarbonTable = relations.head.carbonRelation.carbonTable
+    val partitionInfo = targetCarbonTable.getPartitionInfo
+    val partitionColumns = if (partitionInfo != null) 
partitionInfo.getColumnSchemaList.asScala
+      .map(_.getColumnName).toList else List[String]()
+    val srcSchemaFieldsInLowerCase = 
sourceSchema.fields.map(_.name.toLowerCase)
+    val deletedColumns = targetSchema.fields.map(_.name.toLowerCase)
+      .filterNot(f => {
+        srcSchemaFieldsInLowerCase.contains(f) ||
+        partitionColumns.contains(f)
+      })
+    if (deletedColumns.nonEmpty) {
+      handleDeleteColumnScenario(targetDs, deletedColumns.toList, sparkSession)
+    }
+
+    val modifiedColumns = targetSchema.fields.filter(tgtField => {

Review comment:
       yes, it is possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to