This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 9e9fbf0 ORC-1060: Reduce memory usage when vectorized reading
dictionary string encoding columns (#971)
9e9fbf0 is described below
commit 9e9fbf0aaec44e4f8d66e0aef7407000ee1bce70
Author: expxiaoli <[email protected]>
AuthorDate: Sat Dec 25 12:01:07 2021 +0800
ORC-1060: Reduce memory usage when vectorized reading dictionary string
encoding columns (#971)
### What changes were proposed in this pull request?
In the old code, when dictionary string encoding columns are read by vectorized
reading, 2 copies of the current stripe's dictionary data and 1 copy of the next
stripe's dictionary data are held in memory when reading across different stripes.
That could make vectorized reading's memory usage larger than that of row reading.
This patch fixes the issue so that only 1 copy of the current stripe's dictionary
data is held.
This patch logic has 3 parts:
1) Directly read data into a primitive byte array, rather than using
DynamicByteArray as an intermediate variable. Using DynamicByteArray as an
intermediate variable causes 2 copies of the current stripe's dictionary data to be
held in memory.
2) Lazily read dictionary data, deferring it until the current batch data is
read. In the previous code, the RecordReaderImpl class's nextBatch method read the
next stripe's dictionary data through the advanceToNextRow method, so memory held
two stripes' dictionary data. With the lazy-read logic, only one stripe's dictionary
data is held in memory when reading across different stripes.
3) Before lazily reading dictionary data from the current stripe, remove the
batch data's references to the previous stripe's dictionary data. This allows GC
to reclaim the memory of the previous stripe's dictionary data.
### Why are the changes needed?
Reduce memory usage.
### How was this patch tested?
Pass the existing CIs.
(cherry picked from commit 3a2cb60e4ab6af6305c351fbdb51b98f460f64a0)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/orc/impl/TreeReaderFactory.java | 76 +++++++++++++---------
1 file changed, 47 insertions(+), 29 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index f1002a1..47ee911 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -2148,12 +2148,15 @@ public class TreeReaderFactory {
*/
public static class StringDictionaryTreeReader extends TreeReader {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
- private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
protected IntegerReader reader;
+ private InStream lengthStream;
+ private InStream dictionaryStream;
+ private OrcProto.ColumnEncoding lengthEncoding;
- private byte[] dictionaryBufferInBytesCache = null;
+ private byte[] dictionaryBuffer = null;
private final LongColumnVector scratchlcv;
+ private boolean initDictionary = false;
StringDictionaryTreeReader(int columnId, Context context) throws
IOException {
this(columnId, null, null, null, null, null, context);
@@ -2167,14 +2170,10 @@ public class TreeReaderFactory {
if (data != null && encoding != null) {
this.reader = createIntegerReader(encoding.getKind(), data, false,
context);
}
-
- if (dictionary != null && encoding != null) {
- readDictionaryStream(dictionary);
- }
-
- if (length != null && encoding != null) {
- readDictionaryLengthStream(length, encoding);
- }
+ lengthStream = length;
+ dictionaryStream = dictionary;
+ lengthEncoding = encoding;
+ initDictionary = false;
}
@Override
@@ -2190,15 +2189,14 @@ public class TreeReaderFactory {
public void startStripe(StripePlanner planner, ReadPhase readPhase) throws
IOException {
super.startStripe(planner, readPhase);
- // read the dictionary blob
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- InStream in = planner.getStream(name);
- readDictionaryStream(in);
+ dictionaryStream = planner.getStream(name);
+ initDictionary = false;
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
- in = planner.getStream(name);
+ InStream in = planner.getStream(name);
OrcProto.ColumnEncoding encoding = planner.getEncoding(columnId);
readDictionaryLengthStream(in, encoding);
@@ -2231,10 +2229,18 @@ public class TreeReaderFactory {
private void readDictionaryStream(InStream in) throws IOException {
if (in != null) { // Guard against empty dictionary stream.
if (in.available() > 0) {
- dictionaryBuffer = new DynamicByteArray(64, in.available());
- dictionaryBuffer.readAll(in);
- // Since its start of strip invalidate the cache.
- dictionaryBufferInBytesCache = null;
+ // remove reference to previous dictionary buffer
+ dictionaryBuffer = null;
+ int dictionaryBufferSize =
dictionaryOffsets[dictionaryOffsets.length - 1];
+ dictionaryBuffer = new byte[dictionaryBufferSize];
+ int pos = 0;
+ int chunkSize = in.available();
+ byte[] chunkBytes = new byte[chunkSize];
+ while (pos < dictionaryBufferSize) {
+ int currentLength = in.read(chunkBytes, 0, chunkSize);
+ System.arraycopy(chunkBytes, 0, dictionaryBuffer, pos,
currentLength);
+ pos += currentLength;
+ }
}
in.close();
} else {
@@ -2261,6 +2267,23 @@ public class TreeReaderFactory {
ReadPhase readPhase) throws IOException {
final BytesColumnVector result = (BytesColumnVector) previousVector;
+ // remove reference to previous dictionary buffer
+ for (int i = 0; i < batchSize; i++) {
+ result.vector[i] = null;
+ }
+
+ // lazy read dictionary buffer,
+ // ensure there is at most one dictionary buffer in memory when reading
cross different file stripes
+ if (!initDictionary) {
+ if (lengthStream != null && lengthEncoding != null) {
+ readDictionaryLengthStream(lengthStream, lengthEncoding);
+ }
+ if (dictionaryStream != null) {
+ readDictionaryStream(dictionaryStream);
+ }
+ initDictionary = true;
+ }
+
// Read present/isNull stream
super.nextVector(result, isNull, batchSize, filterContext, readPhase);
readDictionaryByteArray(result, filterContext, batchSize);
@@ -2274,11 +2297,6 @@ public class TreeReaderFactory {
if (dictionaryBuffer != null) {
- // Load dictionaryBuffer into cache.
- if (dictionaryBufferInBytesCache == null) {
- dictionaryBufferInBytesCache = dictionaryBuffer.get();
- }
-
// Read string offsets
scratchlcv.isRepeating = result.isRepeating;
scratchlcv.noNulls = result.noNulls;
@@ -2291,7 +2309,7 @@ public class TreeReaderFactory {
if (filterContext.isSelectedInUse()) {
// Set all string values to null - offset and length is zero
for (int i = 0; i < batchSize; i++) {
- result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+ result.setRef(i, dictionaryBuffer, 0, 0);
}
// Read selected rows from stream
for (int i = 0; i != filterContext.getSelectedSize(); i++) {
@@ -2299,7 +2317,7 @@ public class TreeReaderFactory {
if (!scratchlcv.isNull[idx]) {
offset = dictionaryOffsets[(int) scratchlcv.vector[idx]];
length = getDictionaryEntryLength((int)
scratchlcv.vector[idx], offset);
- result.setRef(idx, dictionaryBufferInBytesCache, offset,
length);
+ result.setRef(idx, dictionaryBuffer, offset, length);
}
}
} else {
@@ -2307,10 +2325,10 @@ public class TreeReaderFactory {
if (!scratchlcv.isNull[i]) {
offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
length = getDictionaryEntryLength((int) scratchlcv.vector[i],
offset);
- result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+ result.setRef(i, dictionaryBuffer, offset, length);
} else {
// If the value is null then set offset and length to zero
(null string)
- result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+ result.setRef(i, dictionaryBuffer, 0, 0);
}
}
}
@@ -2320,7 +2338,7 @@ public class TreeReaderFactory {
// set all the elements to the same value
offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
length = getDictionaryEntryLength((int) scratchlcv.vector[0],
offset);
- result.setRef(0, dictionaryBufferInBytesCache, offset, length);
+ result.setRef(0, dictionaryBuffer, offset, length);
}
result.isRepeating = scratchlcv.isRepeating;
} else {
@@ -2348,7 +2366,7 @@ public class TreeReaderFactory {
if (entry < dictionaryOffsets.length - 1) {
length = dictionaryOffsets[entry + 1] - offset;
} else {
- length = dictionaryBuffer.size() - offset;
+ length = dictionaryBuffer.length - offset;
}
return length;
}