ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] 
Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r279247186
 
 

 ##########
 File path: 
datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
 ##########
 @@ -91,29 +115,280 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRefresh =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRefresh) {
+        isOverwriteTable = true
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      // Get new load for datamap by loading specified main table segments
+      val newDataMapLoad = incrementalBuild(isOverwriteTable,
+        dataMapTable.getAbsoluteTableIdentifier,
+        dataMapTable.getMetadataPath)
+      if(newDataMapLoad.isEmpty) {
+        return false
+      }
+      val header = 
dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            
.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map("mergedSegmentName" -> newDataMapLoad),
         partition = Map.empty)
 
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+          return false
+      } finally {
+        unsetMainTableSegments()
+      }
     }
+    true
   }
 
+  /**
+   * This method builds new load for datamap by loading specified segment data 
and returns
+   * new load name for datamap table load
+   */
   @throws[IOException]
-  override def incrementalBuild(
-      segmentIds: Array[String]): Unit = {
-    throw new UnsupportedOperationException
+  override def incrementalBuild(isOverwriteTable: Boolean,
 
 Review comment:
   Please remove this from the interface, as it is doing nothing.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to