Repository: carbondata Updated Branches: refs/heads/master e36257fd2 -> 95b9208aa
[CARBONDATA-1837] Reusing origin row to reduce memory consumption In the data converting step of data loading, CarbonData converts rows batch by batch. Currently, it creates a new batch to store the converted rows; this can be optimized by reusing the original row batch's space, which reduces memory consumption and GC-related overhead. This closes #1593 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/95b9208a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/95b9208a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/95b9208a Branch: refs/heads/master Commit: 95b9208aa2610991842117796255bb51de1fb762 Parents: e36257f Author: xuchuanyin <[email protected]> Authored: Wed Nov 29 11:59:48 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Dec 6 23:12:03 2017 +0800 ---------------------------------------------------------------------- .../processing/loading/row/CarbonRowBatch.java | 19 +++++++++++++++++++ .../steps/DataConverterProcessorStepImpl.java | 10 ++++++---- ...aConverterProcessorWithBucketingStepImpl.java | 15 ++++++++------- 3 files changed, 33 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/95b9208a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java index f5f112c..e819dcd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java @@ -61,4 +61,23 @@ public class CarbonRowBatch 
extends CarbonIterator<CarbonRow> { @Override public void remove() { } + + /** + * set previous row, this can be used to set value for the RowBatch after iterating it. The + * `index` here is `index-1` because after we iterate this value, the `index` has increased by 1. + * @param row row + */ + public void setPreviousRow(CarbonRow row) { + if (index == 0) { + throw new RuntimeException("Unable to set a row in RowBatch before index 0"); + } + rowBatch[index - 1] = row; + } + + /** + * rewind to the head, this can be used for reuse the origin batch instead of generating a new one + */ + public void rewind() { + index = 0; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/95b9208a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index 1e73867..a0592f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -106,12 +106,14 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte * @return processed row. 
*/ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) { - CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize()); while (rowBatch.hasNext()) { - newBatch.addRow(localConverter.convert(rowBatch.next())); + CarbonRow convertRow = localConverter.convert(rowBatch.next()); + rowBatch.setPreviousRow(convertRow); } - rowCounter.getAndAdd(newBatch.getSize()); - return newBatch; + rowCounter.getAndAdd(rowBatch.getSize()); + // reuse the origin batch + rowBatch.rewind(); + return rowBatch; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/95b9208a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java index 009c6a0..82112b7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -128,16 +128,17 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa * @return processed row. 
*/ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) { - CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize()); while (rowBatch.hasNext()) { - CarbonRow next = rowBatch.next(); - short bucketNumber = (short) partitioner.getPartition(next.getData()); - CarbonRow convertRow = localConverter.convert(next); + CarbonRow row = rowBatch.next(); + short bucketNumber = (short) partitioner.getPartition(row.getData()); + CarbonRow convertRow = localConverter.convert(row); convertRow.bucketNumber = bucketNumber; - newBatch.addRow(convertRow); + rowBatch.setPreviousRow(convertRow); } - rowCounter.getAndAdd(newBatch.getSize()); - return newBatch; + rowCounter.getAndAdd(rowBatch.getSize()); + // reuse the origin batch + rowBatch.rewind(); + return rowBatch; } @Override
