ShreelekhyaG commented on a change in pull request #3988:
URL: https://github.com/apache/carbondata/pull/3988#discussion_r584683576



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -440,6 +393,73 @@ public boolean accept(CarbonFile file) {
     return null;
   }
 
+  /**
+   * Get old and invalid files which have already been merged to a mergeindex file.In segment folder
+   * we may have both .index files and .mergeindex files, as we are not deleting index files
+   * immediately for old tables, this method reads mergeindex file and adds mapped index files to a
+   * list and returns.If more than one mergeindex file is present, considers the latest one as valid
+   * Ex: We have 3 files in segment. Segment0/ 1.index , 1.mergeindex file, 1.carbondata.
+   * 1.index is merged to 1.mergeindex. Here it returns merged index file - 1.index.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore.readMergeFile(indexFile);
+        Map<String, List<String>> carbonMergeFileToIndexFilesMap =

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -290,6 +290,7 @@ public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+        boolean isUpdateRequired = false;

Review comment:
       Done

##########
File path: integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
##########
@@ -185,13 +193,23 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                                segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         // Merge all partition files into a single file.
-        val segmentFileName: String = SegmentFileStore
-          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
-        SegmentFileStore
+        val segmentFile = SegmentFileStore
           .mergeSegmentFiles(readPath,
-            segmentFileName,
+            newSegmentFileName,
             CarbonTablePath.getSegmentFilesLocation(tablePath))
+        if (segmentFile != null) {
+          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+            CarbonTablePath.SEGMENT_EXT)
+          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,

Review comment:
       This flow is called when mergeIndex runs on old tables. The update happens only once.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
##########
@@ -140,19 +141,47 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                   .get
                   .filterNot(streamingSegment.contains(_))
               }
+            validSegments.foreach { segment =>
+              if (segmentsToMerge.contains(segment.getSegmentNo)) {
+                val segmentFile = segment.getSegmentFileName
+                val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)

Review comment:
       Done.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
##########
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
                   .asScala
                 val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                 validSegments.foreach { segment =>
-                  validSegmentIds += segment.getSegmentNo
+                  val segmentFile = segment.getSegmentFileName
+                  val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)

Review comment:
       Done.

##########
File path: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
##########
@@ -129,8 +127,13 @@ private String mergeCarbonIndexFilesOfSegment(String segmentId,
     String partitionTempPath = "";
     for (String partition : partitionInfo) {
       if (partitionPath.equalsIgnoreCase(partition)) {
-        partitionTempPath = partition + "/" + tempFolderPath;
-        break;
+        if (tempFolderPath != null) {
+          partitionTempPath = partition + "/" + tempFolderPath;
+          break;
+        } else {
+          fileStore.readAllIIndexOfSegment(partition);

Review comment:
       Removed. Edited to use the `mergeIndexFilesInPartitionedSegment` flow for alter add hive partition.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

