This is an automated email from the ASF dual-hosted git repository.

pgaref pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b172859  HIVE-25146: JMH tests for Multi HT and parallel load (#3044) (Panagiotis Garefalakis, reviewed by Ramesh Kumar)
b172859 is described below

commit b17285975b881265f9773bc43b1fbdbebecef364
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Mon Mar 14 11:11:53 2022 -0700

    HIVE-25146: JMH tests for Multi HT and parallel load (#3044) (Panagiotis Garefalakis, reviewed by Ramesh Kumar)
    
    * JMH tests for parallel HT load; configuration parameters include LOAD_THREADS_NUM, ROWS_NUM and JOIN_TYPE. A single thread simulates the default load behaviour, and ROWS_NUM < 1M defaults to a single thread for simplicity. Thread counts >= 2 evaluate the benefit of loading the HT in parallel.
    
    Change-Id: I1e0f07bf93ae57ff54022dc145fc9a7defd44782
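
For reference, LOAD_THREADS_NUM, ROWS_NUM and JOIN_TYPE are JMH @Param fields (see AbstractHTLoadBench below), so a run can be narrowed to a single configuration with the standard JMH -p option rather than sweeping every combination. The benchmark class and values in this sketch are purely illustrative:

    java -jar target/benchmarks.jar VectorFastHTLongKeyBench -p LOAD_THREADS_NUM=0,4 -p ROWS_NUM=2000000 -p JOIN_TYPE=INNER

LOAD_THREADS_NUM=0 selects the legacy single-threaded loader as the baseline, while values >= 2 exercise the new parallel load path.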
---
 itests/hive-jmh/pom.xml                            |  22 +++
 .../mapjoin/load/AbstractHTLoadBench.java          | 174 +++++++++++++++++++
 .../vectorization/mapjoin/load/BytesKeyBase.java   | 132 ++++++++++++++
 .../LegacyVectorMapJoinFastHashTableLoader.java    | 189 +++++++++++++++++++++
 .../vectorization/mapjoin/load/LongKeyBase.java    |  80 +++++++++
 .../vectorization/mapjoin/load/MultiKeyBase.java   |  89 ++++++++++
 .../mapjoin/load/VectorFastHTBytesKeyBench.java    |  68 ++++++++
 .../mapjoin/load/VectorFastHTLongKeyBench.java     |  68 ++++++++
 .../mapjoin/load/VectorFastHTMultiKeyBench.java    |  68 ++++++++
 .../fast/VectorMapJoinFastHashTableLoader.java     |  13 ++
 10 files changed, 903 insertions(+)
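
The new benchmarks live in the itests/hive-jmh module. The class-level comments in the benchmark classes below spell out how to build and run them; condensed here for convenience (directory layout assumed from the file paths above):

    # from the main itests directory
    mvn clean install -DskipTests -Pperf
    # then from itests/hive-jmh
    java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTLongKeyBench

The output log location, per the main() javadoc of each benchmark class, is itests/target/tmp/log/hive-jmh.log.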

diff --git a/itests/hive-jmh/pom.xml b/itests/hive-jmh/pom.xml
index de083df..6a2121b 100644
--- a/itests/hive-jmh/pom.xml
+++ b/itests/hive-jmh/pom.xml
@@ -67,6 +67,10 @@
       <classifier>tests</classifier>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
@@ -74,6 +78,24 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>${powermock.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>${powermock.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java
new file mode 100644
index 0000000..45a8706
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.HashTableLoader;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.CreateMapJoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestData;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.MapJoinPlanVariation;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.SmallTableGenerationParameters;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+
+@BenchmarkMode(Mode.AverageTime)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public abstract class AbstractHTLoadBench {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(AbstractHTLoadBench.class.getName());
+
+  protected VectorMapJoinVariation vectorMapJoinVariation;
+  protected MapJoinTestImplementation mapJoinImplementation;
+  protected CreateMapJoinResult createMapJoinResult;
+
+  protected int rowCount;
+  protected MapJoinTestData testData;
+  protected TezContext mockTezContext;
+  protected MapJoinTestDescription testDesc;
+  protected CustomKeyValueReader customKeyValueReader;
+
+
+  @Param({"0", "1", "2", "4", "8", "16", "32", "64"})
+  static int LOAD_THREADS_NUM;
+
+  @Param({"10000", "2000000"})
+  static int ROWS_NUM;
+
+  // MultiSet, HashSet, HashMap
+  @Param({"INNER_BIG_ONLY", "LEFT_SEMI", "INNER"})
+  static String JOIN_TYPE;
+
+  @Benchmark
+  public void hashTableLoadBench() throws Exception {
+    /* Mocking TezContext */
+    this.mockTezContext = PowerMockito.mock(TezContext.class);
+    ProcessorContext mockProcessorContest = PowerMockito.mock(ProcessorContext.class);
+    AbstractLogicalInput mockLogicalInput = PowerMockito.mock(AbstractLogicalInput.class);
+    TezCounters mockTezCounters = PowerMockito.mock(TezCounters.class);
+    TezCounter mockTezCounter = PowerMockito.mock(TezCounter.class);
+    InputContext mockInputContext = PowerMockito.mock(InputContext.class);
+    // Make sure the KEY estimation is correct to have properly sized HT
+    PowerMockito.when(mockTezCounter.getValue()).thenReturn((long)rowCount);
+    PowerMockito.when(mockTezContext.getInput(any())).thenReturn(mockLogicalInput);
+    PowerMockito.when(mockLogicalInput.getContext()).thenReturn(mockInputContext);
+    PowerMockito.when(mockInputContext.getCounters()).thenReturn(mockTezCounters);
+    PowerMockito.when(mockTezCounters.findCounter(anyString(), anyString())).thenReturn(mockTezCounter);
+    PowerMockito.when(mockTezContext.getTezProcessorContext()).thenReturn(mockProcessorContest);
+    PowerMockito.when(mockTezContext.getTezProcessorContext().getCounters()).thenReturn(mockTezCounters);
+    PowerMockito.when(mockTezContext.getTezProcessorContext().getCounters().findCounter(anyString(), anyString())).
+        thenReturn(mockTezCounter);
+    // Replace streaming Tez-Input with our custom Iterator
+    PowerMockito.when(mockTezContext.getInput(any())).thenReturn(mockLogicalInput);
+    PowerMockito.when(mockLogicalInput.getReader()).thenReturn(customKeyValueReader);
+    /* Mocking Done*/
+    HashTableLoader ht = LOAD_THREADS_NUM == 0 ?
+        new LegacyVectorMapJoinFastHashTableLoader(mockTezContext, testDesc.hiveConf, createMapJoinResult.mapJoinOperator) :
+        new VectorMapJoinFastHashTableLoader(mockTezContext, testDesc.hiveConf, createMapJoinResult.mapJoinOperator);
+    ht.load(new MapJoinTableContainer[createMapJoinResult.mapJoinOperator.getConf().getTagLength()], null);
+  }
+
+  // Common Setup
+  protected void setupMapJoinHT(HiveConf hiveConf, long seed, int rowCount,
+      VectorMapJoinVariation vectorMapJoinVariation, MapJoinTestImplementation mapJoinImplementation,
+      String[] bigTableColumnNames, TypeInfo[] bigTableTypeInfos, int[] bigTableKeyColumnNums,
+      TypeInfo[] smallTableValueTypeInfos, int[] smallTableRetainKeyColumnNums,
+      SmallTableGenerationParameters smallTableGenerationParameters) throws Exception {
+
+    hiveConf.set(HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS.varname, LOAD_THREADS_NUM + "");
+    LOG.info("Number of threads: " + hiveConf.get(HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS.varname));
+
+
+    this.rowCount = rowCount;
+
+    this.vectorMapJoinVariation = vectorMapJoinVariation;
+    this.mapJoinImplementation = mapJoinImplementation;
+    this.testDesc = new MapJoinTestDescription(hiveConf, vectorMapJoinVariation, bigTableColumnNames, bigTableTypeInfos,
+        bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums, smallTableGenerationParameters,
+        MapJoinPlanVariation.DYNAMIC_PARTITION_HASH_JOIN);
+    this.testData = new MapJoinTestData(1, testDesc, seed);
+
+    MapJoinDesc mapJoinDesc = MapJoinTestConfig.createMapJoinDesc(testDesc);
+    this.createMapJoinResult = MapJoinTestConfig.createMapJoinImplementation(mapJoinImplementation, testDesc, testData,
+        mapJoinDesc,/* shareMapJoinTableContainer */ null);
+  }
+
+  static class CustomKeyValueReader extends KeyValueReader {
+    private BytesWritable[] keys;
+    private BytesWritable[] values;
+    private int idx = -1;
+
+    public CustomKeyValueReader(BytesWritable[] k, BytesWritable[] v) {
+      this.keys = k;
+      this.values = v;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (++idx >= keys.length)
+        return false;
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return keys[idx];
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return values[idx];
+    }
+
+    void reset() {
+      this.idx = -1;
+    }
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java
new file mode 100644
index 0000000..a4ddd9d
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.RandomTypeUtil;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class BytesKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+    long seed = 2543;
+    int rowCount = rows;
+    HiveConf hiveConf = new HiveConf();
+    int[] bigTableKeyColumnNums = new int[] { 0 };
+    String[] bigTableColumnNames = new String[] { "b1" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] { TypeInfoFactory.stringTypeInfo };
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] { TypeInfoFactory.dateTypeInfo, TypeInfoFactory.timestampTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+    this.customKeyValueReader = generateByteKVPairs(rowCount, seed);
+  }
+
+  private static CustomKeyValueReader generateByteKVPairs(int rows, long seed) throws IOException {
+    LOG.info("Data GEN for: " + rows);
+    Random random = new Random(seed);
+    BytesWritable[] keys = new BytesWritable[rows];
+    BytesWritable[] values = new BytesWritable[rows];
+    for (int i = 0; i < rows; i++) {
+      keys[i] = new BytesWritable();
+      values[i] = new BytesWritable();
+    }
+    long startTime = System.currentTimeMillis();
+
+    BinarySortableSerializeWrite serializeWrite = new BinarySortableSerializeWrite(1);
+    ByteStream.Output output = new ByteStream.Output();
+    int str_len = 20;
+
+    for (int i = 0; i < rows; i++) {
+      output.reset();
+      Text k = new Text(RandomTypeUtil.getRandString(random, null, str_len));
+      serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, k);
+      keys[i].set(output.getData(), 0, output.getLength());
+
+      output.reset();
+      Text v = new Text(RandomTypeUtil.getRandString(random, null, str_len*2));
+      serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, v);
+      values[i].set(output.getData(), 0, output.getLength());
+    }
+    LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+    return new CustomKeyValueReader(keys, values);
+  }
+
+  public static String deserializeBinary(BytesWritable currentKey) throws HiveException {
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    boolean[] columnSortOrderIsDesc = new boolean[1];
+    Arrays.fill(columnSortOrderIsDesc, false);
+
+    byte[] columnNullMarker = new byte[1];
+    Arrays.fill(columnNullMarker, BinarySortableSerDe.ZERO);
+    byte[] columnNotNullMarker = new byte[1];
+    Arrays.fill(columnNotNullMarker, BinarySortableSerDe.ONE);
+
+    BinarySortableDeserializeRead keyBinarySortableDeserializeRead =
+        new BinarySortableDeserializeRead(
+            primitiveTypeInfos,
+            /* useExternalBuffer */ false,
+            columnSortOrderIsDesc,
+            columnNullMarker,
+            columnNotNullMarker);
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return null;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+
+
+    byte[] stringBytes = Arrays.copyOfRange(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesStart + keyBinarySortableDeserializeRead.currentBytesLength);
+    Text text = new Text(stringBytes);
+    String string = text.toString();
+    return string;
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
new file mode 100644
index 0000000..7af9380
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class);
+  private Configuration hconf;
+  protected MapJoinDesc desc;
+  private TezContext tezContext;
+  private String cacheKey;
+  private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc)joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+  }
+
+  @Override
+  public void init(ExecMapperContext context, MapredContext mrContext,
+      Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+  }
+
+  @Override
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException {
+
+    Map<Integer, String> parentToInput = desc.getParentToInput();
+    Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+      // Flip the flag at runtime in case if we are running outside of LLAP
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        LOG.info("Memory monitoring for hash table loader enabled. {}", 
memoryMonitorInfo);
+      }
+    }
+
+    if (!doMemCheck) {
+      LOG.info("Not doing hash table memory monitoring. {}", 
memoryMonitorInfo);
+    }
+    for (int pos = 0; pos < mapJoinTables.length; pos++) {
+      if (pos == desc.getPosBigTable()) {
+        continue;
+      }
+
+      long numEntries = 0;
+      String inputName = parentToInput.get(pos);
+      LogicalInput input = tezContext.getInput(inputName);
+
+      try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
+        KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+        Long keyCountObj = parentKeyCounts.get(pos);
+        long estKeyCount = (keyCountObj == null) ? -1 : keyCountObj;
+
+        long inputRecords = -1;
+        try {
+          //TODO : Need to use class instead of string.
+          // https://issues.apache.org/jira/browse/HIVE-23981
+          inputRecords = ((AbstractLogicalInput) input).getContext().getCounters().
+              findCounter("org.apache.tez.common.counters.TaskCounter",
+                  "APPROXIMATE_INPUT_RECORDS").getValue();
+        } catch (Exception e) {
+          LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
+        }
+        long keyCount = Math.max(estKeyCount, inputRecords);
+
+        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
+
+        LOG.info("Loading hash table for input: {} cacheKey: {} 
tableContainer: {} smallTablePos: {} " +
+                "estKeyCount : {} keyCount : {}", inputName, cacheKey,
+            vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, 
estKeyCount, keyCount);
+
+        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes 
here.
+        long startTime = System.currentTimeMillis();
+        while (kvReader.next()) {
+          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
+              (BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              LOG.info(
+                  "Checking hash table loader memory usage for input: {} numEntries: {} "
+                      + "estimatedMemoryUsage: {} effectiveThreshold: {}",
+                  inputName, numEntries, estMemUsage, effectiveThreshold);
+            }
+          }
+        }
+        long delta = System.currentTimeMillis() - startTime;
+        htLoadCounter.increment(delta);
+
+        vectorMapJoinFastTableContainer.seal();
+        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+        if (doMemCheck) {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} 
numEntries: {} " +
+                  "estimatedMemoryUsage: {} Load Time : {} ", inputName, 
cacheKey, numEntries,
+              vectorMapJoinFastTableContainer.getEstimatedMemorySize(), delta);
+        } else {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} 
numEntries: {} Load Time : {} ",
+              inputName, cacheKey, numEntries, delta);
+        }
+      } catch (IOException e) {
+        throw new HiveException(e);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java
new file mode 100644
index 0000000..7bbaa9c
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class LongKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+    long seed = 2543;
+    int rowCount = rows;
+    HiveConf hiveConf = new HiveConf();
+    int[] bigTableKeyColumnNums = new int[] { 0 };
+    String[] bigTableColumnNames = new String[] { "number1" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] { TypeInfoFactory.longTypeInfo };
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] { TypeInfoFactory.dateTypeInfo, TypeInfoFactory.stringTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+    this.customKeyValueReader = generateLongKVPairs(rowCount, seed);
+  }
+
+  private static CustomKeyValueReader generateLongKVPairs(int rows, long seed) throws IOException {
+    LOG.info("Data GEN for: " + rows);
+    Random random = new Random(seed);
+    BytesWritable[] keys = new BytesWritable[rows];
+    BytesWritable[] values = new BytesWritable[rows];
+    BinarySortableSerializeWrite bsw = new BinarySortableSerializeWrite(1);
+    long startTime = System.currentTimeMillis();
+    ByteStream.Output outp;
+    BytesWritable key;
+    BytesWritable value;
+    for (int i = 0; i < rows; i++) {
+      outp = new ByteStream.Output();
+      bsw.set(outp);
+      long k = random.nextInt(rows);
+      bsw.writeLong(k);
+      key = new BytesWritable(outp.getData(), outp.getLength());
+      outp = new ByteStream.Output();
+      bsw.reset();
+      bsw.writeLong(random.nextInt(rows * 2));
+      value = new BytesWritable(outp.getData(), outp.getLength());
+      keys[i] = key;
+      values[i] = value;
+    }
+    LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+    return new CustomKeyValueReader(keys, values);
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java
new file mode 100644
index 0000000..be6eae7
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.RandomTypeUtil;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class MultiKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+    long seed = 2543;
+    int rowCount = rows;
+    HiveConf hiveConf = new HiveConf();
+    int[] bigTableKeyColumnNums = new int[] { 0, 1, 2};
+    String[] bigTableColumnNames = new String[] { "b1", "b2", "b3" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] {
+        TypeInfoFactory.intTypeInfo,
+        TypeInfoFactory.longTypeInfo,
+        TypeInfoFactory.stringTypeInfo
+    };
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos = new TypeInfo[] { TypeInfoFactory.stringTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+    this.customKeyValueReader = generateByteKVPairs(rowCount, seed);
+  }
+
+  private static CustomKeyValueReader generateByteKVPairs(int rows, long seed) throws IOException {
+    LOG.info("Data GEN for: " + rows);
+    Random random = new Random(seed);
+    BytesWritable[] keys = new BytesWritable[rows];
+    BytesWritable[] values = new BytesWritable[rows];
+    for (int i = 0; i < rows; i++) {
+      keys[i] = new BytesWritable();
+      values[i] = new BytesWritable();
+    }
+    BinarySortableSerializeWrite serializeWrite = new BinarySortableSerializeWrite(1);
+    ByteStream.Output output = new ByteStream.Output();
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < rows; i++) {
+      output.reset();
+      serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.timestampTypeInfo, new TimestampWritableV2(RandomTypeUtil.getRandTimestamp(random)));
+      keys[i].set(output.getData(), 0, output.getLength());
+
+      output.reset();
+      Text v = new Text(RandomTypeUtil.getRandString(random, null, 20*2));
+      serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, v);
+      values[i].set(output.getData(), 0, output.getLength());
+    }
+    LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+    return new CustomKeyValueReader(keys, values);
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java
new file mode 100644
index 0000000..9856bb3
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ *     java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTBytesKeyBench
+ *
+ *  {HASH_MAP, HASH_SET, HASH_MULTISET}
+ *    X
+ *  {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTBytesKeyBench {
+
+  public static class FastHashVectorBench extends BytesKeyBase {
+    @Setup
+    public void setup() throws Exception {
+      LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+    }
+
+    @TearDown(Level.Invocation)
+    public void doTearDown() {
+      LOG.info("Do TearDown");
+      customKeyValueReader.reset();
+    }
+  }
+
+  /**
+   * For output log check: itests/target/tmp/log/hive-jmh.log
+   */
+  public static void main(String[] args) throws RunnerException {
+    Options opt = new OptionsBuilder()
+        .include(".*" + VectorFastHTBytesKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) //number of times the actual iteration should take place
+        .forks(1)
+        .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+        .build();
+    new Runner(opt).run();
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java
new file mode 100644
index 0000000..ad1c2c7
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf -Drat.skip=true" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ *     java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTLongKeyBench
+ *
+ *  {HASH_MAP, HASH_SET, HASH_MULTISET}
+ *    X
+ *  {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTLongKeyBench {
+
+  public static class FastHashVectorBench extends LongKeyBase {
+    @Setup
+    public void setup() throws Exception {
+      LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+    }
+
+    @TearDown(Level.Invocation)
+    public void doTearDown() {
+      LOG.info("Do TearDown");
+      customKeyValueReader.reset();
+    }
+  }
+
+  /**
+   * For output log check: itests/target/tmp/log/hive-jmh.log
+   */
+  public static void main(String[] args) throws RunnerException {
+    Options opt = new OptionsBuilder()
+        .include(".*" + VectorFastHTLongKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) //number of times the actual iteration should take place
+        .forks(1)
+        .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+        .build();
+    new Runner(opt).run();
+  }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java
new file mode 100644
index 0000000..a36f15c
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ *     java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTMultiKeyBench
+ *
+ *  {HASH_MAP, HASH_SET, HASH_MULTISET}
+ *    X
+ *  {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTMultiKeyBench {
+
+  public static class FastHashVectorBench extends MultiKeyBase {
+    @Setup
+    public void setup() throws Exception {
+      LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+    }
+
+    @TearDown(Level.Invocation)
+    public void doTearDown() {
+      LOG.info("Do TearDown");
+      customKeyValueReader.reset();
+    }
+  }
+
+  /**
+   * For output log check: itests/target/tmp/log/hive-jmh.log
+   */
+  public static void main(String[] args) throws RunnerException {
+    Options opt = new OptionsBuilder()
+        .include(".*" + VectorFastHTMultiKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) //number of times the actual iteration should take place
+        .forks(1)
+        .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+        .build();
+    new Runner(opt).run();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 38eccb1..0aa4736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -78,6 +78,19 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
   private BlockingQueue<HashTableElementBatch>[] loadBatchQueues;
   private static final HashTableElementBatch DONE_SENTINEL = new HashTableElementBatch();
 
+  public VectorMapJoinFastHashTableLoader() {
+
+  }
+
+  public VectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(
+        HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get(Operator.CONTEXT_NAME_KEY, ""));
+  }
+
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext,
       Configuration hconf, MapJoinOperator joinOp) {
