GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2906#discussion_r231755219
  
    --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
    @@ -40,103 +49,150 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       /**
        * LOGGER
        */
       private static final Logger LOGGER =
           LogServiceFactory.getLogService(RawResultIterator.class.getName());
     
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    -
       public RawResultIterator(CarbonIterator<RowBatch> 
detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties,
    +      boolean isStreamingHandoff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandoff) {
    +      init();
    +    }
       }
     
    -  @Override
    -  public boolean hasNext() {
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error("Error occurs while fetching records", e);
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
    +
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  }
    +
    +  private List<Object[]> fetchRows() throws Exception {
    +    List<Object[]> converted = new ArrayList<>();
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
    +        converted.add(convertRow(r));
    +      }
    +      return converted;
         } else {
    -      return false;
    +      return new ArrayList<>();
    --- End diff --
    
    fine~


---

Reply via email to