This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea614d  HIVE-21391: LLAP Pool of column vector buffers can cause 
memory pressure; this fix makes memory estimation tighter (Slim Bouguerra 
reviewed by Prasanth Jayachandran)
5ea614d is described below

commit 5ea614d799b82237e2387b3bcb7b63e7db90a3c9
Author: Slim Bouguerra <[email protected]>
AuthorDate: Fri Jul 19 15:25:26 2019 -0700

    HIVE-21391: LLAP Pool of column vector buffers can cause memory pressure; 
this fix makes memory estimation tighter (Slim Bouguerra reviewed by Prasanth 
Jayachandran)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  11 +-
 .../hive/common/util/FixedSizedObjectPool.java     |   7 ++
 .../hive/common/util/TestFixedSizedObjectPool.java |  24 ++++
 .../java/org/apache/hadoop/hive/llap/LlapUtil.java |   4 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java    | 126 +++++++++++++++------
 .../hive/llap/io/decode/EncodedDataConsumer.java   |  24 ++--
 .../io/api/impl/LlapRecordReaderQueueSizeTest.java |  97 ++++++++++++++++
 .../java/org/apache/hadoop/hive/common/Pool.java   |  25 +++-
 8 files changed, 262 insertions(+), 56 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0f34986..f002c6e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4146,13 +4146,16 @@ public class HiveConf extends Configuration {
         "MR LineRecordRedader into LLAP cache, if this feature is enabled. 
Safety flag."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in 
HDFS, etc.)"),
-    LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000,
-        "The default queue size for VRBs produced by a LLAP IO thread when the 
processing is\n" +
+    LLAP_IO_VRB_QUEUE_LIMIT_MAX("hive.llap.io.vrb.queue.limit.max", 50000,
+        "The maximum queue size for VRBs produced by a LLAP IO thread when the 
processing is\n" +
         "slower than the IO. The actual queue size is set per fragment, and is 
adjusted down\n" +
-        "from the base, depending on the schema."),
-    LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 10,
+        "from the base, depending on the schema see 
LLAP_IO_CVB_BUFFERED_SIZE."),
+    LLAP_IO_VRB_QUEUE_LIMIT_MIN("hive.llap.io.vrb.queue.limit.min", 1,
         "The minimum queue size for VRBs produced by a LLAP IO thread when the 
processing is\n" +
         "slower than the IO (used when determining the size from base size)."),
