Github user manishgupta88 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2613#discussion_r208182587 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java --- @@ -53,153 +39,124 @@ */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - private boolean prefetchEnabled; - private List<Object[]> currentBuffer; - private List<Object[]> backupBuffer; - private int currentIdxInBuffer; - private ExecutorService executorService; - private Future<Void> fetchFuture; - private Object[] currentRawRow = null; - private boolean isBackupFilled = false; + /** + * Counter to maintain the row counter. + */ + private int counter = 0; + + private Object[] currentConveretedRawRow = null; + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(RawResultIterator.class.getName()); + + /** + * batch of the result. + */ + private RowBatch batch; public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, - boolean isStreamingHandOff) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; - this.executorService = Executors.newFixedThreadPool(1); - - if (!isStreamingHandOff) { - init(); - } } - private void init() { - this.prefetchEnabled = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, - CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); - try { - new RowsFetcher(false).call(); - if (prefetchEnabled) { - this.fetchFuture = executorService.submit(new RowsFetcher(true)); - } - } catch (Exception e) { - LOGGER.error(e, "Error occurs while fetching records"); - throw new RuntimeException(e); - } - } + @Override public boolean hasNext() { - /** - * fetch rows - */ - private final class RowsFetcher implements Callable<Void> { - private boolean isBackupFilling; - - private RowsFetcher(boolean isBackupFilling) { - this.isBackupFilling = isBackupFilling; - } - - @Override - public Void call() throws Exception { - if (isBackupFilling) { - backupBuffer = fetchRows(); - isBackupFilled = true; + if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { + if (detailRawQueryResultIterator.hasNext()) { + batch = null; + batch = detailRawQueryResultIterator.next(); + counter = 0; // batch changed so reset the counter. } else { - currentBuffer = fetchRows(); + return false; } - return null; } - } - private List<Object[]> fetchRows() { - if (detailRawQueryResultIterator.hasNext()) { - return detailRawQueryResultIterator.next().getRows(); + if (!checkIfBatchIsProcessedCompletely(batch)) { + return true; } else { - return new ArrayList<>(); + return false; } } - private void fillDataFromPrefetch() { - try { - if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) { - if (prefetchEnabled) { - if (!isBackupFilled) { - fetchFuture.get(); - } - // copy backup buffer to current buffer and fill backup buffer asyn - currentIdxInBuffer = 0; - currentBuffer = backupBuffer; - isBackupFilled = false; - fetchFuture = executorService.submit(new RowsFetcher(true)); - } else { - currentIdxInBuffer = 0; - new RowsFetcher(false).call(); + @Override public Object[] next() { --- End diff -- ok
---