kunal642 commented on a change in pull request #4227:
URL: https://github.com/apache/carbondata/pull/4227#discussion_r734935130



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetUtil.scala
##########
@@ -462,4 +475,351 @@ object CarbonMergeDataSetUtil {
       columnMinMaxInBlocklet.asScala
     }
   }
+
+  /**
+   * This method verifies source and target schemas for the following:
+   * If additional columns are present in source schema as compared to target, simply ignore them.
+   * If some columns are missing in source schema as compared to target schema, exception is thrown.
+   * If data type of some column differs in source and target schemas, exception is thrown.
+   * If source schema has multiple columns whose names differ only in case sensitivity, exception
+   * is thrown.
+   * @param targetDs target carbondata table
+   * @param srcDs source/incoming data
+   */
+  def verifySourceAndTargetSchemas(targetDs: Dataset[Row], srcDs: Dataset[Row]): Unit = {
+    LOGGER.info("schema enforcement is enabled. Source and target schemas will be verified")
+    // get the source and target dataset schema
+    val sourceSchema = srcDs.schema
+    val targetSchema = targetDs.schema
+
+    verifyBackwardsCompatibility(targetDs, srcDs)
+
+    val lowerCaseSrcSchemaFields = sourceSchema.fields.map(_.name.toLowerCase)
+    // check if some additional column got added in source schema
+    if (sourceSchema.fields.length > targetSchema.fields.length) {
+      val additionalSourceFields = lowerCaseSrcSchemaFields
+        .filterNot(srcField => {
+          targetSchema.fields.map(_.name.toLowerCase).contains(srcField)
+        })
+      if (additionalSourceFields.nonEmpty) {
+        LOGGER.warn(s"source schema contains additional fields which are not present in " +
+                    s"target schema: ${ additionalSourceFields.mkString(",") }")
+      }
+    }
+
+    // check if source schema has fields whose names only differ in case sensitivity
+    val similarFields = lowerCaseSrcSchemaFields.groupBy(a => identity(a)).map {
+      case (str, times) => (str, times.length)
+    }.toList.filter(e => e._2 > 1).map(_._1)

Review comment:
       Try to combine the multiple map and filter calls into a single collect.
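
       One way the suggestion could look is sketched below. This is only an illustration, not the change that was merged: `DuplicateFieldCheck` and `duplicateNames` are hypothetical names, and plain strings stand in for `sourceSchema.fields.map(_.name)`. The `map { ... }.toList.filter(...).map(_._1)` chain collapses into one `collect` whose guard keeps only the names that occur more than once.

```scala
object DuplicateFieldCheck {
  // Hypothetical helper: returns the lower-cased names that occur more than once.
  // A single collect with a guard replaces the map -> filter -> map chain.
  def duplicateNames(fieldNames: Seq[String]): List[String] = {
    fieldNames
      .map(_.toLowerCase)
      .groupBy(identity)
      .collect { case (name, occurrences) if occurrences.length > 1 => name }
      .toList
  }

  def main(args: Array[String]): Unit = {
    // "Name" and "name" differ only in case, so "name" is reported.
    println(duplicateNames(Seq("id", "Name", "name", "price")))
  }
}
```

       With the sample input above, `duplicateNames` returns `List("name")`. The result is equivalent to the original chain but avoids building the intermediate `(name, count)` map and list.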




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