+    LLAP_IO_CVB_BUFFERED_SIZE("hive.llap.io.cvb.memory.consumption.", 1L << 30,
+        "The amount of bytes used to buffer CVB between IO and Processor 
Threads default to 1GB, "
+            + "this will be used to compute a best effort queue size for VRBs 
produced by a LLAP IO thread."),
     LLAP_IO_SHARE_OBJECT_POOLS("hive.llap.io.share.object.pools", false,
         "Whether to used shared object pools in LLAP IO. A safety flag."),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
diff --git 
a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java 
b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
index 3900a45..371d939 100644
--- a/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
+++ b/common/src/java/org/apache/hive/common/util/FixedSizedObjectPool.java
@@ -154,6 +154,13 @@ public class FixedSizedObjectPool<T> implements Pool<T> {
     return offerImpl(t);
   }
 
+  @Override public void clear() {
+    T result = takeImpl();
+    while (result != null) {
+      result = takeImpl();
+    }
+  }
+
   private T takeImpl() {
     long oldState = reserveArrayIndex(OBJECTS, EMPTY);
     if (oldState == NO_INDEX) return null; // For whatever reason, reserve 
failed.
diff --git 
a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java 
b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
index b026e54..1c3fc07 100644
--- a/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
+++ b/common/src/test/org/apache/hive/common/util/TestFixedSizedObjectPool.java
@@ -29,6 +29,7 @@ import java.util.concurrent.FutureTask;
 
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.hadoop.hive.common.Pool;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -238,6 +239,29 @@ public class TestFixedSizedObjectPool {
     assertTrue(OneObjHelper.THE_OBJECT == pool.take());
   }
 
+  @Test
+  public void testClearImp() {
+    int size = 10;
+    FixedSizedObjectPool<Object>
+        fixedSizedObjectPool =
+        new FixedSizedObjectPool<>(size, new Pool.PoolObjectHelper<Object>() {
+          @Override public Object create() {
+            //Null is used as marker to be the end.
+            return null;
+          }
+
+          @Override public void resetBeforeOffer(Object o) {
+            //
+          }
+        });
+    for (int i = 0; i < size; i++) {
+      fixedSizedObjectPool.offer(new Object());
+    }
+    Assert.assertEquals(size, fixedSizedObjectPool.size());
+    assertNotNull(fixedSizedObjectPool.take());
+    fixedSizedObjectPool.clear();
+    assertNull(fixedSizedObjectPool.take());
+  }
   private static void syncThreadStart(final CountDownLatch cdlIn, final 
CountDownLatch cdlOut) {
     cdlIn.countDown();
     try {
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 6d7cf7d..a351a19 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.BlockingService;
 
+import javax.annotation.Nullable;
+
 public class LlapUtil {
   private static final Logger LOG = LoggerFactory.getLogger(LlapUtil.class);
 
@@ -372,7 +374,7 @@ public class LlapUtil {
   }
 
 
-  public static ThreadMXBean initThreadMxBean() {
+  @Nullable public static ThreadMXBean initThreadMxBean() {
     ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
     if (mxBean != null) {
       if (!mxBean.isCurrentThreadCpuTimeSupported()) {
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 91c94ef..1378a01 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hive.llap.io.api.impl;
 import java.util.ArrayList;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -74,11 +75,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-class LlapRecordReader
-    implements RecordReader<NullWritable, VectorizedRowBatch>, 
Consumer<ColumnVectorBatch> {
+class LlapRecordReader implements RecordReader<NullWritable, 
VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LlapRecordReader.class);
   private static final Object DONE_OBJECT = new Object();
@@ -91,7 +90,7 @@ class LlapRecordReader
   private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;
 
-  private final LinkedBlockingQueue<Object> queue;
+  private final ArrayBlockingQueue<Object> queue;
   private final AtomicReference<Throwable> pendingError = new 
AtomicReference<>(null);
 
   /** Vector that is currently being processed by our user. */
@@ -160,14 +159,22 @@ class LlapRecordReader
     TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(
         job, isAcidScan, Integer.MAX_VALUE);
 
-
-    int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, 
job, daemonConf);
-    int queueLimitMin =  getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, 
job, daemonConf);
-    final boolean decimal64Support = HiveConf.getVar(job, 
ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED)
-      .equalsIgnoreCase("decimal_64");
-    int limit = determineQueueLimit(queueLimitBase, queueLimitMin, 
rbCtx.getRowColumnTypeInfos(), decimal64Support);
+    int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MAX, 
job, daemonConf);
+    int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, 
daemonConf);
+    long bestEffortSize = getLongQueueVar(ConfVars.LLAP_IO_CVB_BUFFERED_SIZE, 
job, daemonConf);
+
+    final boolean
+        decimal64Support =
+        HiveConf.getVar(job, 
ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64");
+    int
+        limit =
+        determineQueueLimit(bestEffortSize,
+            queueLimitBase,
+            queueLimitMin,
+            rbCtx.getRowColumnTypeInfos(),
+            decimal64Support);
     LOG.info("Queue limit for LlapRecordReader is " + limit);
-    this.queue = new LinkedBlockingQueue<>(limit);
+    this.queue = new ArrayBlockingQueue<>(limit);
 
 
     int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -197,24 +204,64 @@ class LlapRecordReader
     return (jobVal != -1) ? jobVal : HiveConf.getIntVar(daemonConf, var);
   }
 
+  private static long getLongQueueVar(ConfVars var, JobConf jobConf, 
Configuration daemonConf) {
+    // Check job config for overrides, otherwise use the default server value.
+    long jobVal = jobConf.getLong(var.varname, -1);
+    return (jobVal != -1) ? jobVal : HiveConf.getLongVar(daemonConf, var);
+  }
+
   // For queue size estimation purposes, we assume all columns have weight 
one, and the following
   // types are counted as multiple columns. This is very primitive; if we 
wanted to make it better,
   // we'd increase the base limit, and adjust dynamically based on IO and 
processing perf delays.
-  private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
+  private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 
10,
       COL_WEIGHT_STRING = 8;
-  private static int determineQueueLimit(
-    int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean 
decimal64Support) {
+
+  @VisibleForTesting
+  static int determineQueueLimit(long maxBufferedSize,
+      int queueLimitMax,
+      int queueLimitMin,
+      TypeInfo[] typeInfos,
+      final boolean decimal64Support) {
+    assert queueLimitMax >= queueLimitMin;
     // If the values are equal, the queue limit is fixed.
-    if (queueLimitBase == queueLimitMin) return queueLimitBase;
+    if (queueLimitMax == queueLimitMin) return queueLimitMax;
     // If there are no columns (projection only join?) just assume no weight.
-    if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
+    if (typeInfos == null || typeInfos.length == 0) return queueLimitMax;
+    // total weight as bytes
     double totalWeight = 0;
-    for (TypeInfo ti : typeInfos) {
+    int numberOfProjectedColumns = typeInfos.length;
+    double scale = Math.max(Math.log(numberOfProjectedColumns), 1);
+
+    // Assuming that an empty Column Vector is about 96 bytes, per the layout of the
+    // org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector object 
internals:
+    // OFFSET  SIZE                                                      TYPE 
DESCRIPTION
+    // VALUE
+    //      0    16                                                           
(object header)
+    //     16     1                                                   boolean 
ColumnVector.noNulls
+    //     17     1                                                   boolean 
ColumnVector.isRepeating
+    //     18     1                                                   boolean 
ColumnVector.preFlattenIsRepeating
+    //     19     1                                                   boolean 
ColumnVector.preFlattenNoNulls
+    //     20     4                                                           
(alignment/padding gap)
+    //     24     8   org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type 
ColumnVector.type
+    //     32     8                                                 boolean[] 
ColumnVector.isNull
+    //     40     4                                                       int 
BytesColumnVector.nextFree
+    //     44     4                                                       int 
BytesColumnVector.smallBufferNextFree
+    //     48     4                                                       int 
BytesColumnVector.bufferAllocationCount
+    //     52     4                                                           
(alignment/padding gap)
+    //     56     8                                                  byte[][] 
BytesColumnVector.vector
+    //     64     8                                                     int[] 
BytesColumnVector.start
+    //     72     8                                                     int[] 
BytesColumnVector.length
+    //     80     8                                                    byte[] 
BytesColumnVector.buffer
+    //     88     8                                                    byte[] 
BytesColumnVector.smallBuffer
+    long columnVectorBaseSize = (long) (96 * numberOfProjectedColumns * scale);
+
+    for (int i = 0; i < typeInfos.length; i++) {
+      TypeInfo ti = typeInfos[i];
       int colWeight;
       if (ti.getCategory() != Category.PRIMITIVE) {
         colWeight = COL_WEIGHT_COMPLEX;
       } else {
-        PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+        PrimitiveTypeInfo pti = (PrimitiveTypeInfo) ti;
         switch (pti.getPrimitiveCategory()) {
         case BINARY:
         case CHAR:
@@ -222,6 +269,11 @@ class LlapRecordReader
         case STRING:
           colWeight = COL_WEIGHT_STRING;
           break;
+          //Timestamp column vector uses an int and long arrays
+        case TIMESTAMP:
+        case INTERVAL_DAY_TIME:
+          colWeight = 2;
+          break;
         case DECIMAL:
           boolean useDecimal64 = false;
           if (ti instanceof DecimalTypeInfo) {
@@ -241,9 +293,13 @@ class LlapRecordReader
           colWeight = 1;
         }
       }
-      totalWeight += colWeight;
+      totalWeight += colWeight * 8 * scale;
     }
-    return Math.max(queueLimitMin, (int)(queueLimitBase / totalWeight));
+    //default batch size is 1024
+    totalWeight *= 1024;
+    totalWeight +=  columnVectorBaseSize;
+    int bestEffortSize = Math.min((int) (maxBufferedSize / totalWeight), 
queueLimitMax);
+    return Math.max(bestEffortSize, queueLimitMin);
   }
 
 
@@ -271,7 +327,7 @@ class LlapRecordReader
       work = Utilities.getMergeWork(job, inputName);
     }
 
-    if (work == null || !(work instanceof MapWork)) {
+    if (!(work instanceof MapWork)) {
       work = Utilities.getMapWork(job);
     }
     return (MapWork) work;
@@ -325,7 +381,7 @@ class LlapRecordReader
       }
       isFirst = false;
     }
-    ColumnVectorBatch cvb = null;
+    ColumnVectorBatch cvb;
     try {
       cvb = nextCvb();
     } catch (InterruptedException e) {
@@ -347,10 +403,10 @@ class LlapRecordReader
         // TODO: relying everywhere on the magical constants and columns being 
together means ACID
         //       columns are going to be super hard to change in a backward 
compat manner. I can
         //       foresee someone cursing while refactoring all the magic for 
prefix schema changes.
-        /**
-         * Acid meta cols are always either all included or all excluded the
-         * the width of 'cvb' changes accordingly so 'acidColCount' and
-         * 'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
+        /*
+          Acid meta cols are always either all included or all excluded;
+          the width of 'cvb' changes accordingly so 'acidColCount' and
+          'ixInVrb' need to be adjusted. See {@link IncludesImpl} comments.
          */
         // Exclude the row column.
         int acidColCount = acidReader.includeAcidColumns() ?
@@ -467,7 +523,7 @@ class LlapRecordReader
     // If the structure is replaced with smth that doesn't, we MUST check 
interrupt here because
     // Hive operators rely on recordreader to handle task interruption, and 
unlike most RRs we
     // do not do any blocking IO ops on this thread.
-    Object next = null;
+    Object next;
     do {
       rethrowErrorIfAny(pendingError.get()); // Best-effort check; see the 
comment in the method.
       next = queue.poll(100, TimeUnit.MILLISECONDS);
@@ -624,7 +680,7 @@ class LlapRecordReader
       List<Integer> filePhysicalColumnIds = readerLogicalColumnIds;
       if (isAcidScan) {
         int rootCol = OrcInputFormat.getRootColumn(false);
-        filePhysicalColumnIds = new 
ArrayList<Integer>(filePhysicalColumnIds.size() + rootCol);
+        filePhysicalColumnIds = new ArrayList<>(filePhysicalColumnIds.size() + 
rootCol);
         this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This 
is somewhat fragile...
         // Note: this guarantees that physical column IDs are in order.
         for (int i = 0; i < rootCol; ++i) {
@@ -632,12 +688,12 @@ class LlapRecordReader
           // struct to get read without projection.
           if (acidStructColumnId == i) continue;
           if(!includeAcidColumns) {
-            /**
-             * if not including acid columns, we still want to number the
-             * physical columns as if acid columns are included becase
-             * {@link #generateFileIncludes(TypeDescription)} takes the file
-             * schema as input
-             * (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
+            /*
+              if not including acid columns, we still want to number the
+              physical columns as if acid columns are included because
+              {@link #generateFileIncludes(TypeDescription)} takes the file
+              schema as input
+              (eg <op, owid, writerId, rowid, cwid, <f1, ... fn>>)
              */
             continue;
           }
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 84436bc..10d76aa 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -57,17 +57,15 @@ public abstract class EncodedDataConsumer<BatchKey, 
BatchType extends EncodedCol
     this.downstreamConsumer = consumer;
     this.ioMetrics = ioMetrics;
     this.mxBean = LlapUtil.initThreadMxBean();
-    cvbPool = new FixedSizedObjectPool<ColumnVectorBatch>(CVB_POOL_SIZE,
-        new Pool.PoolObjectHelper<ColumnVectorBatch>() {
-          @Override
-          public ColumnVectorBatch create() {
-            return new ColumnVectorBatch(colCount);
-          }
-          @Override
-          public void resetBeforeOffer(ColumnVectorBatch t) {
-            // Don't reset anything, we are reusing column vectors.
-          }
-        });
+    cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, new 
Pool.PoolObjectHelper<ColumnVectorBatch>() {
+      @Override public ColumnVectorBatch create() {
+        return new ColumnVectorBatch(colCount);
+      }
+
+      @Override public void resetBeforeOffer(ColumnVectorBatch t) {
+        // Don't reset anything, we are reusing column vectors.
+      }
+    });
     this.counters = counters;
   }
 
@@ -81,6 +79,9 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType 
extends EncodedCol
 
     @Override
     public Void call() throws Exception {
+      if (mxBean == null) {
+        return readCallable.call();
+      }
       long cpuTime = mxBean.getCurrentThreadCpuTime(),
           userTime = mxBean.getCurrentThreadUserTime();
       try {
@@ -145,6 +146,7 @@ public abstract class EncodedDataConsumer<BatchKey, 
BatchType extends EncodedCol
   @Override
   public void setDone() throws InterruptedException {
     downstreamConsumer.setDone();
+    cvbPool.clear();
   }
 
   @Override
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
new file mode 100644
index 0000000..7e71cf2
--- /dev/null
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReaderQueueSizeTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.impl;
+
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class LlapRecordReaderQueueSizeTest {
+
+  private static final int END_EXCLUSIVE = 300;
+  private static final int MAX_BUFFERED_SIZE = 1 << 30; //1GB
+
+  @Test public void testMaxEqMin() {
+    int expected = LlapRecordReader.determineQueueLimit(0, 100, 100, null, 
true);
+    Assert.assertEquals(100, expected);
+  }
+
+  @Test public void testMaxIsEnforced() {
+    TypeInfo[] cols = { new DecimalTypeInfo() };
+    int actual = LlapRecordReader.determineQueueLimit(Long.MAX_VALUE, 10, 1, 
cols, true);
+    Assert.assertEquals(10, actual);
+  }
+
+  @Test public void testMinIsEnforced() {
+    TypeInfo[] cols = { new DecimalTypeInfo() };
+    int actual = LlapRecordReader.determineQueueLimit(0, 10, 5, cols, true);
+    Assert.assertEquals(5, actual);
+  }
+
+  @Test public void testOrderDecimal64VsFatDecimals() {
+    TypeInfo[] cols = IntStream.range(0, 300).mapToObj(i -> new 
DecimalTypeInfo()).toArray(TypeInfo[]::new);
+    int actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 
10000, 5, cols, true);
+    Assert.assertEquals(75, actual);
+    // the idea is to see a factor of about 10 when using fat Decimals
+    actual = LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, 
cols, false);
+    Assert.assertEquals(7, actual);
+  }
+
+  @Test public void testOrderDecimal64VsLong() {
+    TypeInfo[] decimalCols = ArrayOf(() -> new 
DecimalTypeInfo(TypeDescription.MAX_DECIMAL64_PRECISION, 0));
+    TypeInfo[] longCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+    
Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 
10000, 5, longCols, true),
+        LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, 
decimalCols, true));
+  }
+
+  @Test public void testStringsColumns() {
+    TypeInfo[] charsCols = ArrayOf(() -> TypeInfoFactory.charTypeInfo);
+    TypeInfo[] stringCols = ArrayOf(() -> TypeInfoFactory.stringTypeInfo);
+    TypeInfo[] binaryCols = ArrayOf(() -> TypeInfoFactory.binaryTypeInfo);
+    
Assert.assertEquals(LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 
10000, 5, stringCols, true), 9);
+    Assert.assertEquals(9, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, charsCols, 
true));
+    Assert.assertEquals(9, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, binaryCols, 
true));
+  }
+
+  @Test public void testLongColumns() {
+    TypeInfo[] longsCols = ArrayOf(() -> TypeInfoFactory.longTypeInfo);
+    TypeInfo[] intCols = ArrayOf(() -> TypeInfoFactory.intTypeInfo);
+    TypeInfo[] byteCols = ArrayOf(() -> TypeInfoFactory.byteTypeInfo);
+    Assert.assertEquals(75, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, longsCols, 
true));
+    Assert.assertEquals(75, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intCols, 
true));
+    Assert.assertEquals(75, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, byteCols, 
true));
+  }
+
+  @Test public void testTimestampsColumns() {
+    TypeInfo[] tsCols = ArrayOf(() -> TypeInfoFactory.timestampTypeInfo);
+    TypeInfo[] intervalCols = ArrayOf(() -> 
TypeInfoFactory.intervalDayTimeTypeInfo);
+    Assert.assertEquals(38, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, tsCols, 
true));
+    Assert.assertEquals(38, 
LlapRecordReader.determineQueueLimit(MAX_BUFFERED_SIZE, 10000, 5, intervalCols, 
true));
+  }
+
+  private static TypeInfo[] ArrayOf(Supplier<TypeInfo> supplier) {
+    return IntStream.range(0, END_EXCLUSIVE).mapToObj(i -> 
supplier.get()).toArray(TypeInfo[]::new);
+  }
+}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java 
b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
index b9789ec..0522cc1 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/Pool.java
@@ -17,21 +17,36 @@
  */
 package org.apache.hadoop.hive.common;
 
-/** Simple object pool to prevent GC on small objects passed between threads. 
*/
+/**
+ * Simple object pool to prevent GC on small objects passed between threads.
+ */
 public interface Pool<T> {
-  /** Object helper for objects stored in the pool. */
+  /**
+   * Object helper for objects stored in the pool.
+   */
   public interface PoolObjectHelper<T> {
-    /** Called to create an object when one cannot be provided.
+    /**
+     * Called to create an object when one cannot be provided.
+     *
      * @return a newly allocated object
      */
     T create();
-    /** Called before the object is put in the pool (regardless of whether put 
succeeds).
+
+    /**
+     * Called before the object is put in the pool (regardless of whether put 
succeeds).
+     *
      * @param t the object to reset
      */
     void resetBeforeOffer(T t);
   }
 
   T take();
+
   void offer(T t);
+
   int size();
-}
\ No newline at end of file
+
+  default void clear() {
+    //no op
+  }
+}

Reply via email to