Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2613#discussion_r208182587
  
    --- Diff: 
core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
 ---
    @@ -53,153 +39,124 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  private boolean prefetchEnabled;
    -  private List<Object[]> currentBuffer;
    -  private List<Object[]> backupBuffer;
    -  private int currentIdxInBuffer;
    -  private ExecutorService executorService;
    -  private Future<Void> fetchFuture;
    -  private Object[] currentRawRow = null;
    -  private boolean isBackupFilled = false;
    +  /**
    +   * Counter to maintain the row counter.
    +   */
    +  private int counter = 0;
    +
    +  private Object[] currentConveretedRawRow = null;
    +
    +  /**
    +   * LOGGER
    +   */
    +  private static final LogService LOGGER =
    +      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    +
    +  /**
    +   * batch of the result.
    +   */
    +  private RowBatch batch;
     
       public RawResultIterator(CarbonIterator<RowBatch> 
detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties,
    -      boolean isStreamingHandOff) {
    +      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    -    this.executorService = Executors.newFixedThreadPool(1);
    -
    -    if (!isStreamingHandOff) {
    -      init();
    -    }
       }
     
    -  private void init() {
    -    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    -        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    -        
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    -    try {
    -      new RowsFetcher(false).call();
    -      if (prefetchEnabled) {
    -        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    -      }
    -    } catch (Exception e) {
    -      LOGGER.error(e, "Error occurs while fetching records");
    -      throw new RuntimeException(e);
    -    }
    -  }
    +  @Override public boolean hasNext() {
     
    -  /**
    -   * fetch rows
    -   */
    -  private final class RowsFetcher implements Callable<Void> {
    -    private boolean isBackupFilling;
    -
    -    private RowsFetcher(boolean isBackupFilling) {
    -      this.isBackupFilling = isBackupFilling;
    -    }
    -
    -    @Override
    -    public Void call() throws Exception {
    -      if (isBackupFilling) {
    -        backupBuffer = fetchRows();
    -        isBackupFilled = true;
    +    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    +      if (detailRawQueryResultIterator.hasNext()) {
    +        batch = null;
    +        batch = detailRawQueryResultIterator.next();
    +        counter = 0; // batch changed so reset the counter.
           } else {
    -        currentBuffer = fetchRows();
    +        return false;
           }
    -      return null;
         }
    -  }
     
    -  private List<Object[]> fetchRows() {
    -    if (detailRawQueryResultIterator.hasNext()) {
    -      return detailRawQueryResultIterator.next().getRows();
    +    if (!checkIfBatchIsProcessedCompletely(batch)) {
    +      return true;
         } else {
    -      return new ArrayList<>();
    +      return false;
         }
       }
     
    -  private void fillDataFromPrefetch() {
    -    try {
    -      if (currentIdxInBuffer >= currentBuffer.size() && 0 != 
currentIdxInBuffer) {
    -        if (prefetchEnabled) {
    -          if (!isBackupFilled) {
    -            fetchFuture.get();
    -          }
    -          // copy backup buffer to current buffer and fill backup buffer 
asyn
    -          currentIdxInBuffer = 0;
    -          currentBuffer = backupBuffer;
    -          isBackupFilled = false;
    -          fetchFuture = executorService.submit(new RowsFetcher(true));
    -        } else {
    -          currentIdxInBuffer = 0;
    -          new RowsFetcher(false).call();
    +  @Override public Object[] next() {
    --- End diff --
    
    ok


---

Reply via email to