kumarvishal09 commented on a change in pull request #3776:
URL: https://github.com/apache/carbondata/pull/3776#discussion_r449384678



##########
File path: 
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +318,61 @@ private void commitJobForPartition(JobContext context, 
boolean overwriteSet,
     commitJobFinal(context, loadModel, operationContext, carbonTable, 
uniqueId);
   }
 
+  /**
+   * Method to create and write the segment file, removes the temporary 
directories from all the
+   * respective partition directories. This method is invoked only when {@link
+   * CarbonCommonConstants#CARBON_MERGE_INDEX_IN_SEGMENT} is disabled.
+   * @param context Job context
+   * @param loadModel Load model
+   * @param segmentFileName Segment file name to write
+   * @param partitionPath Serialized list of partition location
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  private void writeSegmentWithoutMergeIndex(JobContext context, 
CarbonLoadModel loadModel,
+      String segmentFileName, String partitionPath) throws IOException {
+    Map<String, String> indexFileNameMap = (Map<String, String>) 
ObjectSerializationUtil
+        
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+    List<String> partitionList =
+        (List<String>) 
ObjectSerializationUtil.convertStringToObject(partitionPath);
+    SegmentFileStore.SegmentFile finalSegmentFile = null;
+    boolean isRelativePath;
+    String partitionLoc;
+    for (String partition : partitionList) {
+      isRelativePath = false;
+      partitionLoc = partition;
+      if (partitionLoc.startsWith(loadModel.getTablePath())) {
+        partitionLoc = 
partitionLoc.substring(loadModel.getTablePath().length());
+        isRelativePath = true;
+      }
+      SegmentFileStore.SegmentFile segmentFile = new 
SegmentFileStore.SegmentFile();
+      SegmentFileStore.FolderDetails folderDetails = new 
SegmentFileStore.FolderDetails();
+      
folderDetails.setFiles(Collections.singleton(indexFileNameMap.get(partition)));
+      folderDetails.setPartitions(
+          
Collections.singletonList(partitionLoc.substring(partitionLoc.indexOf("/") + 
1)));
+      folderDetails.setRelative(isRelativePath);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      segmentFile.getLocationMap().put(partitionLoc, folderDetails);
+      if (finalSegmentFile != null) {
+        finalSegmentFile = finalSegmentFile.merge(segmentFile);
+      } else {
+        finalSegmentFile = segmentFile;
+      }
+    }
+    Objects.requireNonNull(finalSegmentFile);
+    String segmentFilesLocation =

Review comment:
       It's better to move this code inside SegmentFileStore itself: pass the 
table path and segment file name, and internally it will handle folder 
creation. Please check; it may already be present.
   String segmentFilesLocation =
           CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath());
       CarbonFile locationFile = 
FileFactory.getCarbonFile(segmentFilesLocation);
       if (!locationFile.exists()) {
         locationFile.mkdirs();
       }
       SegmentFileStore.writeSegmentFile(finalSegmentFile,
           segmentFilesLocation + "/" + segmentFileName + 
CarbonTablePath.SEGMENT_EXT);




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to