ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] 
Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r279248051
 
 

 ##########
 File path: 
datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
 ##########
 @@ -91,29 +115,280 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRefresh =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRefresh) {
+        isOverwriteTable = true
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      // Get new load for datamap by loading specified main table segments
+      val newDataMapLoad = incrementalBuild(isOverwriteTable,
+        dataMapTable.getAbsoluteTableIdentifier,
+        dataMapTable.getMetadataPath)
+      if(newDataMapLoad.isEmpty) {
+        return false
+      }
+      val header = 
dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            
.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map("mergedSegmentName" -> newDataMapLoad),
         partition = Map.empty)
 
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+          return false
+      } finally {
+        unsetMainTableSegments()
+      }
     }
+    true
   }
 
+  /**
+   * This method builds new load for datamap by loading specified segment data 
and returns
+   * new load name for datamap table load
+   */
   @throws[IOException]
-  override def incrementalBuild(
-      segmentIds: Array[String]): Unit = {
-    throw new UnsupportedOperationException
+  override def incrementalBuild(isOverwriteTable: Boolean,
+      dataMapTableAbsoluteTableIdentifier: AbsoluteTableIdentifier,
+      dataMapTableMetadataPath: String): String = {
+    var loadMetaDataDetails = 
SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath)
+    var newLoadName : String = ""
+    var segmentMap: String = ""
+    val segmentStatusManager = new 
SegmentStatusManager(dataMapTableAbsoluteTableIdentifier)
+    // Acquire table status lock to handle concurrent dataloading
+    val carbonLock: ICarbonLock = segmentStatusManager.getTableStatusLock
 
 Review comment:
   Please move it to the parent class and add an abstract method 
rebuildInternal(String loadName, Map<String, Set<segments>>)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to