Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1755#discussion_r159615305
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
---
@@ -702,63 +702,71 @@ case class CarbonLoadDataCommand(
sparkSession: SparkSession,
table: CarbonTable,
logicalPlan: LogicalPlan): Unit = {
- sparkSession.sessionState.catalog.listPartitions(
+ val existingPartitions =
sparkSession.sessionState.catalog.listPartitions(
TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
- Some(partition.map(f => (f._1, f._2.get))))
- val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet
+ Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
+ val partitionNames = existingPartitions.toList.flatMap { partition =>
+ partition.spec.seq.map{case (column, value) => column + "=" + value}
+ }.toSet
val uniqueId = System.currentTimeMillis().toString
val segments = new SegmentStatusManager(
table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
- try {
- // First drop the partitions from partition mapper files of each segment
- new CarbonDropPartitionRDD(
- sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- partitionNames.toSeq,
- uniqueId).collect()
- } catch {
- case e: Exception =>
- // roll back the drop partitions from carbon store
- new CarbonDropPartitionCommitRDD(
+ // If any existing partitions need to be overwritten then drop from partitionmap
+ if (partitionNames.nonEmpty) {
+ try {
+ // First drop the partitions from partition mapper files of each segment
+ new CarbonDropPartitionRDD(
sparkSession.sparkContext,
table.getTablePath,
segments.asScala,
- false,
+ partitionNames.toSeq,
uniqueId).collect()
- throw e
- }
+ } catch {
+ case e: Exception =>
+ // roll back the drop partitions from carbon store
+ new CarbonDropPartitionCommitRDD(
+ sparkSession.sparkContext,
+ table.getTablePath,
+ segments.asScala,
+ false,
+ uniqueId).collect()
+ throw e
+ }
- try {
+ try {
+ Dataset.ofRows(sparkSession, logicalPlan)
+ } catch {
+ case e: Exception =>
+ // roll back the drop partitions from carbon store
+ new CarbonDropPartitionCommitRDD(
+ sparkSession.sparkContext,
+ table.getTablePath,
+ segments.asScala,
+ false,
+ uniqueId).collect()
+ throw e
+ }
+ // Commit the removed partitions in carbon store.
+ new CarbonDropPartitionCommitRDD(
+ sparkSession.sparkContext,
+ table.getTablePath,
+ segments.asScala,
+ true,
+ uniqueId).collect()
+ // Update the loadstatus with update time to clear cache from driver.
+ val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
+ .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
--- End diff --
ok
---