ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677732



##########
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
##########
@@ -267,15 +269,20 @@ class CarbonTableCompactor(carbonLoadModel: 
CarbonLoadModel,
         // Merge all partition files into a single file.
         segmentFileName =
           mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
-        val segmentFile = SegmentFileStore
-          .mergeSegmentFiles(readPath,
-            segmentFileName,
-            
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
-        if (segmentFile != null) {
-          SegmentFileStore
-            .moveFromTempFolder(segmentFile,
-              carbonLoadModel.getFactTimeStamp + ".tmp",
-              carbonLoadModel.getTablePath)
+        if (!isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {

Review comment:
       Done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -167,6 +173,21 @@ object CarbonMergeFilesRDD {
             executorService.submit(new Runnable {
               override def run(): Unit = {
                 ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+                // If Alter merge index for old tables is triggered, do not 
delete index files
+                // immediately to avoid index file not found during concurrent 
queries
+                if (readFileFooterFromCarbonDataFile ||
+                    
!isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+                      
CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {

Review comment:
       Done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
##########
@@ -226,85 +233,59 @@ object SecondaryIndexUtil {
               validSegmentsToUse.toList.asJava,
               indexCarbonTable)
           }
-          mergedSegments.asScala.map { seg =>
-            val file = SegmentFileStore.writeSegmentFile(
-              indexCarbonTable,
-              seg.getLoadName,
-              carbonLoadModel.getFactTimeStamp.toString,
-              null,
-              null)
-            val segment = new Segment(seg.getLoadName, file)
-            SegmentFileStore.updateTableStatusFile(indexCarbonTable,
-              seg.getLoadName,
-              file,
-              indexCarbonTable.getCarbonTableIdentifier.getTableId,
-              new SegmentFileStore(tablePath, segment.getSegmentFileName))
-            segment
-          }
-
-          val statusLock =
-            new 
SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
-          try {
-            val retryCount = 
CarbonLockUtil.getLockProperty(CarbonCommonConstants
-              .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-              
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
-            val maxTimeout = 
CarbonLockUtil.getLockProperty(CarbonCommonConstants
-              .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
-            if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
-              val endTime = System.currentTimeMillis()
-              val loadMetadataDetails = SegmentStatusManager
-                .readLoadMetadata(indexCarbonTable.getMetadataPath)
-              loadMetadataDetails.foreach(loadMetadataDetail => {
-                if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
-                  
loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
-                  loadMetadataDetail.setLoadEndTime(endTime)
-                  CarbonLoaderUtil
-                    .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
-                      loadMetadataDetail.getLoadName,
-                      indexCarbonTable)
-                }
-              })
-              SegmentStatusManager
-                
.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
-                  loadMetadataDetails)
-            } else {
-              throw new RuntimeException(
-                "Not able to acquire the lock for table status updation for 
table " + databaseName +
-                "." + indexCarbonTable.getTableName)
-            }
-          } finally {
-            if (statusLock != null) {
-              statusLock.unlock()
-            }
-          }
-          // clear the indexSchema cache for the merged segments, as the index 
files and
-          // data files are rewritten after compaction
+          val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, 
java.lang.Long] =
+            scala.collection.mutable.Map()
           if (mergedSegments.size > 0) {
-
-            // merge index files for merged segments
-            CarbonMergeFilesRDD.mergeIndexFiles(sc.sparkSession,
-              rebuiltSegments.toSeq,
-              segmentIdToLoadStartTimeMap,
-              indexCarbonTable.getTablePath,
-              indexCarbonTable, mergeIndexProperty = false
-            )
-
-            if (CarbonProperties.getInstance()
-              .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
-                indexCarbonTable.getTableName)) {
-              try {
-                IndexServer.getClient
-                  .invalidateSegmentCache(indexCarbonTable,
-                    rebuiltSegments.toArray,
-                    SparkSQLUtil.getTaskGroupId(sc.sparkSession))
-              } catch {
-                case _: Exception =>
+            // merge index files and write segment file for merged segments
+            mergedSegments.asScala.map { seg =>
+              if 
(isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+                CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) {
+                new 
CarbonIndexFileMergeWriter(indexCarbonTable).mergeCarbonIndexFilesOfSegment(seg
+                  .getLoadName,
+                  tablePath,
+                  false,
+                  carbonLoadModel.getFactTimeStamp.toString)
+              }
+              val file = SegmentFileStore.writeSegmentFile(
+                indexCarbonTable,
+                seg.getLoadName,
+                carbonLoadModel.getFactTimeStamp.toString,
+                null,
+                null)
+              segmentToLoadStartTimeMap.put(seg.getLoadName,
+                carbonLoadModel.getFactTimeStamp)
+              // clear the indexSchema cache for the merged segments, as the 
index files and
+              // data files are rewritten after compaction
+              if (CarbonProperties.getInstance()
+                .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
+                  indexCarbonTable.getTableName)) {
+                try {
+                  IndexServer.getClient
+                    .invalidateSegmentCache(indexCarbonTable,
+                      rebuiltSegments.toArray,
+                      SparkSQLUtil.getTaskGroupId(sc.sparkSession))
+                } catch {
+                  case _: Exception =>
+                }
               }
+              val segment = new Segment(seg.getLoadName, file)
+              segment
+            }
+            if (compactionType == null) {

Review comment:
       Done

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
##########
@@ -530,21 +530,22 @@ class CarbonIndexFileMergeTestCase
       FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
         .listFiles(true, new CarbonFileFilter {
           override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(extension)

Review comment:
       Done

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
##########
@@ -1221,6 +1223,20 @@ public static String 
mergeIndexFilesInPartitionedSegment(CarbonTable table, Stri
             tempFolderPath, currPartitionSpec);
   }
 
+  public static String mergeIndexFilesInTempSegment(CarbonTable table, String 
segmentId,
+      String segmentPath, String uuid) {
+    try {
+      return new CarbonIndexFileMergeWriter(table)
+          .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath, 
segmentId, uuid,
+              false);
+    } catch (IOException e) {
+      String message =
+          "Failed to merge index files in path: " + segmentPath + ". " + 
e.getMessage();

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to respond to this review comment.