GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2906#discussion_r231755219
--- Diff:
core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
---
@@ -40,103 +49,150 @@
*/
private CarbonIterator<RowBatch> detailRawQueryResultIterator;
- /**
- * Counter to maintain the row counter.
- */
- private int counter = 0;
-
- private Object[] currentConveretedRawRow = null;
+ private boolean prefetchEnabled;
+ private List<Object[]> currentBuffer;
+ private List<Object[]> backupBuffer;
+ private int currentIdxInBuffer;
+ private ExecutorService executorService;
+ private Future<Void> fetchFuture;
+ private Object[] currentRawRow = null;
+ private boolean isBackupFilled = false;
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(RawResultIterator.class.getName());
- /**
- * batch of the result.
- */
- private RowBatch batch;
-
public RawResultIterator(CarbonIterator<RowBatch>
detailRawQueryResultIterator,
- SegmentProperties sourceSegProperties, SegmentProperties
destinationSegProperties) {
+ SegmentProperties sourceSegProperties, SegmentProperties
destinationSegProperties,
+ boolean isStreamingHandoff) {
this.detailRawQueryResultIterator = detailRawQueryResultIterator;
this.sourceSegProperties = sourceSegProperties;
this.destinationSegProperties = destinationSegProperties;
+ this.executorService = Executors.newFixedThreadPool(1);
+
+ if (!isStreamingHandoff) {
+ init();
+ }
}
- @Override
- public boolean hasNext() {
- if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
- if (detailRawQueryResultIterator.hasNext()) {
- batch = null;
- batch = detailRawQueryResultIterator.next();
- counter = 0; // batch changed so reset the counter.
+ private void init() {
+ this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+ try {
+ new RowsFetcher(false).call();
+ if (prefetchEnabled) {
+ this.fetchFuture = executorService.submit(new RowsFetcher(true));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error occurs while fetching records", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * fetch rows
+ */
+ private final class RowsFetcher implements Callable<Void> {
+ private boolean isBackupFilling;
+
+ private RowsFetcher(boolean isBackupFilling) {
+ this.isBackupFilling = isBackupFilling;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (isBackupFilling) {
+ backupBuffer = fetchRows();
+ isBackupFilled = true;
} else {
- return false;
+ currentBuffer = fetchRows();
}
+ return null;
}
- if (!checkIfBatchIsProcessedCompletely(batch)) {
- return true;
+ }
+
+ private List<Object[]> fetchRows() throws Exception {
+ List<Object[]> converted = new ArrayList<>();
+ if (detailRawQueryResultIterator.hasNext()) {
+ for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
+ converted.add(convertRow(r));
+ }
+ return converted;
} else {
- return false;
+ return new ArrayList<>();
--- End diff --
fine~
---