Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2613#discussion_r208178398
--- Diff:
core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
---
@@ -53,153 +39,124 @@
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- private boolean prefetchEnabled;
- private List<Object[]> currentBuffer;
- private List<Object[]> backupBuffer;
- private int currentIdxInBuffer;
- private ExecutorService executorService;
- private Future<Void> fetchFuture;
- private Object[] currentRawRow = null;
- private boolean isBackupFilled = false;
+ /**
+ * Counter to maintain the row counter.
+ */
+ private int counter = 0;
+
+ private Object[] currentConveretedRawRow = null;
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RawResultIterator.class.getName());
+
+ /**
+ * batch of the result.
+ */
+ private RowBatch batch;
public RawResultIterator(CarbonIterator<RowBatch>
detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties
destinationSegProperties,
- boolean isStreamingHandOff) {
+ SegmentProperties sourceSegProperties, SegmentProperties
destinationSegProperties) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
- this.executorService = Executors.newFixedThreadPool(1);
-
- if (!isStreamingHandOff) {
- init();
- }
}
- private void init() {
- this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
-
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
- try {
- new RowsFetcher(false).call();
- if (prefetchEnabled) {
- this.fetchFuture = executorService.submit(new RowsFetcher(true));
- }
- } catch (Exception e) {
- LOGGER.error(e, "Error occurs while fetching records");
- throw new RuntimeException(e);
- }
- }
+ @Override public boolean hasNext() {
- /**
- * fetch rows
- */
- private final class RowsFetcher implements Callable<Void> {
- private boolean isBackupFilling;
-
- private RowsFetcher(boolean isBackupFilling) {
- this.isBackupFilling = isBackupFilling;
- }
-
- @Override
- public Void call() throws Exception {
- if (isBackupFilling) {
- backupBuffer = fetchRows();
- isBackupFilled = true;
+ if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
+ if (detailRawQueryResultIterator.hasNext()) {
+ batch = null;
+ batch = detailRawQueryResultIterator.next();
+ counter = 0; // batch changed so reset the counter.
} else {
- currentBuffer = fetchRows();
+ return false;
}
- return null;
}
- }
- private List<Object[]> fetchRows() {
- if (detailRawQueryResultIterator.hasNext()) {
- return detailRawQueryResultIterator.next().getRows();
+ if (!checkIfBatchIsProcessedCompletely(batch)) {
+ return true;
} else {
- return new ArrayList<>();
+ return false;
}
}
- private void fillDataFromPrefetch() {
- try {
- if (currentIdxInBuffer >= currentBuffer.size() && 0 !=
currentIdxInBuffer) {
- if (prefetchEnabled) {
- if (!isBackupFilled) {
- fetchFuture.get();
- }
- // copy backup buffer to current buffer and fill backup buffer
asyn
- currentIdxInBuffer = 0;
- currentBuffer = backupBuffer;
- isBackupFilled = false;
- fetchFuture = executorService.submit(new RowsFetcher(true));
- } else {
- currentIdxInBuffer = 0;
- new RowsFetcher(false).call();
+ @Override public Object[] next() {
--- End diff --
Move @Override to previous line
---