ShreelekhyaG commented on a change in pull request #3988: URL: https://github.com/apache/carbondata/pull/3988#discussion_r534677732
########## File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ########## @@ -267,15 +269,20 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, // Merge all partition files into a single file. segmentFileName = mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp - val segmentFile = SegmentFileStore - .mergeSegmentFiles(readPath, - segmentFileName, - CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath)) - if (segmentFile != null) { - SegmentFileStore - .moveFromTempFolder(segmentFile, - carbonLoadModel.getFactTimeStamp + ".tmp", - carbonLoadModel.getTablePath) + if (!isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) { Review comment: Done ########## File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala ########## @@ -167,6 +173,21 @@ object CarbonMergeFilesRDD { executorService.submit(new Runnable { override def run(): Unit = { ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) + // If Alter merge index for old tables is triggered, do not delete index files + // immediately to avoid index file not found during concurrent queries + if (readFileFooterFromCarbonDataFile || + !isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) { Review comment: Done ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala ########## @@ -226,85 +233,59 @@ object SecondaryIndexUtil { validSegmentsToUse.toList.asJava, indexCarbonTable) } - mergedSegments.asScala.map { seg => - val file = SegmentFileStore.writeSegmentFile( - indexCarbonTable, - seg.getLoadName, - carbonLoadModel.getFactTimeStamp.toString, - null, - null) - val segment = new Segment(seg.getLoadName, file) - SegmentFileStore.updateTableStatusFile(indexCarbonTable, - seg.getLoadName, - file, - indexCarbonTable.getCarbonTableIdentifier.getTableId, - new SegmentFileStore(tablePath, segment.getSegmentFileName)) - segment - } - - val statusLock = - new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock - try { - val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants - .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT) - val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants - .MAX_TIMEOUT_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT) - if (statusLock.lockWithRetries(retryCount, maxTimeout)) { - val endTime = System.currentTimeMillis() - val loadMetadataDetails = SegmentStatusManager - .readLoadMetadata(indexCarbonTable.getMetadataPath) - loadMetadataDetails.foreach(loadMetadataDetail => { - if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) { - loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp) - loadMetadataDetail.setLoadEndTime(endTime) - CarbonLoaderUtil - .addDataIndexSizeIntoMetaEntry(loadMetadataDetail, - loadMetadataDetail.getLoadName, - indexCarbonTable) - } - }) - SegmentStatusManager - .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath), - loadMetadataDetails) - } else { - throw new RuntimeException( - "Not able to acquire the lock for table status updation for table " + databaseName + - "." + indexCarbonTable.getTableName) - } - } finally { - if (statusLock != null) { - statusLock.unlock() - } - } - // clear the indexSchema cache for the merged segments, as the index files and - // data files are rewritten after compaction + val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] = + scala.collection.mutable.Map() if (mergedSegments.size > 0) { - - // merge index files for merged segments - CarbonMergeFilesRDD.mergeIndexFiles(sc.sparkSession, - rebuiltSegments.toSeq, - segmentIdToLoadStartTimeMap, - indexCarbonTable.getTablePath, - indexCarbonTable, mergeIndexProperty = false - ) - - if (CarbonProperties.getInstance() - .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName, - indexCarbonTable.getTableName)) { - try { - IndexServer.getClient - .invalidateSegmentCache(indexCarbonTable, - rebuiltSegments.toArray, - SparkSQLUtil.getTaskGroupId(sc.sparkSession)) - } catch { - case _: Exception => + // merge index files and write segment file for merged segments + mergedSegments.asScala.map { seg => + if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) { + new CarbonIndexFileMergeWriter(indexCarbonTable).mergeCarbonIndexFilesOfSegment(seg + .getLoadName, + tablePath, + false, + carbonLoadModel.getFactTimeStamp.toString) + } + val file = SegmentFileStore.writeSegmentFile( + indexCarbonTable, + seg.getLoadName, + carbonLoadModel.getFactTimeStamp.toString, + null, + null) + segmentToLoadStartTimeMap.put(seg.getLoadName, + carbonLoadModel.getFactTimeStamp) + // clear the indexSchema cache for the merged segments, as the index files and + // data files are rewritten after compaction + if (CarbonProperties.getInstance() + .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName, + indexCarbonTable.getTableName)) { + try { + IndexServer.getClient + .invalidateSegmentCache(indexCarbonTable, + rebuiltSegments.toArray, + SparkSQLUtil.getTaskGroupId(sc.sparkSession)) + } catch { + case _: Exception => + } } + val segment = new Segment(seg.getLoadName, file) + segment + } + if (compactionType == null) { Review comment: Done ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ########## @@ -530,21 +530,22 @@ class CarbonIndexFileMergeTestCase FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath) .listFiles(true, new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = { - file.getName.endsWith(extension) Review comment: Done ########## File path: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ########## @@ -1221,6 +1223,20 @@ public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, Stri tempFolderPath, currPartitionSpec); } + public static String mergeIndexFilesInTempSegment(CarbonTable table, String segmentId, + String segmentPath, String uuid) { + try { + return new CarbonIndexFileMergeWriter(table) + .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath, segmentId, uuid, + false); + } catch (IOException e) { + String message = + "Failed to merge index files in path: " + segmentPath + ". " + e.getMessage(); Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org