VenuReddy2103 commented on a change in pull request #3776:
URL: https://github.com/apache/carbondata/pull/3776#discussion_r451275614



##########
File path: 
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +318,61 @@ private void commitJobForPartition(JobContext context, 
boolean overwriteSet,
     commitJobFinal(context, loadModel, operationContext, carbonTable, 
uniqueId);
   }
 
+  /**
+   * Method to create and write the segment file, removes the temporary 
directories from all the
+   * respective partition directories. This method is invoked only when {@link
+   * CarbonCommonConstants#CARBON_MERGE_INDEX_IN_SEGMENT} is disabled.
+   * @param context Job context
+   * @param loadModel Load model
+   * @param segmentFileName Segment file name to write
+   * @param partitionPath Serialized list of partition location
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  private void writeSegmentWithoutMergeIndex(JobContext context, 
CarbonLoadModel loadModel,
+      String segmentFileName, String partitionPath) throws IOException {
+    Map<String, String> indexFileNameMap = (Map<String, String>) 
ObjectSerializationUtil
+        
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+    List<String> partitionList =
+        (List<String>) 
ObjectSerializationUtil.convertStringToObject(partitionPath);
+    SegmentFileStore.SegmentFile finalSegmentFile = null;
+    boolean isRelativePath;
+    String partitionLoc;
+    for (String partition : partitionList) {
+      isRelativePath = false;
+      partitionLoc = partition;
+      if (partitionLoc.startsWith(loadModel.getTablePath())) {
+        partitionLoc = 
partitionLoc.substring(loadModel.getTablePath().length());
+        isRelativePath = true;
+      }
+      SegmentFileStore.SegmentFile segmentFile = new 
SegmentFileStore.SegmentFile();
+      SegmentFileStore.FolderDetails folderDetails = new 
SegmentFileStore.FolderDetails();
+      
folderDetails.setFiles(Collections.singleton(indexFileNameMap.get(partition)));
+      folderDetails.setPartitions(
+          
Collections.singletonList(partitionLoc.substring(partitionLoc.indexOf("/") + 
1)));
+      folderDetails.setRelative(isRelativePath);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      segmentFile.getLocationMap().put(partitionLoc, folderDetails);
+      if (finalSegmentFile != null) {
+        finalSegmentFile = finalSegmentFile.merge(segmentFile);
+      } else {
+        finalSegmentFile = segmentFile;
+      }
+    }
+    Objects.requireNonNull(finalSegmentFile);
+    String segmentFilesLocation =

Review comment:
       Agreed. Moved this code to SegmentFileStore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to