HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)
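Since the notification below shows only the diff, a minimal caller-side sketch of the new write path may help. The file path and row contents are hypothetical; it assumes OrcFile's WriterOptions already exposes setSchema(TypeDescription) (this patch builds on that plumbing but does not add the setter itself), and that getMaxSize() is among the small VectorizedRowBatch additions listed in the change summary:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.TypeDescription;
import org.apache.hadoop.hive.ql.io.orc.Writer;

public class VectorizedOrcWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.createStruct()
        .addField("x", TypeDescription.createLong())
        .addField("y", TypeDescription.createString());
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    // createRowBatch() is added by this patch: it allocates one ColumnVector
    // per top-level field (LongColumnVector for x, BytesColumnVector for y).
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for (int r = 0; r < 10000; ++r) {
      int row = batch.size++;
      x.vector[row] = r;
      byte[] bytes = Integer.toHexString(r).getBytes("UTF-8");
      y.setVal(row, bytes, 0, bytes.length);
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);   // the new Writer method from this patch
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
    }
    writer.close();
  }
}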
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f65e36d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f65e36d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f65e36d Branch: refs/heads/master Commit: 7f65e36d9aa6ce9af8bf799fe07e0e0e7d749a0e Parents: 255b2bd Author: Owen O'Malley <[email protected]> Authored: Wed Nov 11 15:24:03 2015 -0800 Committer: Owen O'Malley <[email protected]> Committed: Tue Nov 17 20:35:39 2015 -0800 ---------------------------------------------------------------------- .../apache/hive/common/util/BloomFilter.java | 18 +- .../org/apache/hive/common/util/Murmur3.java | 107 +- .../apache/hive/common/util/TestMurmur3.java | 45 +- .../hive/ql/io/orc/ColumnStatisticsImpl.java | 79 +- .../hadoop/hive/ql/io/orc/MemoryManager.java | 6 +- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 4 + .../hive/ql/io/orc/StringRedBlackTree.java | 5 + .../hadoop/hive/ql/io/orc/TypeDescription.java | 74 + .../apache/hadoop/hive/ql/io/orc/Writer.java | 7 + .../hadoop/hive/ql/io/orc/WriterImpl.java | 852 +++++- .../hive/ql/io/orc/TestColumnStatistics.java | 16 +- .../hive/ql/io/orc/TestMemoryManager.java | 2 +- .../hadoop/hive/ql/io/orc/TestOrcFile.java | 5 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 12 +- .../hive/ql/io/orc/TestVectorOrcFile.java | 2744 ++++++++++++++++++ .../hive/ql/exec/vector/BytesColumnVector.java | 13 + .../hive/ql/exec/vector/ColumnVector.java | 37 +- .../hive/ql/exec/vector/StructColumnVector.java | 8 + .../hive/ql/exec/vector/UnionColumnVector.java | 8 + .../hive/ql/exec/vector/VectorizedRowBatch.java | 10 + 20 files changed, 3917 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/java/org/apache/hive/common/util/BloomFilter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java index d894241..bb0b8f2 100644 --- a/common/src/java/org/apache/hive/common/util/BloomFilter.java +++ b/common/src/java/org/apache/hive/common/util/BloomFilter.java @@ -89,20 +89,21 @@ public class BloomFilter { public void add(byte[] val) { if (val == null) { - addBytes(val, -1); + addBytes(val, -1, -1); } else { - addBytes(val, val.length); + addBytes(val, 0, val.length); } } - public void addBytes(byte[] val, int length) { + public void addBytes(byte[] val, int offset, int length) { // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter" // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively // implement a Bloom filter without any loss in the asymptotic false positive probability' // Lets split up 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned // in the above paper - long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length); + long hash64 = val == null ? Murmur3.NULL_HASHCODE : + Murmur3.hash64(val, offset, length); addHash(hash64); } @@ -139,13 +140,14 @@ public class BloomFilter { public boolean test(byte[] val) { if (val == null) { - return testBytes(val, -1); + return testBytes(val, -1, -1); } - return testBytes(val, val.length); + return testBytes(val, 0, val.length); } - public boolean testBytes(byte[] val, int length) { - long hash64 = val == null ? 
Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length); + public boolean testBytes(byte[] val, int offset, int length) { + long hash64 = val == null ? Murmur3.NULL_HASHCODE : + Murmur3.hash64(val, offset, length); return testHash(hash64); } http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/java/org/apache/hive/common/util/Murmur3.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hive/common/util/Murmur3.java b/common/src/java/org/apache/hive/common/util/Murmur3.java index 087407a..88c3514 100644 --- a/common/src/java/org/apache/hive/common/util/Murmur3.java +++ b/common/src/java/org/apache/hive/common/util/Murmur3.java @@ -128,11 +128,11 @@ public class Murmur3 { * @return - hashcode */ public static long hash64(byte[] data) { - return hash64(data, data.length, DEFAULT_SEED); + return hash64(data, 0, data.length, DEFAULT_SEED); } - public static long hash64(byte[] data, int length) { - return hash64(data, length, DEFAULT_SEED); + public static long hash64(byte[] data, int offset, int length) { + return hash64(data, offset, length, DEFAULT_SEED); } /** @@ -143,21 +143,21 @@ public class Murmur3 { * @param seed - seed. (default is 0) * @return - hashcode */ - public static long hash64(byte[] data, int length, int seed) { + public static long hash64(byte[] data, int offset, int length, int seed) { long hash = seed; final int nblocks = length >> 3; // body for (int i = 0; i < nblocks; i++) { final int i8 = i << 3; - long k = ((long) data[i8] & 0xff) - | (((long) data[i8 + 1] & 0xff) << 8) - | (((long) data[i8 + 2] & 0xff) << 16) - | (((long) data[i8 + 3] & 0xff) << 24) - | (((long) data[i8 + 4] & 0xff) << 32) - | (((long) data[i8 + 5] & 0xff) << 40) - | (((long) data[i8 + 6] & 0xff) << 48) - | (((long) data[i8 + 7] & 0xff) << 56); + long k = ((long) data[offset + i8] & 0xff) + | (((long) data[offset + i8 + 1] & 0xff) << 8) + | (((long) data[offset + i8 + 2] & 0xff) << 16) + | (((long) data[offset + i8 + 3] & 0xff) << 24) + | (((long) data[offset + i8 + 4] & 0xff) << 32) + | (((long) data[offset + i8 + 5] & 0xff) << 40) + | (((long) data[offset + i8 + 6] & 0xff) << 48) + | (((long) data[offset + i8 + 7] & 0xff) << 56); // mix functions k *= C1; @@ -172,19 +172,19 @@ public class Murmur3 { int tailStart = nblocks << 3; switch (length - tailStart) { case 7: - k1 ^= ((long) data[tailStart + 6] & 0xff) << 48; + k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48; case 6: - k1 ^= ((long) data[tailStart + 5] & 0xff) << 40; + k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40; case 5: - k1 ^= ((long) data[tailStart + 4] & 0xff) << 32; + k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32; case 4: - k1 ^= ((long) data[tailStart + 3] & 0xff) << 24; + k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24; case 3: - k1 ^= ((long) data[tailStart + 2] & 0xff) << 16; + k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16; case 2: - k1 ^= ((long) data[tailStart + 1] & 0xff) << 8; + k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8; case 1: - k1 ^= ((long) data[tailStart] & 0xff); + k1 ^= ((long) data[offset + tailStart] & 0xff); k1 *= C1; k1 = Long.rotateLeft(k1, R1); k1 *= C2; @@ -205,18 +205,19 @@ public class Murmur3 { * @return - hashcode (2 longs) */ public static long[] hash128(byte[] data) { - return hash128(data, data.length, DEFAULT_SEED); + return hash128(data, 0, data.length, DEFAULT_SEED); } /** * Murmur3 128-bit variant. 
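The reason offset is threaded through hash64/hash128 above is so BloomFilter can hash a value that lives inside a larger buffer (for example a BytesColumnVector slice) without copying it out first. A small sketch of the equivalence that the new offset tests in TestMurmur3 verify:

import java.util.Arrays;
import org.apache.hive.common.util.Murmur3;

public class Murmur3SliceSketch {
  public static void main(String[] args) {
    byte[] payload = "payload".getBytes();
    byte[] buffer = new byte[payload.length + 16];
    Arrays.fill(buffer, (byte) 0x7f);                    // surrounding junk
    System.arraycopy(payload, 0, buffer, 8, payload.length);
    // Hashing the slice in place must equal hashing a standalone copy;
    // that is what the offset parameter buys: no copy per value hashed.
    long direct = Murmur3.hash64(payload, 0, payload.length);
    long sliced = Murmur3.hash64(buffer, 8, payload.length);
    System.out.println(direct == sliced);                // true
  }
}

BloomFilter.addBytes then hands the 64-bit result to addHash, which, per the Kirsch et al. remark quoted above, splits it into two 32-bit halves and derives every probe position from just those two values.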
* * @param data - input byte array + * @param offset - the first element of array * @param length - length of array * @param seed - seed. (default is 0) * @return - hashcode (2 longs) */ - public static long[] hash128(byte[] data, int length, int seed) { + public static long[] hash128(byte[] data, int offset, int length, int seed) { long h1 = seed; long h2 = seed; final int nblocks = length >> 4; @@ -224,23 +225,23 @@ public class Murmur3 { // body for (int i = 0; i < nblocks; i++) { final int i16 = i << 4; - long k1 = ((long) data[i16] & 0xff) - | (((long) data[i16 + 1] & 0xff) << 8) - | (((long) data[i16 + 2] & 0xff) << 16) - | (((long) data[i16 + 3] & 0xff) << 24) - | (((long) data[i16 + 4] & 0xff) << 32) - | (((long) data[i16 + 5] & 0xff) << 40) - | (((long) data[i16 + 6] & 0xff) << 48) - | (((long) data[i16 + 7] & 0xff) << 56); - - long k2 = ((long) data[i16 + 8] & 0xff) - | (((long) data[i16 + 9] & 0xff) << 8) - | (((long) data[i16 + 10] & 0xff) << 16) - | (((long) data[i16 + 11] & 0xff) << 24) - | (((long) data[i16 + 12] & 0xff) << 32) - | (((long) data[i16 + 13] & 0xff) << 40) - | (((long) data[i16 + 14] & 0xff) << 48) - | (((long) data[i16 + 15] & 0xff) << 56); + long k1 = ((long) data[offset + i16] & 0xff) + | (((long) data[offset + i16 + 1] & 0xff) << 8) + | (((long) data[offset + i16 + 2] & 0xff) << 16) + | (((long) data[offset + i16 + 3] & 0xff) << 24) + | (((long) data[offset + i16 + 4] & 0xff) << 32) + | (((long) data[offset + i16 + 5] & 0xff) << 40) + | (((long) data[offset + i16 + 6] & 0xff) << 48) + | (((long) data[offset + i16 + 7] & 0xff) << 56); + + long k2 = ((long) data[offset + i16 + 8] & 0xff) + | (((long) data[offset + i16 + 9] & 0xff) << 8) + | (((long) data[offset + i16 + 10] & 0xff) << 16) + | (((long) data[offset + i16 + 11] & 0xff) << 24) + | (((long) data[offset + i16 + 12] & 0xff) << 32) + | (((long) data[offset + i16 + 13] & 0xff) << 40) + | (((long) data[offset + i16 + 14] & 0xff) << 48) + | (((long) data[offset + i16 + 15] & 0xff) << 56); // mix functions for k1 k1 *= C1; @@ -267,40 +268,40 @@ public class Murmur3 { int tailStart = nblocks << 4; switch (length - tailStart) { case 15: - k2 ^= (long) (data[tailStart + 14] & 0xff) << 48; + k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48; case 14: - k2 ^= (long) (data[tailStart + 13] & 0xff) << 40; + k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40; case 13: - k2 ^= (long) (data[tailStart + 12] & 0xff) << 32; + k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32; case 12: - k2 ^= (long) (data[tailStart + 11] & 0xff) << 24; + k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24; case 11: - k2 ^= (long) (data[tailStart + 10] & 0xff) << 16; + k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16; case 10: - k2 ^= (long) (data[tailStart + 9] & 0xff) << 8; + k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8; case 9: - k2 ^= (long) (data[tailStart + 8] & 0xff); + k2 ^= (long) (data[offset + tailStart + 8] & 0xff); k2 *= C2; k2 = Long.rotateLeft(k2, R3); k2 *= C1; h2 ^= k2; case 8: - k1 ^= (long) (data[tailStart + 7] & 0xff) << 56; + k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56; case 7: - k1 ^= (long) (data[tailStart + 6] & 0xff) << 48; + k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48; case 6: - k1 ^= (long) (data[tailStart + 5] & 0xff) << 40; + k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40; case 5: - k1 ^= (long) (data[tailStart + 4] & 0xff) << 32; + k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32; case 4: - k1 ^= (long) 
(data[tailStart + 3] & 0xff) << 24; + k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24; case 3: - k1 ^= (long) (data[tailStart + 2] & 0xff) << 16; + k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16; case 2: - k1 ^= (long) (data[tailStart + 1] & 0xff) << 8; + k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8; case 1: - k1 ^= (long) (data[tailStart] & 0xff); + k1 ^= (long) (data[offset + tailStart] & 0xff); k1 *= C1; k1 = Long.rotateLeft(k1, R1); k1 *= C2; http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/common/src/test/org/apache/hive/common/util/TestMurmur3.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hive/common/util/TestMurmur3.java b/common/src/test/org/apache/hive/common/util/TestMurmur3.java index e506f71..5facc7c 100644 --- a/common/src/test/org/apache/hive/common/util/TestMurmur3.java +++ b/common/src/test/org/apache/hive/common/util/TestMurmur3.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Arrays; import java.util.Random; /** @@ -102,7 +103,7 @@ public class TestMurmur3 { buf.flip(); long gl1 = buf.getLong(); long gl2 = buf.getLong(8); - long[] hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed); + long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed); long m1 = hc[0]; long m2 = hc[1]; assertEquals(gl1, m1); @@ -114,11 +115,39 @@ public class TestMurmur3 { buf.flip(); gl1 = buf.getLong(); gl2 = buf.getLong(8); - hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed); + byte[] keyBytes = key.getBytes(); + hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed); m1 = hc[0]; m2 = hc[1]; assertEquals(gl1, m1); assertEquals(gl2, m2); + + byte[] offsetKeyBytes = new byte[keyBytes.length + 35]; + Arrays.fill(offsetKeyBytes, (byte) -1); + System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length); + hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed); + assertEquals(gl1, hc[0]); + assertEquals(gl2, hc[1]); + } + + @Test + public void testHashCodeM3_64() { + byte[] origin = ("It was the best of times, it was the worst of times," + + " it was the age of wisdom, it was the age of foolishness," + + " it was the epoch of belief, it was the epoch of incredulity," + + " it was the season of Light, it was the season of Darkness," + + " it was the spring of hope, it was the winter of despair," + + " we had everything before us, we had nothing before us," + + " we were all going direct to Heaven," + + " we were all going direct the other way.").getBytes(); + long hash = Murmur3.hash64(origin, 0, origin.length); + assertEquals(305830725663368540L, hash); + + byte[] originOffset = new byte[origin.length + 150]; + Arrays.fill(originOffset, (byte) 123); + System.arraycopy(origin, 0, originOffset, 150, origin.length); + hash = Murmur3.hash64(originOffset, 150, origin.length); + assertEquals(305830725663368540L, hash); } @Test @@ -135,11 +164,17 @@ public class TestMurmur3 { buf.flip(); long gl1 = buf.getLong(); long gl2 = buf.getLong(8); - long[] hc = Murmur3.hash128(data, data.length, seed); + long[] hc = Murmur3.hash128(data, 0, data.length, seed); long m1 = hc[0]; long m2 = hc[1]; assertEquals(gl1, m1); assertEquals(gl2, m2); + + byte[] offsetData = new byte[data.length + 50]; + System.arraycopy(data, 0, offsetData, 50, data.length); + hc = Murmur3.hash128(offsetData, 50, data.length, seed); + assertEquals(gl1, hc[0]); + assertEquals(gl2, hc[1]); } } @@ -157,7 
+192,7 @@ public class TestMurmur3 { buf.flip(); long gl1 = buf.getLong(); long gl2 = buf.getLong(8); - long[] hc = Murmur3.hash128(data, data.length, seed); + long[] hc = Murmur3.hash128(data, 0, data.length, seed); long m1 = hc[0]; long m2 = hc[1]; assertEquals(gl1, m1); @@ -179,7 +214,7 @@ public class TestMurmur3 { buf.flip(); long gl1 = buf.getLong(); long gl2 = buf.getLong(8); - long[] hc = Murmur3.hash128(data, data.length, seed); + long[] hc = Murmur3.hash128(data, 0, data.length, seed); long m1 = hc[0]; long m2 = hc[1]; assertEquals(gl1, m1); http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java index f39d3e2..bcca9de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; class ColumnStatisticsImpl implements ColumnStatistics { @@ -47,9 +48,9 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override - void updateBoolean(boolean value) { + void updateBoolean(boolean value, int repetitions) { if (value) { - trueCount += 1; + trueCount += repetitions; } } @@ -132,7 +133,7 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override - void updateInteger(long value) { + void updateInteger(long value, int repetitions) { if (!hasMinimum) { hasMinimum = true; minimum = value; @@ -144,7 +145,7 @@ class ColumnStatisticsImpl implements ColumnStatistics { } if (!overflow) { boolean wasPositive = sum >= 0; - sum += value; + sum += value * repetitions; if ((value >= 0) == wasPositive) { overflow = (sum >= 0) != wasPositive; } @@ -398,6 +399,23 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override + void updateString(byte[] bytes, int offset, int length, int repetitions) { + if (minimum == null) { + maximum = minimum = new Text(); + maximum.set(bytes, offset, length); + } else if (WritableComparator.compareBytes(minimum.getBytes(), 0, + minimum.getLength(), bytes, offset, length) > 0) { + minimum = new Text(); + minimum.set(bytes, offset, length); + } else if (WritableComparator.compareBytes(maximum.getBytes(), 0, + maximum.getLength(), bytes, offset, length) < 0) { + maximum = new Text(); + maximum.set(bytes, offset, length); + } + sum += length * repetitions; + } + + @Override void merge(ColumnStatisticsImpl other) { if (other instanceof StringStatisticsImpl) { StringStatisticsImpl str = (StringStatisticsImpl) other; @@ -498,6 +516,11 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override + void updateBinary(byte[] bytes, int offset, int length, int repetitions) { + sum += length * repetitions; + } + + @Override void merge(ColumnStatisticsImpl other) { if (other instanceof BinaryColumnStatistics) { BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other; @@ -700,6 +723,18 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override + void updateDate(int value) { + if (minimum == null) { + minimum = value; + maximum = value; + } else if (minimum > value) { + minimum = value; 
+ } else if (maximum < value) { + maximum = value; + } + } + + @Override void merge(ColumnStatisticsImpl other) { if (other instanceof DateStatisticsImpl) { DateStatisticsImpl dateStats = (DateStatisticsImpl) other; @@ -809,6 +844,18 @@ class ColumnStatisticsImpl implements ColumnStatistics { } @Override + void updateTimestamp(long value) { + if (minimum == null) { + minimum = value; + maximum = value; + } else if (minimum > value) { + minimum = value; + } else if (maximum < value) { + maximum = value; + } + } + + @Override void merge(ColumnStatisticsImpl other) { if (other instanceof TimestampStatisticsImpl) { TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other; @@ -889,15 +936,19 @@ class ColumnStatisticsImpl implements ColumnStatistics { count += 1; } + void increment(int count) { + this.count += count; + } + void setNull() { hasNull = true; } - void updateBoolean(boolean value) { + void updateBoolean(boolean value, int repetitions) { throw new UnsupportedOperationException("Can't update boolean"); } - void updateInteger(long value) { + void updateInteger(long value, int repetitions) { throw new UnsupportedOperationException("Can't update integer"); } @@ -909,10 +960,18 @@ class ColumnStatisticsImpl implements ColumnStatistics { throw new UnsupportedOperationException("Can't update string"); } + void updateString(byte[] bytes, int offset, int length, int repetitions) { + throw new UnsupportedOperationException("Can't update string"); + } + void updateBinary(BytesWritable value) { throw new UnsupportedOperationException("Can't update binary"); } + void updateBinary(byte[] bytes, int offset, int length, int repetitions) { + throw new UnsupportedOperationException("Can't update string"); + } + void updateDecimal(HiveDecimal value) { throw new UnsupportedOperationException("Can't update decimal"); } @@ -921,10 +980,18 @@ class ColumnStatisticsImpl implements ColumnStatistics { throw new UnsupportedOperationException("Can't update date"); } + void updateDate(int value) { + throw new UnsupportedOperationException("Can't update date"); + } + void updateTimestamp(Timestamp value) { throw new UnsupportedOperationException("Can't update timestamp"); } + void updateTimestamp(long value) { + throw new UnsupportedOperationException("Can't update timestamp"); + } + boolean isStatsExists() { return (count > 0 || hasNull == true); } http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java index 4d5f735..bb35b13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java @@ -172,10 +172,12 @@ class MemoryManager { /** * Give the memory manager an opportunity for doing a memory check. 
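* For example, with ROWS_BETWEEN_CHECKS at its default of 5000, a caller
* adding 1024-row batches pays for a memory check roughly once per five
* batches instead of once per row, which is the point of taking a count.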
+ * @param rows number of rows added * @throws IOException */ - void addedRow() throws IOException { - if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { + void addedRow(int rows) throws IOException { + rowsAddedSinceCheck += rows; + if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { notifyWriters(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 04b9eaf..84d627a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -1169,6 +1169,10 @@ public class RecordReaderImpl implements RecordReader { return ((float) rowBaseInStripe + rowInStripe) / totalRowCount; } + MetadataReader getMetadataReader() { + return metadata; + } + private int findStripe(long rowNumber) { for (int i = 0; i < stripes.size(); i++) { StripeInformation stripe = stripes.get(i); http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java index 6094175..e0c52e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java @@ -55,6 +55,11 @@ class StringRedBlackTree extends RedBlackTree { return addNewKey(); } + public int add(byte[] bytes, int offset, int length) { + newKey.set(bytes, offset, length); + return addNewKey(); + } + @Override protected int compareValue(int position) { int start = keyOffsets.get(position); http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java index 3481bb3..b365408 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java @@ -18,6 +18,17 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -264,6 +275,69 @@ public class TypeDescription { return maxId; } + private ColumnVector createColumn() { + switch (category) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case TIMESTAMP: + case DATE: + return new 
LongColumnVector(); + case FLOAT: + case DOUBLE: + return new DoubleColumnVector(); + case DECIMAL: + return new DecimalColumnVector(precision, scale); + case STRING: + case BINARY: + case CHAR: + case VARCHAR: + return new BytesColumnVector(); + case STRUCT: { + ColumnVector[] fieldVector = new ColumnVector[children.size()]; + for(int i=0; i < fieldVector.length; ++i) { + fieldVector[i] = children.get(i).createColumn(); + } + return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + fieldVector); + } + case UNION: { + ColumnVector[] fieldVector = new ColumnVector[children.size()]; + for(int i=0; i < fieldVector.length; ++i) { + fieldVector[i] = children.get(i).createColumn(); + } + return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + fieldVector); + } + case LIST: + return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + children.get(0).createColumn()); + case MAP: + return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + children.get(0).createColumn(), children.get(1).createColumn()); + default: + throw new IllegalArgumentException("Unknown type " + category); + } + } + + public VectorizedRowBatch createRowBatch() { + VectorizedRowBatch result; + if (category == Category.STRUCT) { + result = new VectorizedRowBatch(children.size(), + VectorizedRowBatch.DEFAULT_SIZE); + for(int i=0; i < result.cols.length; ++i) { + result.cols[i] = children.get(i).createColumn(); + } + } else { + result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE); + result.cols[0] = createColumn(); + } + result.reset(); + return result; + } + /** * Get the kind of this type. * @return get the category for this type. http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java index 8991f2d..1873ed1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import java.io.IOException; @@ -52,6 +53,12 @@ public interface Writer { void addRow(Object row) throws IOException; /** + * Add a row batch to the ORC file. + * @param batch the rows to add + */ + void addRowBatch(VectorizedRowBatch batch) throws IOException; + + /** * Flush all of the buffers and close the file. No methods on this writer * should be called afterwards. 
* @throws IOException http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 5a82d20..c3916d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -26,6 +26,7 @@ import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -40,6 +41,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier; import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy; @@ -582,7 +593,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private abstract static class TreeWriter { protected final int id; protected final ObjectInspector inspector; - private final BitFieldWriter isPresent; + protected final BitFieldWriter isPresent; private final boolean isCompressed; protected final ColumnStatisticsImpl indexStatistics; protected final ColumnStatisticsImpl stripeColStatistics; @@ -708,6 +719,73 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } + /** + * Handle the top level object write. + * + * This default method is used for all types except structs, which are the + * typical case. VectorizedRowBatch assumes the top level object is a + * struct, so we use the first column for all other types. + * @param batch the batch to write from + * @param offset the row to start on + * @param length the number of rows to write + * @throws IOException + */ + void writeRootBatch(VectorizedRowBatch batch, int offset, + int length) throws IOException { + writeBatch(batch.cols[0], offset, length); + } + + /** + * Write the values from the given vector from offset for length elements. 
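* The branches below mirror the ColumnVector contract: callers must clear
* noNulls before setting any isNull entry, and when isRepeating is set
* only element 0 (value and isNull alike) is consulted.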
+ * @param vector the vector to write from + * @param offset the first value from the vector to write + * @param length the number of values from the vector to write + * @throws IOException + */ + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + if (vector.noNulls) { + indexStatistics.increment(length); + if (isPresent != null) { + for (int i = 0; i < length; ++i) { + isPresent.write(1); + } + } + } else { + if (vector.isRepeating) { + boolean isNull = vector.isNull[0]; + if (isPresent != null) { + for (int i = 0; i < length; ++i) { + isPresent.write(isNull ? 0 : 1); + } + } + if (isNull) { + foundNulls = true; + indexStatistics.setNull(); + } else { + indexStatistics.increment(length); + } + } else { + // count the number of non-null values + int nonNullCount = 0; + for(int i = 0; i < length; ++i) { + boolean isNull = vector.isNull[i + offset]; + if (!isNull) { + nonNullCount += 1; + } + if (isPresent != null) { + isPresent.write(isNull ? 0 : 1); + } + } + indexStatistics.increment(nonNullCount); + if (nonNullCount != length) { + foundNulls = true; + indexStatistics.setNull(); + } + } + } + } + private void removeIsPresentPositions() { for(int i=0; i < rowIndex.getEntryCount(); ++i) { RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i); @@ -876,12 +954,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { super.write(obj); if (obj != null) { boolean val = ((BooleanObjectInspector) inspector).get(obj); - indexStatistics.updateBoolean(val); + indexStatistics.updateBoolean(val, 1); writer.write(val ? 1 : 0); } } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + LongColumnVector vec = (LongColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + int value = vec.vector[0] == 0 ? 0 : 1; + indexStatistics.updateBoolean(value != 0, length); + for(int i=0; i < length; ++i) { + writer.write(value); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + int value = vec.vector[i + offset] == 0 ? 
0 : 1; + writer.write(value); + indexStatistics.updateBoolean(value != 0, 1); + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -915,7 +1017,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { super.write(obj); if (obj != null) { byte val = ((ByteObjectInspector) inspector).get(obj); - indexStatistics.updateInteger(val); + indexStatistics.updateInteger(val, 1); if (createBloomFilter) { bloomFilter.addLong(val); } @@ -924,6 +1026,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + LongColumnVector vec = (LongColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + byte value = (byte) vec.vector[0]; + indexStatistics.updateInteger(value, length); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + for(int i=0; i < length; ++i) { + writer.write(value); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + byte value = (byte) vec.vector[i + offset]; + writer.write(value); + indexStatistics.updateInteger(value, 1); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -994,7 +1126,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } else { val = shortInspector.get(obj); } - indexStatistics.updateInteger(val); + indexStatistics.updateInteger(val, 1); if (createBloomFilter) { // integers are converted to longs in column statistics and during SARG evaluation bloomFilter.addLong(val); @@ -1004,6 +1136,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + LongColumnVector vec = (LongColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + long value = vec.vector[0]; + indexStatistics.updateInteger(value, length); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + for(int i=0; i < length; ++i) { + writer.write(value); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + long value = vec.vector[i + offset]; + writer.write(value); + indexStatistics.updateInteger(value, 1); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1049,6 +1211,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + DoubleColumnVector vec = (DoubleColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + float value = (float) vec.vector[0]; + indexStatistics.updateDouble(value); + if (createBloomFilter) { + bloomFilter.addDouble(value); + } + for(int i=0; i < length; ++i) { + utils.writeFloat(stream, value); + } + } + } else { + for(int i=0; i < 
length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + float value = (float) vec.vector[i + offset]; + utils.writeFloat(stream, value); + indexStatistics.updateDouble(value); + if (createBloomFilter) { + bloomFilter.addDouble(value); + } + } + } + } + } + + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1093,6 +1286,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + DoubleColumnVector vec = (DoubleColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + double value = vec.vector[0]; + indexStatistics.updateDouble(value); + if (createBloomFilter) { + bloomFilter.addDouble(value); + } + for(int i=0; i < length; ++i) { + utils.writeDouble(stream, value); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + double value = vec.vector[i + offset]; + utils.writeDouble(stream, value); + indexStatistics.updateDouble(value); + if (createBloomFilter) { + bloomFilter.addDouble(value); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1107,16 +1330,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - private static class StringTreeWriter extends TreeWriter { + private static abstract class StringBaseTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final OutStream stringOutput; private final IntegerWriter lengthOutput; private final IntegerWriter rowOutput; - private final StringRedBlackTree dictionary = + protected final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); - private final DynamicIntArray rows = new DynamicIntArray(); - private final PositionedOutputStream directStreamOutput; - private final IntegerWriter directLengthOutput; + protected final DynamicIntArray rows = new DynamicIntArray(); + protected final PositionedOutputStream directStreamOutput; + protected final IntegerWriter directLengthOutput; private final List<OrcProto.RowIndexEntry> savedRowIndex = new ArrayList<OrcProto.RowIndexEntry>(); private final boolean buildIndex; @@ -1124,12 +1347,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // If the number of keys in a dictionary is greater than this fraction of //the total number of non-null rows, turn off dictionary encoding private final double dictionaryKeySizeThreshold; - private boolean useDictionaryEncoding = true; + protected boolean useDictionaryEncoding = true; private boolean isDirectV2 = true; private boolean doneDictionaryCheck; private final boolean strideDictionaryCheck; - StringTreeWriter(int columnId, + StringBaseTreeWriter(int columnId, ObjectInspector inspector, TypeDescription schema, StreamFactory writer, @@ -1171,7 +1394,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { super.write(obj); if (obj != null) { Text val = getTextValue(obj); - if (useDictionaryEncoding || !strideDictionaryCheck) { + if (useDictionaryEncoding) { rows.add(dictionary.add(val)); } else { // write data and length @@ -1180,7 +1403,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } 
indexStatistics.updateString(val); if (createBloomFilter) { - bloomFilter.addBytes(val.getBytes(), val.getLength()); + bloomFilter.addBytes(val.getBytes(), 0, val.getLength()); } } } @@ -1364,10 +1587,69 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } + private static class StringTreeWriter extends StringBaseTreeWriter { + StringTreeWriter(int columnId, + ObjectInspector inspector, + TypeDescription schema, + StreamFactory writer, + boolean nullable) throws IOException { + super(columnId, inspector, schema, writer, nullable); + } + + @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + BytesColumnVector vec = (BytesColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + if (useDictionaryEncoding) { + int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]); + for(int i=0; i < length; ++i) { + rows.add(id); + } + } else { + for(int i=0; i < length; ++i) { + directStreamOutput.write(vec.vector[0], vec.start[0], + vec.length[0]); + directLengthOutput.write(vec.length[0]); + } + } + indexStatistics.updateString(vec.vector[0], vec.start[0], + vec.length[0], length); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + if (useDictionaryEncoding) { + rows.add(dictionary.add(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i])); + } else { + directStreamOutput.write(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i]); + directLengthOutput.write(vec.length[offset + i]); + } + indexStatistics.updateString(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i], 1); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i]); + } + } + } + } + } + } + /** * Under the covers, char is written to ORC the same way as string. 
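* For example, a char(5) value "hi" is padded with spaces to the five bytes
* {'h','i',' ',' ',' '} before encoding, as the padding branch below shows.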
*/ - private static class CharTreeWriter extends StringTreeWriter { + private static class CharTreeWriter extends StringBaseTreeWriter { + private final int itemLength; + private final byte[] padding; CharTreeWriter(int columnId, ObjectInspector inspector, @@ -1375,6 +1657,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); + itemLength = schema.getMaxLength(); + padding = new byte[itemLength]; } /** @@ -1385,12 +1669,79 @@ public class WriterImpl implements Writer, MemoryManager.Callback { return (((HiveCharObjectInspector) inspector) .getPrimitiveWritableObject(obj)).getTextValue(); } + + @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + BytesColumnVector vec = (BytesColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + byte[] ptr; + int ptrOffset; + if (vec.length[0] >= itemLength) { + ptr = vec.vector[0]; + ptrOffset = vec.start[0]; + } else { + ptr = padding; + ptrOffset = 0; + System.arraycopy(vec.vector[0], vec.start[0], ptr, 0, + vec.length[0]); + Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' '); + } + if (useDictionaryEncoding) { + int id = dictionary.add(ptr, ptrOffset, itemLength); + for(int i=0; i < length; ++i) { + rows.add(id); + } + } else { + for(int i=0; i < length; ++i) { + directStreamOutput.write(ptr, ptrOffset, itemLength); + directLengthOutput.write(itemLength); + } + } + indexStatistics.updateString(ptr, ptrOffset, itemLength, length); + if (createBloomFilter) { + bloomFilter.addBytes(ptr, ptrOffset, itemLength); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + byte[] ptr; + int ptrOffset; + if (vec.length[offset + i] >= itemLength) { + ptr = vec.vector[offset + i]; + ptrOffset = vec.start[offset + i]; + } else { + // it is the wrong length, so copy it + ptr = padding; + ptrOffset = 0; + System.arraycopy(vec.vector[offset + i], vec.start[offset + i], + ptr, 0, vec.length[offset + i]); + Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' '); + } + if (useDictionaryEncoding) { + rows.add(dictionary.add(ptr, ptrOffset, itemLength)); + } else { + directStreamOutput.write(ptr, ptrOffset, itemLength); + directLengthOutput.write(itemLength); + } + indexStatistics.updateString(ptr, ptrOffset, itemLength, 1); + if (createBloomFilter) { + bloomFilter.addBytes(ptr, ptrOffset, itemLength); + } + } + } + } + } } /** * Under the covers, varchar is written to ORC the same way as string. 
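* For example, a varchar(5) value "hello world" is clipped to "hello" via
* Math.min(valueLength, maxLength); unlike char, no padding is added.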
*/ - private static class VarcharTreeWriter extends StringTreeWriter { + private static class VarcharTreeWriter extends StringBaseTreeWriter { + private final int maxLength; VarcharTreeWriter(int columnId, ObjectInspector inspector, @@ -1398,6 +1749,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); + maxLength = schema.getMaxLength(); } /** @@ -1408,6 +1760,55 @@ public class WriterImpl implements Writer, MemoryManager.Callback { return (((HiveVarcharObjectInspector) inspector) .getPrimitiveWritableObject(obj)).getTextValue(); } + + @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + BytesColumnVector vec = (BytesColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + int itemLength = Math.min(vec.length[0], maxLength); + if (useDictionaryEncoding) { + int id = dictionary.add(vec.vector[0], vec.start[0], itemLength); + for(int i=0; i < length; ++i) { + rows.add(id); + } + } else { + for(int i=0; i < length; ++i) { + directStreamOutput.write(vec.vector[0], vec.start[0], + itemLength); + directLengthOutput.write(itemLength); + } + } + indexStatistics.updateString(vec.vector[0], vec.start[0], + itemLength, length); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + int itemLength = Math.min(vec.length[offset + i], maxLength); + if (useDictionaryEncoding) { + rows.add(dictionary.add(vec.vector[offset + i], + vec.start[offset + i], itemLength)); + } else { + directStreamOutput.write(vec.vector[offset + i], + vec.start[offset + i], itemLength); + directLengthOutput.write(itemLength); + } + indexStatistics.updateString(vec.vector[offset + i], + vec.start[offset + i], itemLength, 1); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[offset + i], + vec.start[offset + i], itemLength); + } + } + } + } + } } private static class BinaryTreeWriter extends TreeWriter { @@ -1449,12 +1850,48 @@ public class WriterImpl implements Writer, MemoryManager.Callback { length.write(val.getLength()); indexStatistics.updateBinary(val); if (createBloomFilter) { - bloomFilter.addBytes(val.getBytes(), val.getLength()); + bloomFilter.addBytes(val.getBytes(), 0, val.getLength()); } } } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + BytesColumnVector vec = (BytesColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + for(int i=0; i < length; ++i) { + stream.write(vec.vector[0], vec.start[0], + vec.length[0]); + this.length.write(vec.length[0]); + } + indexStatistics.updateBinary(vec.vector[0], vec.start[0], + vec.length[0], length); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + stream.write(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i]); + this.length.write(vec.length[offset + i]); + indexStatistics.updateBinary(vec.vector[offset + i], + vec.start[offset + i], vec.length[offset + i], 1); + if (createBloomFilter) { + bloomFilter.addBytes(vec.vector[offset + i], + vec.start[offset + i], 
vec.length[offset + i]); + } + } + } + } + } + + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1472,6 +1909,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } static final int MILLIS_PER_SECOND = 1000; + static final int NANOS_PER_SECOND = 1000000000; + static final int MILLIS_PER_NANO = 1000000; static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; private static class TimestampTreeWriter extends TreeWriter { @@ -1524,6 +1963,47 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + LongColumnVector vec = (LongColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + long value = vec.vector[0]; + long valueMillis = value / MILLIS_PER_NANO; + indexStatistics.updateTimestamp(valueMillis); + if (createBloomFilter) { + bloomFilter.addLong(valueMillis); + } + final long secs = value / NANOS_PER_SECOND - base_timestamp; + final long nano = formatNanos((int) (value % NANOS_PER_SECOND)); + for(int i=0; i < length; ++i) { + seconds.write(secs); + nanos.write(nano); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + long value = vec.vector[i + offset]; + long valueMillis = value / MILLIS_PER_NANO; + long valueSecs = value /NANOS_PER_SECOND - base_timestamp; + int valueNanos = (int) (value % NANOS_PER_SECOND); + if (valueNanos < 0) { + valueNanos += NANOS_PER_SECOND; + } + seconds.write(valueSecs); + nanos.write(formatNanos(valueNanos)); + indexStatistics.updateTimestamp(valueMillis); + if (createBloomFilter) { + bloomFilter.addLong(valueMillis); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1588,6 +2068,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + LongColumnVector vec = (LongColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + int value = (int) vec.vector[0]; + indexStatistics.updateDate(value); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + for(int i=0; i < length; ++i) { + writer.write(value); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + int value = (int) vec.vector[i + offset]; + writer.write(value); + indexStatistics.updateDate(value); + if (createBloomFilter) { + bloomFilter.addLong(value); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1660,6 +2170,40 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + DecimalColumnVector vec = (DecimalColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + HiveDecimal value = vec.vector[0].getHiveDecimal(); + indexStatistics.updateDecimal(value); + if (createBloomFilter) { + 
bloomFilter.addString(value.toString()); + } + for(int i=0; i < length; ++i) { + SerializationUtils.writeBigInteger(valueStream, + value.unscaledValue()); + scaleStream.write(value.scale()); + } + } + } else { + for(int i=0; i < length; ++i) { + if (vec.noNulls || !vec.isNull[i + offset]) { + HiveDecimal value = vec.vector[i + offset].getHiveDecimal(); + SerializationUtils.writeBigInteger(valueStream, + value.unscaledValue()); + scaleStream.write(value.scale()); + indexStatistics.updateDecimal(value); + if (createBloomFilter) { + bloomFilter.addString(value.toString()); + } + } + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1685,13 +2229,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback { boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); List<TypeDescription> children = schema.getChildren(); - StructObjectInspector structObjectInspector = - (StructObjectInspector) inspector; - fields = structObjectInspector.getAllStructFieldRefs(); + if (inspector != null) { + StructObjectInspector structObjectInspector = + (StructObjectInspector) inspector; + fields = structObjectInspector.getAllStructFieldRefs(); + } else { + fields = null; + } childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { - ObjectInspector childOI = i < fields.size() ? - fields.get(i).getFieldObjectInspector() : null; + ObjectInspector childOI; + if (fields != null && i < fields.size()) { + childOI = fields.get(i).getFieldObjectInspector(); + } else { + childOI = null; + } childrenWriters[i] = createTreeWriter( childOI, children.get(i), writer, true); @@ -1713,6 +2265,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeRootBatch(VectorizedRowBatch batch, int offset, + int length) throws IOException { + // update the statistics for the root column + indexStatistics.increment(length); + // I'm assuming that the root column isn't nullable so that I don't need + // to update isPresent. 
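// Each child writer then consumes the same [offset, offset + length) window
// of its own ColumnVector, so a 1024-row struct batch costs one writeBatch
// call per field rather than one write per row.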
+ for(int i=0; i < childrenWriters.length; ++i) { + childrenWriters[i].writeBatch(batch.cols[i], offset, length); + } + } + + private static void writeFields(StructColumnVector vector, + TreeWriter[] childrenWriters, + int offset, int length) throws IOException { + for(int field=0; field < childrenWriters.length; ++field) { + childrenWriters[field].writeBatch(vector.fields[field], offset, length); + } + } + + @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + StructColumnVector vec = (StructColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + writeFields(vec, childrenWriters, offset, length); + } + } else if (vector.noNulls) { + writeFields(vec, childrenWriters, offset, length); + } else { + // write the records in runs + int currentRun = 0; + boolean started = false; + for(int i=0; i < length; ++i) { + if (!vec.isNull[i + offset]) { + if (!started) { + started = true; + currentRun = i; + } + } else if (started) { + started = false; + writeFields(vec, childrenWriters, offset + currentRun, + i - currentRun); + } + } + if (started) { + writeFields(vec, childrenWriters, offset + currentRun, + length - currentRun); + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1734,8 +2340,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); - ObjectInspector childOI = - ((ListObjectInspector) inspector).getListElementObjectInspector(); + ObjectInspector childOI = null; + if (inspector != null) { + childOI = + ((ListObjectInspector) inspector).getListElementObjectInspector(); + } childrenWriters = new TreeWriter[1]; childrenWriters[0] = createTreeWriter(childOI, schema.getChildren().get(0), writer, true); @@ -1771,6 +2380,52 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + ListColumnVector vec = (ListColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + int childOffset = (int) vec.offsets[0]; + int childLength = (int) vec.lengths[0]; + for(int i=0; i < length; ++i) { + lengths.write(childLength); + childrenWriters[0].writeBatch(vec.child, childOffset, childLength); + } + if (createBloomFilter) { + bloomFilter.addLong(childLength); + } + } + } else { + // write the elements in runs + int currentOffset = 0; + int currentLength = 0; + for(int i=0; i < length; ++i) { + if (!vec.isNull[i + offset]) { + int nextLength = (int) vec.lengths[offset + i]; + int nextOffset = (int) vec.offsets[offset + i]; + lengths.write(nextLength); + if (currentLength == 0) { + currentOffset = nextOffset; + currentLength = nextLength; + } else if (currentOffset + currentLength != nextOffset) { + childrenWriters[0].writeBatch(vec.child, currentOffset, + currentLength); + currentOffset = nextOffset; + currentLength = nextLength; + } else { + currentLength += nextLength; + } + } + } + if (currentLength != 0) { + childrenWriters[0].writeBatch(vec.child, currentOffset, + currentLength); + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { 
super.writeStripe(builder, requiredIndexEntries); @@ -1799,15 +2454,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback { boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); - MapObjectInspector insp = (MapObjectInspector) inspector; childrenWriters = new TreeWriter[2]; List<TypeDescription> children = schema.getChildren(); + ObjectInspector keyInsp = null; + ObjectInspector valueInsp = null; + if (inspector != null) { + MapObjectInspector insp = (MapObjectInspector) inspector; + keyInsp = insp.getMapKeyObjectInspector(); + valueInsp = insp.getMapValueObjectInspector(); + } childrenWriters[0] = - createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0), - writer, true); + createTreeWriter(keyInsp, children.get(0), writer, true); childrenWriters[1] = - createTreeWriter(insp.getMapValueObjectInspector(), children.get(1), - writer, true); + createTreeWriter(valueInsp, children.get(1), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1843,6 +2502,57 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } @Override + void writeBatch(ColumnVector vector, int offset, + int length) throws IOException { + super.writeBatch(vector, offset, length); + MapColumnVector vec = (MapColumnVector) vector; + if (vector.isRepeating) { + if (vector.noNulls || !vector.isNull[0]) { + int childOffset = (int) vec.offsets[0]; + int childLength = (int) vec.lengths[0]; + for(int i=0; i < length; ++i) { + lengths.write(childLength); + childrenWriters[0].writeBatch(vec.keys, childOffset, childLength); + childrenWriters[1].writeBatch(vec.values, childOffset, childLength); + } + if (createBloomFilter) { + bloomFilter.addLong(childLength); + } + } + } else { + // write the elements in runs + int currentOffset = 0; + int currentLength = 0; + for(int i=0; i < length; ++i) { + if (!vec.isNull[i + offset]) { + int nextLength = (int) vec.lengths[offset + i]; + int nextOffset = (int) vec.offsets[offset + i]; + lengths.write(nextLength); + if (currentLength == 0) { + currentOffset = nextOffset; + currentLength = nextLength; + } else if (currentOffset + currentLength != nextOffset) { + childrenWriters[0].writeBatch(vec.keys, currentOffset, + currentLength); + childrenWriters[1].writeBatch(vec.values, currentOffset, + currentLength); + currentOffset = nextOffset; + currentLength = nextLength; + } else { + currentLength += nextLength; + } + } + } + if (currentLength != 0) { + childrenWriters[0].writeBatch(vec.keys, currentOffset, + currentLength); + childrenWriters[1].writeBatch(vec.values, currentOffset, + currentLength); + } + } + } + + @Override void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); @@ -1869,13 +2579,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback { StreamFactory writer, boolean nullable) throws IOException { super(columnId, inspector, schema, writer, nullable); - UnionObjectInspector insp = (UnionObjectInspector) inspector; - List<ObjectInspector> choices = insp.getObjectInspectors(); + List<ObjectInspector> choices = null; + if (inspector != null) { + UnionObjectInspector insp = (UnionObjectInspector) inspector; + choices = insp.getObjectInspectors(); + } List<TypeDescription> children = schema.getChildren(); childrenWriters = new 
@@ -1869,13 +2579,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                     StreamFactory writer,
                     boolean nullable) throws IOException {
       super(columnId, inspector, schema, writer, nullable);
-      UnionObjectInspector insp = (UnionObjectInspector) inspector;
-      List<ObjectInspector> choices = insp.getObjectInspectors();
+      List<ObjectInspector> choices = null;
+      if (inspector != null) {
+        UnionObjectInspector insp = (UnionObjectInspector) inspector;
+        choices = insp.getObjectInspectors();
+      }
       List<TypeDescription> children = schema.getChildren();
       childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
-        childrenWriters[i] = createTreeWriter(choices.get(i),
-            children.get(i), writer, true);
+        childrenWriters[i] =
+            createTreeWriter(choices != null ? choices.get(i) : null,
+                children.get(i), writer, true);
       }
       tags =
         new RunLengthByteWriter(writer.createStream(columnId,
@@ -1898,6 +2612,54 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }

     @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      UnionColumnVector vec = (UnionColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte tag = (byte) vec.tags[0];
+          for(int i=0; i < length; ++i) {
+            tags.write(tag);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(tag);
+          }
+          childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+        }
+      } else {
+        // write the records in runs of the same tag
+        byte prevTag = 0;
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            byte tag = (byte) vec.tags[offset + i];
+            tags.write(tag);
+            if (!started) {
+              started = true;
+              currentRun = i;
+              prevTag = tag;
+            } else if (tag != prevTag) {
+              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                  offset + currentRun, i - currentRun);
+              currentRun = i;
+              prevTag = tag;
+            }
+          } else if (started) {
+            started = false;
+            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                offset + currentRun, i - currentRun);
+          }
+        }
+        if (started) {
+          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+              offset + currentRun, length - currentRun);
+        }
+      }
+    }
+
+    @Override
     void writeStripe(OrcProto.StripeFooter.Builder builder,
                      int requiredIndexEntries) throws IOException {
       super.writeStripe(builder, requiredIndexEntries);
@@ -2365,7 +3127,31 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           createRowIndexEntry();
         }
       }
-    memoryManager.addedRow();
+    memoryManager.addedRow(1);
+  }
+
+  @Override
+  public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+    if (buildIndex) {
+      // Batch the writes up to the rowIndexStride so that we can get the
+      // right size indexes.
+      int posn = 0;
+      while (posn < batch.size) {
+        int chunkSize = Math.min(batch.size - posn,
+            rowIndexStride - rowsInIndex);
+        treeWriter.writeRootBatch(batch, posn, chunkSize);
+        posn += chunkSize;
+        rowsInIndex += chunkSize;
+        rowsInStripe += chunkSize;
+        if (rowsInIndex >= rowIndexStride) {
+          createRowIndexEntry();
+        }
+      }
+    } else {
+      rowsInStripe += batch.size;
+      treeWriter.writeRootBatch(batch, 0, batch.size);
+    }
+    memoryManager.addedRow(batch.size);
   }

   @Override
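With addRowBatch in place, a caller fills a VectorizedRowBatch and hands over up to 1024 rows per call rather than one Object per addRow. A minimal usage sketch against the new API, assuming a Writer already created for a struct<x:bigint> schema (writer construction is elided because its options vary; the row count is invented):

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.orc.Writer;

    public class AddRowBatchSketch {
      static void writeExample(Writer writer) throws IOException {
        VectorizedRowBatch batch = new VectorizedRowBatch(1);
        batch.cols[0] = new LongColumnVector();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        for (long r = 0; r < 100000; ++r) {
          x.vector[batch.size++] = r;
          if (batch.size == VectorizedRowBatch.DEFAULT_SIZE) {
            writer.addRowBatch(batch);   // one call per 1024 rows
            batch.size = 0;              // no nulls/repeats set, so this suffices
          }
        }
        if (batch.size != 0) {
          writer.addRowBatch(batch);     // flush the partial final batch
        }
        writer.close();
      }
    }

Note how addRowBatch itself re-chunks each batch at rowIndexStride boundaries when buildIndex is on, so callers do not need to align their batches with the index stride.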
http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index f6111e8..a51177e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -52,17 +52,16 @@ public class TestColumnStatistics {

     ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
     ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
-    stats1.updateInteger(10);
-    stats1.updateInteger(10);
-    stats2.updateInteger(1);
-    stats2.updateInteger(1000);
+    stats1.updateInteger(10, 2);
+    stats2.updateInteger(1, 1);
+    stats2.updateInteger(1000, 1);
     stats1.merge(stats2);
     IntegerColumnStatistics typed = (IntegerColumnStatistics) stats1;
     assertEquals(1, typed.getMinimum());
     assertEquals(1000, typed.getMaximum());
     stats1.reset();
-    stats1.updateInteger(-10);
-    stats1.updateInteger(10000);
+    stats1.updateInteger(-10, 1);
+    stats1.updateInteger(10000, 1);
     stats1.merge(stats2);
     assertEquals(-10, typed.getMinimum());
     assertEquals(10000, typed.getMaximum());
@@ -101,11 +100,14 @@ public class TestColumnStatistics {
     stats1.updateString(new Text("david"));
     stats1.updateString(new Text("charles"));
     stats2.updateString(new Text("anne"));
-    stats2.updateString(new Text("erin"));
+    byte[] erin = new byte[]{0, 1, 2, 3, 4, 5, 101, 114, 105, 110};
+    stats2.updateString(erin, 6, 4, 5);
+    assertEquals(24, ((StringColumnStatistics)stats2).getSum());
     stats1.merge(stats2);
     StringColumnStatistics typed = (StringColumnStatistics) stats1;
     assertEquals("anne", typed.getMinimum());
     assertEquals("erin", typed.getMaximum());
+    assertEquals(39, typed.getSum());
     stats1.reset();
     stats1.updateString(new Text("aaa"));
     stats1.updateString(new Text("zzz"));

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
index fb6be16..19aaff3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
@@ -122,7 +122,7 @@ public class TestMemoryManager {
     }
     // add enough rows to get the memory manager to check the limits
     for(int i=0; i < 10000; ++i) {
-      mgr.addedRow();
+      mgr.addedRow(1);
     }
     for(int call=0; call < calls.length; ++call) {
       verify(calls[call], times(2))

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index e78f7aa..146f5b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1818,8 +1818,9 @@ public class TestOrcFile {
     }

     @Override
-    void addedRow() throws IOException {
-      if (++rows % 100 == 0) {
+    void addedRow(int count) throws IOException {
+      rows += count;
+      if (rows % 100 == 0) {
         callback.checkMemory(rate);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f65e36d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 797bbfb..15ee24c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -815,8 +815,10 @@ public class TestOrcRawRecordMerger {
     MemoryManager mgr = new MemoryManager(conf){
       int rowsAddedSinceCheck = 0;

-      synchronized void addedRow() throws IOException {
-        if (++rowsAddedSinceCheck >= 2) {
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
           notifyWriters();
           rowsAddedSinceCheck = 0;
         }
@@ -912,8 +914,10 @@ public class TestOrcRawRecordMerger {
     MemoryManager mgr = new MemoryManager(conf){
       int rowsAddedSinceCheck = 0;

-      synchronized void addedRow() throws IOException {
-        if (++rowsAddedSinceCheck >= 2) {
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
           notifyWriters();
           rowsAddedSinceCheck = 0;
         }
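The addedRow(int) change these tests exercise lets the memory manager count rows instead of calls, so a 1024-row batch advances the counter by 1024 and large batches trigger the memory check about as often as single-row writes did. A sketch of the counting pattern the overrides above share, assuming the same package as MemoryManager (the class is package-private) and an illustrative 5000-row threshold in place of the manager's internal constant:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class CountingMemoryManager extends MemoryManager {
      private int rowsSinceCheck = 0;

      CountingMemoryManager(Configuration conf) {
        super(conf);
      }

      @Override
      synchronized void addedRow(int rows) throws IOException {
        rowsSinceCheck += rows;            // count rows, not invocations
        if (rowsSinceCheck >= 5000) {      // illustrative threshold
          rowsSinceCheck = 0;
          notifyWriters();                 // ask writers to re-check memory
        }
      }
    }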
