This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea614d  HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth Jayachandran)
5ea614d is described below
commit 5ea614d799b82237e2387b3bcb7b63e7db90a3c9
Author: Slim Bouguerra <[email protected]>
AuthorDate: Fri Jul 19 15:25:26 2019 -0700
HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth Jayachandran)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 11 +-
.../hive/common/util/FixedSizedObjectPool.java | 7 ++
.../hive/common/util/TestFixedSizedObjectPool.java | 24 ++++
.../java/org/apache/hadoop/hive/llap/LlapUtil.java | 4 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 126 +++++++++++++++------
.../hive/llap/io/decode/EncodedDataConsumer.java | 24 ++--
.../io/api/impl/LlapRecordReaderQueueSizeTest.java | 97 ++++++++++++++++
.../java/org/apache/hadoop/hive/common/Pool.java | 25 +++-
8 files changed, 262 insertions(+), 56 deletions(-)
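At a high level, the change replaces the fixed per-column divisor with an explicit byte budget: the VRB queue limit becomes the number of ColumnVectorBatches whose estimated footprint fits into hive.llap.io.cvb.memory.consumption., clamped between the configured min and max. A rough sketch of that idea (illustrative only, with simplified names; the committed logic is LlapRecordReader.determineQueueLimit further down in this diff):

    // Illustrative sketch, not the committed code: convert a byte budget into a
    // bounded count of buffered batches.
    static int sketchQueueLimit(long bufferBytes, long estimatedBatchBytes, int max, int min) {
      int bestEffort = (int) Math.min(bufferBytes / estimatedBatchBytes, max);
      return Math.max(bestEffort, min);
    }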
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0f34986..f002c6e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4146,13 +4146,16 @@ public class HiveConf extends Configuration {
"MR LineRecordRedader into LLAP cache, if this feature is enabled.
Safety flag."),
LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
"Whether to enable time counters for LLAP IO layer (time spent in
HDFS, etc.)"),
- LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000,
- "The default queue size for VRBs produced by a LLAP IO thread when the
processing is\n" +
+ LLAP_IO_VRB_QUEUE_LIMIT_MAX("hive.llap.io.vrb.queue.limit.max", 50000,
+ "The maximum queue size for VRBs produced by a LLAP IO thread when the
processing is\n" +
"slower than the IO. The actual queue size is set per fragment, and is
adjusted down\n" +
- "from the base, depending on the schema."),
- LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
+ "from the base, depending on the schema see
LLAP_IO_CVB_BUFFERED_SIZE."),
+ LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 1,
"The minimum queue size for VRBs produced by a LLAP IO thread when the
processing is\n" +
"slower than the IO (used when determining the size from base size)."),
+ LLAP_IO_CVB_BUFFERED_SIZE("hive.llap.io.cvb.memory.consumption.", 1L << 30,
+ "The amount of bytes used to buffer CVB between IO and Processor
Threads default to 1GB, "
+ + "this will be used to compute a best effort queue size for VRBs
produced by a LLAP IO thread."),
LLAP_IO_SHARE_OBJECT_POOLS("hive.llap.io.share.object.pools", false,
"Whether to used shared object pools in LLAP IO. A safety flag."),
LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
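Both the byte budget and the queue limits can also be overridden per query through the job configuration, because the reader consults the JobConf before falling back to the daemon value (see getQueueVar/getLongQueueVar in LlapRecordReader below). A hypothetical override, using the property names exactly as declared above:

    JobConf job = new JobConf();
    // Hypothetical per-query override: shrink the CVB byte budget to 256MB and
    // cap the VRB queue at 1000 entries (property names copied from the entries above).
    job.setLong("hive.llap.io.cvb.memory.consumption.", 256L << 20);
    job.setInt("hive.llap.io.vrb.queue.limit.max", 1000);
    job.setInt("hive.llap.io.vrb.queue.limit.min", 1);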
diff --git a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
index 3900a45..371d939 100644
--- a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
+++ b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
@@ -154,6 +154,13 @@ public class FixedSizedObjectPool<T> implements Pool<T> {
return offerImpl(t);
}
+ @Override public void clear() {
+ T result = takeImpl();
+ while (result != null) {
+ result = takeImpl();
+ }
+ }
+
private T takeImpl() {
long oldState = reserveArrayIndex(OBJECTS, EMPTY);
if (oldState == NO_INDEX) return null; // For whatever reason, reserve failed.
diff --git a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
index b026e54..1c3fc07 100644
--- a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
+++ b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
@@ -29,6 +29,7 @@ import java.util.concurrent.FutureTask;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hadoop.hive.common.Pool;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -238,6 +239,29 @@ public class TestFixedSizedObjectPool {
assertTrue(OneObjHelper.THE_OBJECT == pool.take());
}
+ @Test
+ public void testClearImp() {
+ int size = 10;
+ FixedSizedObjectPool<Object>
+ fixedSizedObjectPool =
+ new FixedSizedObjectPool<>(size, new Pool.PoolObjectHelper<Object>() {
+ @Override public Object create() {
+ //Null is used as marker to be the end.
+ return null;
+ }
+
+ @Override public void resetBeforeOffer(Object o) {
+ //
+ }
+ });
+ for (int i = 0; i < size; i++) {
+ fixedSizedObjectPool.offer(new Object());
+ }
+ Assert.assertEquals(size, fixedSizedObjectPool.size());
+ assertNotNull(fixedSizedObjectPool.take());
+ fixedSizedObjectPool.clear();
+ assertNull(fixedSizedObjectPool.take());
+ }
private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
cdlIn.countDown();
try {
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 6d7cf7d..a351a19 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.BlockingService;
+import javax.annotation.Nullable;
+
public class LlapUtil {
private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class);
@@ -372,7 +374,7 @@ public class LlapUtil {
}
- public static ThreadMXBean initThreadMxBean() {
+ @Nullable public static ThreadMXBean initThreadMxBean() {
ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
if (mxBean != null) {
if (!mxBean.isCurrentThreadCpuTimeSupported()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 91c94ef..1378a01 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hive.llap.io.api.impl;
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -74,11 +75,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-class LlapRecordReader
- implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
private static final Object DONE_OBJECT = new Object();
@@ -91,7 +90,7 @@ class LlapRecordReader
private VectorizedOrcAcidRowBatchReader acidReader;
private final Object[] partitionValues;
- private final LinkedBlockingQueue<Object> queue;
+ private final ArrayBlockingQueue<Object> queue;
private final AtomicReference<Throwable> pendingError = new AtomicReference<>(null);
/** Vector that is currently being processed by our user. */
@@ -160,14 +159,22 @@ class LlapRecordReader
TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
job, isAcidScan, Integer.MAX_VALUE);
-
- int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
- int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
- final boolean decimal64Support = HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED)
- .equalsIgnoreCase("decimal_64");
- int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), decimal64Support);
+ int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MAX, job, daemonConf);
+ int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
+ long bestEffortSize = getLongQueueVar(ConfVars.LLAP_IO_CVB_BUFFERED_SIZE, job, daemonConf);
+
+ final boolean
+ decimal64Support =
+ HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64");
+ int
+ limit =
+ determineQueueLimit(bestEffortSize,
+ queueLimitBase,
+ queueLimitMin,
+ rbCtx.getRowColumnTypeInfos(),
+ decimal64Support);
LOG.info("Queue limit for LlapRecordReader is " + limit);
- this.queue = new LinkedBlockingQueue<>(limit);
+ this.queue = new ArrayBlockingQueue<>(limit);
int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -197,24 +204,64 @@ class LlapRecordReader
return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var);
}
+ private static long getLongQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) {
+ // Check job config for overrides, otherwise use the default server value.
+ long jobVal = jobConf.getLong(var.varname, -1);
+ return (jobVal != -1) ? jobVal : HiveConf.getLongVar(daemonConf, var);
+ }
+
// For queue size estimation purposes, we assume all columns have weight one, and the following
// types are counted as multiple columns. This is very primitive; if we wanted to make it better,
// we'd increase the base limit, and adjust dynamically based on IO and processing perf delays.
- private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
+ private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 10,
COL_WEIGHT_STRING = 8;
- private static int determineQueueLimit(
- int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean decimal64Support) {
+
+ @VisibleForTesting
+ static int determineQueueLimit(long maxBufferedSize,
+ int queueLimitMax,
+ int queueLimitMin,
+ TypeInfo[] typeInfos,
+ final boolean decimal64Support) {
+ assert queueLimitMax >= queueLimitMin;
// If the values are equal, the queue limit is fixed.
- if (queueLimitBase == queueLimitMin) return queueLimitBase;
+ if (queueLimitMax == queueLimitMin) return queueLimitMax;
// If there are no columns (projection only join?) just assume no weight.
- if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
+ if (typeInfos == null || typeInfos.length == 0) return queueLimitMax;
+ // total weight as bytes
double totalWeight = 0;
- for (TypeInfo ti : typeInfos) {
+ int numberOfProjectedColumns = typeInfos.length;
+ double scale = Math.max(Math.log(numberOfProjectedColumns), 1);
+
+ // Assuming that an empty Column Vector is about 96 bytes the object
+ // org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector object internals:
+ // OFFSET  SIZE  TYPE                                                      DESCRIPTION
+ //                                                                         VALUE
+ //     0    16                                                             (object header)
+ //    16     1   boolean                                                   ColumnVector.noNulls
+ //    17     1   boolean                                                   ColumnVector.isRepeating
+ //    18     1   boolean                                                   ColumnVector.preFlattenIsRepeating
+ //    19     1   boolean                                                   ColumnVector.preFlattenNoNulls
+ //    20     4                                                             (alignment/padding gap)
+ //    24     8   org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type   ColumnVector.type
+ //    32     8   boolean[]                                                 ColumnVector.isNull
+ //    40     4   int                                                       BytesColumnVector.nextFree
+ //    44     4   int                                                       BytesColumnVector.smallBufferNextFree
+ //    48     4   int                                                       BytesColumnVector.bufferAllocationCount
+ //    52     4                                                             (alignment/padding gap)
+ //    56     8   byte[][]                                                  BytesColumnVector.vector
+ //    64     8   int[]                                                     BytesColumnVector.start
+ //    72     8   int[]                                                     BytesColumnVector.length
+ //    80     8   byte[]                                                    BytesColumnVector.buffer
+ //    88     8   byte[]                                                    BytesColumnVector.smallBuffer
+ long columnVectorBaseSize = (long) (96 * numberOfProjectedColumns * scale);
+
+ for (int i = 0; i < typeInfos.length; i++) {
+ TypeInfo ti = typeInfos[i];
int colWeight;
if (ti.getCategory() != Category.PRIMITIVE) {
colWeight = COL_WEIGHT_COMPLEX;
} else {
- PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo) ti;
switch (pti.getPrimitiveCategory()) {
case BINARY:
case CHAR:
@@ -222,6 +269,11 @@ class LlapRecordReader
case STRING:
colWeight = COL_WEIGHT_STRING;
break;
+ //Timestamp column vector uses an int and long arrays
+ case TIMESTAMP:
+ case INTERVAL_DAY_TIME:
+ colWeight = 2;
+ break;
case DECIMAL:
boolean useDecimal64 = false;
if (ti instanceof DecimalTypeInfo) {
@@ -241,9 +293,13 @@ class LlapRecordReader
colWeight = 1;
}
}
- totalWeight += colWeight;
+ totalWeight += colWeight * 8 * scale;
}
- return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight));
+ //default batch size is 1024
+ totalWeight *= 1024;
+ totalWeight += columnVectorBaseSize;
+ int bestEffortSize = Math.min((int) (maxBufferedSize / totalWeight), queueLimitMax);
+ return Math.max(bestEffortSize, queueLimitMin);
}
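Worked through with the numbers exercised by the new unit test further down: for 300 LONG columns, scale = ln(300) ≈ 5.70, so the per-row weight is 300 * 1 * 8 * 5.70 ≈ 13,689 bytes; a 1024-row batch is then estimated at ≈ 14.0 MB plus a base of 96 * 300 * 5.70 ≈ 164 KB, and with the 1 GB (1 << 30) default budget the result is min(1 GB / ~14.2 MB, 10000) = 75, which is what testLongColumns asserts. A minimal sketch of that call (values taken from the test below):

    // 1GB budget, max 10000, min 5, 300 long columns -> queue limit of 75.
    TypeInfo[] longCols = IntStream.range(0, 300)
        .mapToObj(i -> (TypeInfo) TypeInfoFactory.longTypeInfo).toArray(TypeInfo[]::new);
    int limit = LlapRecordReader.determineQueueLimit(1L << 30, 10000, 5, longCols, true); // 75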
@@ -271,7 +327,7 @@ class LlapRecordReader
work = Utilities.getMergeWork(job, inputName);
}
- if (work == null || !(work instanceof MapWork)) {
+ if (!(work instanceof MapWork)) {
work = Utilities.getMapWork(job);
}
return (MapWork) work;
@@ -325,7 +381,7 @@ class LlapRecordReader
}
isFirst = false;
}
- ColumnVectorBatch cvb = null;
+ ColumnVectorBatch cvb;
try {
cvb = nextCvb();
} catch (InterruptedException e) {
@@ -347,10 +403,10 @@ class LlapRecordReader
// TODO: relying everywhere on the magical constants and columns being together means ACID
// columns are going to be super hard to change in a backward compat manner. I can
// foresee someone cursing while refactoring all the magic for prefix schema changes.
- /**
- * Acid meta cols are always either all included or all excluded the
- * the width of 'cvb' changes accordingly so 'acidColCount' and
- * 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
+ /*
+ Acid meta cols are always either all included or all excluded the
+ the width of 'cvb' changes accordingly so 'acidColCount' and
+ 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
*/
// Exclude the row column.
int acidColCount = acidReader.includeAcidColumns() ?
@@ -467,7 +523,7 @@ class LlapRecordReader
// If the structure is replaced with smth that doesn't, we MUST check interrupt here because
// Hive operators rely on recordreader to handle task interruption, and unlike most RRs we
// do not do any blocking IO ops on this thread.
- Object next = null;
+ Object next;
do {
rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the comment in the method.
next = queue.poll(100, TimeUnit.MILLISECONDS);
@@ -624,7 +680,7 @@ class LlapRecordReader
List<Integer> filePhysicalColumnIds = readerLogicalColumnIds;
if (isAcidScan) {
int rootCol = OrcInputFormat.getRootColumn(false);
- filePhysicalColumnIds = new ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol);
+ filePhysicalColumnIds = new ArrayList<>(filePhysicalColumnIds.size() + rootCol);
this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile...
// Note: this guarantees that physical column IDs are in order.
for (int i = 0; i < rootCol; ++i) {
@@ -632,12 +688,12 @@ class LlapRecordReader
// struct to get read without projection.
if (acidStructColumnId == i) continue;
if(!includeAcidColumns) {
- /**
- * if not including acid columns, we still want to number the
- * physical columns as if acid columns are included becase
- * {@link #generateFileIncludes(TypeDescription)} takes the file
- * schema as input
- * (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
+ /*
+ if not including acid columns, we still want to number the
+ physical columns as if acid columns are included becase
+ {@link #generateFileIncludes(TypeDescription)} takes the file
+ schema as input
+ (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
*/
continue;
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 84436bc..10d76aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -57,17 +57,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
this.downstreamConsumer = consumer;
this.ioMetrics = ioMetrics;
this.mxBean = LlapUtil.initThreadMxBean();
- cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
- new Pool.PoolObjectHelper<ColumnVectorBatch>() {
- @Override
- public ColumnVectorBatch create() {
- return new ColumnVectorBatch(colCount);
- }
- @Override
- public void resetBeforeOffer(ColumnVectorBatch t) {
- // Don't reset anything, we are reusing column vectors.
- }
- });
+ cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, new Pool.PoolObjectHelper<ColumnVectorBatch>() {
+ @Override public ColumnVectorBatch create() {
+ return new ColumnVectorBatch(colCount);
+ }
+
+ @Override public void resetBeforeOffer(ColumnVectorBatch t) {
+ // Don't reset anything, we are reusing column vectors.
+ }
+ });
this.counters = counters;
}
@@ -81,6 +79,9 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
@Override
public Void call() throws Exception {
+ if (mxBean == null) {
+ return readCallable.call();
+ }
long cpuTime = mxBean.getCurrentThreadCpuTime(),
userTime = mxBean.getCurrentThreadUserTime();
try {
@@ -145,6 +146,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
@Override
public void setDone() throws InterruptedException {
downstreamConsumer.setDone();
+ cvbPool.clear();
}
@Override
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
new file mode 100644
index 0000000..7e71cf2
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class LlapRecordReaderQueueSizeTest {
+
+ private static final int END_EXCLUSIVE = 300;
+ private static final int MAX_BUFFERED_SIZE = 1 << 30; //1GB
+
+ @Test public void testMaxEqMin() {
+ int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, true);
+ Assert.assertEquals(100, expected);
+ }
+
+ @Test public void testMaxIsEnforced() {
+ TypeInfo[] cols = { new DecimalTypeInfo() };
+ int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, cols, true);
+ Assert.assertEquals(10, actual);
+ }
+
+ @Test public void testMinIsEnforced() {
+ TypeInfo[] cols = { new DecimalTypeInfo() };
+ int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, true);
+ Assert.assertEquals(5, actual);
+ }
+
+ @Test public void testOrderDecimal64VsFatDecimals() {
+ TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new DecimalTypeInfo()).toArray(TypeInfo[]::new);
+ int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, true);
+ Assert.assertEquals(75, actual);
+ // the idea it to see an order of 10 when using fat Decimals
+ actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, cols, false);
+ Assert.assertEquals(7, actual);
+ }
+
+ @Test public void testOrderDecimal64VsLong() {
+ TypeInfo[] decimalCols = ArrayOf(() -> new DecimalTypeInfo(TypeDescription.MAX_DECIMAL64_PRECISION, 0));
+ TypeInfo[] longCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+ Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longCols, true),
+ LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, decimalCols, true));
+ }
+
+ @Test public void testStringsColumns() {
+ TypeInfo[] charsCols = ArrayOf(() -> TypeInfoFactory.charTypeInfo);
+ TypeInfo[] stringCols = ArrayOf(() -> TypeInfoFactory.stringTypeInfo);
+ TypeInfo[] binaryCols = ArrayOf(() -> TypeInfoFactory.binaryTypeInfo);
+ Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, stringCols, true), 9);
+ Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, true));
+ Assert.assertEquals(9, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, true));
+ }
+
+ @Test public void testLongColumns() {
+ TypeInfo[] longsCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+ TypeInfo[] intCols = ArrayOf(() -> TypeInfoFactory.intTypeInfo);
+ TypeInfo[] byteCols = ArrayOf(() -> TypeInfoFactory.byteTypeInfo);
+ Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, true));
+ Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, true));
+ Assert.assertEquals(75, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, true));
+ }
+
+ @Test public void testTimestampsColumns() {
+ TypeInfo[] tsCols = ArrayOf(() -> TypeInfoFactory.timestampTypeInfo);
+ TypeInfo[] intervalCols = ArrayOf(() -> TypeInfoFactory.intervalDayTimeTypeInfo);
+ Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, true));
+ Assert.assertEquals(38, LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, true));
+ }
+
+ private static TypeInfo[] ArrayOf(Supplier<TypeInfo> supplier) {
+ return IntStream.range(0, END_EXCLUSIVE).mapToObj(i -> supplier.get()).toArray(TypeInfo[]::new);
+ }
+}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
index b9789ec..0522cc1 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
@@ -17,21 +17,36 @@
*/
package org.apache.hadoop.hive.common;
-/** Simple object pool to prevent GC on small objects passed between threads. */
+/**
+ * Simple object pool to prevent GC on small objects passed between threads.
+ */
public interface Pool<T> {
- /** Object helper for objects stored in the pool. */
+ /**
+ * Object helper for objects stored in the pool.
+ */
public interface PoolObjectHelper<T> {
- /** Called to create an object when one cannot be provided.
+ /**
+ * Called to create an object when one cannot be provided.
+ *
* @return a newly allocated object
*/
T create();
- /** Called before the object is put in the pool (regardless of whether put succeeds).
+
+ /**
+ * Called before the object is put in the pool (regardless of whether put succeeds).
+ *
* @param t the object to reset
*/
void resetBeforeOffer(T t);
}
T take();
+
void offer(T t);
+
int size();
-}
\ No newline at end of file
+
+ default void clear() {
+ //no op
+ }
+}