jackylk commented on a change in pull request #3431: [CARBONDATA-3566] Support 
add segment for partition table
URL: https://github.com/apache/carbondata/pull/3431#discussion_r345160280
 
 

 ##########
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 ##########
 @@ -89,27 +92,118 @@ case class CarbonAddLoadCommand(
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", 
"delete segment")
     }
-    val segmentPath = options.getOrElse(
-      "path", throw new UnsupportedOperationException("PATH is manadatory"))
+    var inputPath = options.getOrElse(
+      "path", throw new UnsupportedOperationException("PATH is mandatory"))
 
-    val segSchema = MixedFormatHandler.getSchema(sparkSession, options, 
segmentPath)
-
-    val segCarbonSchema = new Schema(segSchema.fields.map { field =>
+    // infer schema and collect FileStatus for all partitions
+    val (inputPathSchema, lastLevelDirFileMap) =
+      MixedFormatHandler.collectInfo(sparkSession, options, inputPath)
+    var inputPathCarbonFields = inputPathSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
-    })
-
-    val tableCarbonSchema = new Schema(tableSchema.fields.map { field =>
+    }
+    val carbonTableSchema = new Schema(tableSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
       new Field(field.name, dataType)
     })
 
+    // update schema if has partition
+    val inputPathTableFields = if (carbonTable.isHivePartitionTable) {
+      val partitions = options.getOrElse("partition",
+        throw new AnalysisException(
+          "partition option is required when adding segment to partition 
table")
+      )
+      // extract partition given by user, partition option should be form of 
"a:int, b:string"
+      val partitionFields = partitions.split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(_.toLowerCase)
+        .map { input =>
+          val nameAndDataType = input.split(":")
+          if (nameAndDataType.size == 2) {
+            new Field(nameAndDataType(0), nameAndDataType(1))
+          } else {
+            throw new AnalysisException(s"invalid partition option: 
${options.toString()}")
+          }
+        }
+      // validate against the partition in carbon table
+      val carbonTablePartition = getCarbonTablePartition(sparkSession)
+      if (!partitionFields.sameElements(carbonTablePartition)) {
+        throw new AnalysisException(
+          s"""
+             |Partition is not same. Carbon table partition is :
+             |${carbonTablePartition.mkString(",")} and input segment 
partition is :
+             |${partitionFields.mkString(",")}
+             |""".stripMargin)
+      }
+      inputPathCarbonFields ++ partitionFields
+    } else {
+      if (options.contains("partition")) {
+        throw new AnalysisException(s"partition option is not required for 
non-partition table")
 
 Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to the sender of this message.