Repository: carbondata
Updated Branches:
  refs/heads/master c5930527a -> 51b10ba70


[CARBONDATA-3088][Compaction] support prefetch for compaction

Current compaction performance is low. By adding logs to observe the compaction
procedure, we found that
`CarbonFactDataHandlerColumnar.addDataToStore(CarbonRow)` waits about 30ms
before submitting a new TablePage producer. Since the method `addDataToStore`
is called in a single thread, this wait occurs every 32000 records, because it
collects 32000 records to form a TablePage.

To reduce the waiting time, we can prepare the 32000 records ahead of time.
This can be achieved using prefetch.

We will prepare two buffers: one provides the records to the downstream
(`addDataToStore`) while the other prepares records asynchronously. The first
is called the working buffer and the second the backup buffer. Once the
working buffer is exhausted, the two buffers exchange roles: the backup buffer
becomes the new working buffer, and the old working buffer becomes the new
backup buffer and is refilled asynchronously.
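
Below is a minimal sketch of this double-buffering idea in Java. It is
illustrative only, not the actual RawResultIterator code in the diff below;
the class name DoubleBuffer and the fetchBatch supplier are hypothetical:

    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;

    // Illustrative double buffering: a working buffer serves the consumer
    // while a backup buffer is filled asynchronously by a single-threaded
    // executor.
    class DoubleBuffer<T> {
      private final Supplier<List<T>> fetchBatch;   // produces the next batch of rows
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private List<T> working;                      // serves rows to the consumer
      private Future<List<T>> backup;               // being filled asynchronously
      private int idx = 0;

      DoubleBuffer(Supplier<List<T>> fetchBatch) {
        this.fetchBatch = fetchBatch;
        this.working = fetchBatch.get();                        // first batch, synchronous
        this.backup = executor.submit(() -> fetchBatch.get());  // prefetch the next batch
      }

      boolean hasNext() throws Exception {
        // the idx > 0 guard avoids refetching forever once the source is drained
        if (idx > 0 && idx >= working.size()) {
          working = backup.get();                            // swap: backup becomes working
          backup = executor.submit(() -> fetchBatch.get());  // refill the new backup buffer
          idx = 0;
        }
        return idx < working.size();
      }

      T next() throws Exception {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return working.get(idx++);
      }

      void close() {
        executor.shutdownNow();
      }
    }

While the consumer drains the working buffer, the executor fills the backup
buffer, so the swap on exhaustion blocks only if the producer is slower than
the consumer.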

Two parameters are involved in this feature:

1. carbon.detail.batch.size: This is an existing parameter whose default value
is 100. It controls the batch size of records returned to the client. For a
normal query, it is fine to keep it at 100. But for compaction, since all the
records will be processed, we suggest setting it to a larger value such as
32000. (32000 is the maximum number of rows for a table page, which is what
the downstream wants.)

2. carbon.compaction.prefetch.enable: This is a new parameter whose default
value is `false` (we may change it to `true` later). It controls whether we
will prefetch the records for compaction. A sketch of setting both properties
follows this list.
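
For illustration, here is a minimal sketch of tuning both properties before
triggering a compaction (assuming the usual CarbonProperties API; the class
name CompactionTuning is hypothetical):

    import org.apache.carbondata.core.util.CarbonProperties;

    public class CompactionTuning {
      public static void main(String[] args) {
        CarbonProperties props = CarbonProperties.getInstance();
        // match the 32000-row table page that the downstream wants
        props.addProperty("carbon.detail.batch.size", "32000");
        // opt in to prefetching records during compaction
        props.addProperty("carbon.compaction.prefetch.enable", "true");
      }
    }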

By using this prefetch feature, we can improve compaction performance. More
test results can be found in the PR description.

This closes #2906


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/51b10ba7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/51b10ba7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/51b10ba7

Branch: refs/heads/master
Commit: 51b10ba70e53c869d00c4552f8c03134a5f8eb4d
Parents: c593052
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Mon Nov 5 15:11:09 2018 +0800
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Nov 21 10:17:35 2018 +0530

----------------------------------------------------------------------
 .../scan/result/iterator/RawResultIterator.java | 199 ++++++++++++-------
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +-
 .../merger/CarbonCompactionExecutor.java        |   2 +-
 3 files changed, 125 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/51b10ba7/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 29d8751..1febb0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -16,12 +16,21 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.log4j.Logger;
 
@@ -40,12 +49,14 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    */
   private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
-  /**
-   * Counter to maintain the row counter.
-   */
-  private int counter = 0;
-
-  private Object[] currentConveretedRawRow = null;
+  private boolean prefetchEnabled;
+  private List<Object[]> currentBuffer;
+  private List<Object[]> backupBuffer;
+  private int currentIdxInBuffer;
+  private ExecutorService executorService;
+  private Future<Void> fetchFuture;
+  private Object[] currentRawRow = null;
+  private boolean isBackupFilled = false;
 
   /**
    * LOGGER
@@ -53,72 +64,124 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
-  /**
-   * batch of the result.
-   */
-  private RowBatch batch;
-
   public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator,
-      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+      SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+      boolean isStreamingHandoff) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
+    this.executorService = Executors.newFixedThreadPool(1);
+
+    if (!isStreamingHandoff) {
+      init();
+    }
   }
 
-  @Override
-  public boolean hasNext() {
-    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
-      if (detailRawQueryResultIterator.hasNext()) {
-        batch = null;
-        batch = detailRawQueryResultIterator.next();
-        counter = 0; // batch changed so reset the counter.
-      } else {
-        return false;
+  private void init() {
+    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+    try {
+      new RowsFetcher(false).call();
+      if (prefetchEnabled) {
+        this.fetchFuture = executorService.submit(new RowsFetcher(true));
       }
-    }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      return true;
-    } else {
-      return false;
+    } catch (Exception e) {
+      LOGGER.error("Error occurs while fetching records", e);
+      throw new RuntimeException(e);
     }
   }
 
-  @Override
-  public Object[] next() {
-    if (null == batch) { // for 1st time
-      batch = detailRawQueryResultIterator.next();
+  /**
+   * fetch rows
+   */
+  private final class RowsFetcher implements Callable<Void> {
+    private boolean isBackupFilling;
+
+    private RowsFetcher(boolean isBackupFilling) {
+      this.isBackupFilling = isBackupFilling;
     }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      try {
-        if (null != currentConveretedRawRow) {
-          counter++;
-          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-          currentConveretedRawRow = null;
-          return currentConveretedRawRowTemp;
-        }
-        return convertRow(batch.getRawRow(counter++));
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-        return null;
+
+    @Override
+    public Void call() throws Exception {
+      if (isBackupFilling) {
+        backupBuffer = fetchRows();
+        isBackupFilled = true;
+      } else {
+        currentBuffer = fetchRows();
       }
-    } else { // completed one batch.
-      batch = null;
-      batch = detailRawQueryResultIterator.next();
-      counter = 0;
+      return null;
     }
+  }
+
+  private List<Object[]> fetchRows() throws Exception {
+    List<Object[]> converted = new ArrayList<>();
+    if (detailRawQueryResultIterator.hasNext()) {
+      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+        converted.add(convertRow(r));
+      }
+    }
+    return converted;
+  }
+
+  private void fillDataFromPrefetch() {
     try {
-      if (null != currentConveretedRawRow) {
-        counter++;
-        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-        currentConveretedRawRow = null;
-        return currentConveretedRawRowTemp;
+      if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
+        if (prefetchEnabled) {
+          if (!isBackupFilled) {
+            fetchFuture.get();
+          }
+          // copy backup buffer to current buffer and fill backup buffer asynchronously
+          currentIdxInBuffer = 0;
+          currentBuffer.clear();
+          currentBuffer = backupBuffer;
+          isBackupFilled = false;
+          fetchFuture = executorService.submit(new RowsFetcher(true));
+        } else {
+          currentIdxInBuffer = 0;
+          new RowsFetcher(false).call();
+        }
       }
-      return convertRow(batch.getRawRow(counter++));
-    } catch (KeyGenException e) {
-      LOGGER.error(e.getMessage());
-      return null;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * populate a row with index counter increased
+   */
+  private void popRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+    currentIdxInBuffer++;
+  }
+
+  /**
+   * populate a row with index counter unchanged
+   */
+  private void pickRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+  }
+
+  @Override
+  public boolean hasNext() {
+    fillDataFromPrefetch();
+    if (currentIdxInBuffer < currentBuffer.size()) {
+      return true;
     }
 
+    return false;
+  }
+
+  @Override
+  public Object[] next() {
+    try {
+      popRow();
+      return this.currentRawRow;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -126,17 +189,8 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
    * @return
    */
   public Object[] fetchConverted() throws KeyGenException {
-    if (null != currentConveretedRawRow) {
-      return currentConveretedRawRow;
-    }
-    if (hasNext()) {
-      Object[] rawRow = batch.getRawRow(counter);
-      currentConveretedRawRow = convertRow(rawRow);
-      return currentConveretedRawRow;
-    }
-    else {
-      return null;
-    }
+    pickRow();
+    return this.currentRawRow;
   }
 
   private Object[] convertRow(Object[] rawRow) throws KeyGenException {
@@ -148,16 +202,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> {
     return rawRow;
   }
 
-  /**
-   * To check if the batch is processed completely
-   * @param batch
-   * @return
-   */
-  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
-    if (counter < batch.getSize()) {
-      return false;
-    } else {
-      return true;
+  public void close() {
+    if (null != executorService) {
+      executorService.shutdownNow();
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51b10ba7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 606aa01..c7c5bdc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -74,7 +74,7 @@ class HandoffPartition(
  */
 class StreamingRawResultIterator(
     recordReader: RecordReader[Void, Any]
-) extends RawResultIterator(null, null, null) {
+) extends RawResultIterator(null, null, null, true) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51b10ba7/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 7bc7ae1..ea123d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -136,7 +136,7 @@ public class CarbonCompactionExecutor {
         resultList.add(
             new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
                 sourceSegProperties,
-                destinationSegProperties));
+                destinationSegProperties, false));
       }
     }
     return resultList;
