akashrn5 commented on a change in pull request #3603: [CARBONDATA-3679] 
Optimize local sort performance
URL: https://github.com/apache/carbondata/pull/3603#discussion_r379251111
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
 ##########
 @@ -133,62 +94,74 @@ public void addRow(Object[] row) throws 
CarbonSortKeyAndGroupByException {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("************ Writing to temp file ********** ");
       }
-      intermediateFileMerger.startMergingIfPossible();
       Object[][] recordHolderListLocal = recordHolderList;
-      try {
-        semaphore.acquire();
-        dataSorterAndWriterExecutorService.execute(new 
DataSorterAndWriter(recordHolderListLocal));
-      } catch (InterruptedException e) {
-        LOGGER.error("exception occurred while trying to acquire a semaphore 
lock: ", e);
-        throw new CarbonSortKeyAndGroupByException(e);
-      }
+      handlePreviousPage(recordHolderListLocal);
       // create the new holder Array
       this.recordHolderList = new Object[this.sortBufferSize][];
       this.entryCount = 0;
     }
     recordHolderList[entryCount++] = row;
   }
 
-  /**
-   * This method will be used to add new row
-   *
-   * @param rowBatch new rowBatch
-   * @throws CarbonSortKeyAndGroupByException problem while writing
-   */
   public void addRowBatch(Object[][] rowBatch, int size) throws 
CarbonSortKeyAndGroupByException {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
-    synchronized (addRowsLock) {
-      int sizeLeft = 0;
-      if (entryCount + size >= sortBufferSize) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("************ Writing to temp file ********** ");
-        }
-        intermediateFileMerger.startMergingIfPossible();
-        Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount;
-        if (sizeLeft > 0) {
-          System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, 
sizeLeft);
-        }
-        try {
-          semaphore.acquire();
-          dataSorterAndWriterExecutorService
-              .execute(new DataSorterAndWriter(recordHolderListLocal));
-        } catch (Exception e) {
-          LOGGER.error(
-              "exception occurred while trying to acquire a semaphore lock: " 
+ e.getMessage(), e);
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-        // create the new holder Array
-        this.recordHolderList = new Object[this.sortBufferSize][];
-        this.entryCount = 0;
-        size = size - sizeLeft;
-        if (size == 0) {
-          return;
-        }
+    int sizeLeft = 0;
+    if (entryCount + size >= sortBufferSize) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("************ Writing to temp file ********** ");
       }
-      System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
-      entryCount += size;
+      Object[][] recordHolderListLocal = recordHolderList;
+      sizeLeft = sortBufferSize - entryCount;
+      if (sizeLeft > 0) {
+        System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, 
sizeLeft);
+      }
+      handlePreviousPage(recordHolderListLocal);
+      // create the new holder Array
+      this.recordHolderList = new Object[this.sortBufferSize][];
+      this.entryCount = 0;
+      size = size - sizeLeft;
+      if (size == 0) {
+        return;
+      }
+    }
+    System.arraycopy(rowBatch, sizeLeft, recordHolderList, entryCount, size);
+    entryCount += size;
+  }
+
+  /**
+   * sort and write data
+   * @param recordHolderArray
+   */
+  private void handlePreviousPage(Object[][] recordHolderArray)
+          throws CarbonSortKeyAndGroupByException {
+    try {
+      long startTime = System.currentTimeMillis();
+      if (parameters.getNumberOfNoDictSortColumns() > 0) {
+        Arrays.sort(recordHolderArray,
+                new NewRowComparator(parameters.getNoDictionarySortColumn(),
+                        parameters.getNoDictDataType()));
+      } else {
+        Arrays.sort(recordHolderArray,
+                new 
NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
+      }
+
+      // create a new file and choose folder randomly every time
+      String[] tmpFileLocation = parameters.getTempFileLocation();
+      String locationChosen = tmpFileLocation[new 
Random().nextInt(tmpFileLocation.length)];
+      File sortTempFile = new File(
+              locationChosen + File.separator + parameters.getTableName()
+                      + '_' + parameters.getRangeId() + '_' + System.nanoTime()
+                      + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
+      writeDataToFile(recordHolderArray, recordHolderArray.length, 
sortTempFile);
+      // add sort temp filename to and arrayList. When the list size reaches 
20 then
 
 Review comment:
   Yes, I already compared the old code with your code, but it is better to improve the code when 
we are refactoring anyway; that is all my suggestion was.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to