This is an automated email from the ASF dual-hosted git repository.
pgaref pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b172859  HIVE-25146: JMH tests for Multi HT and parallel load (#3044) (Panagiotis Garefalakis, reviewed by Ramesh Kumar)
b172859 is described below
commit b17285975b881265f9773bc43b1fbdbebecef364
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Mon Mar 14 11:11:53 2022 -0700
HIVE-25146: JMH tests for Multi HT and parallel load (#3044) (Panagiotis Garefalakis, reviewed by Ramesh Kumar)

* JMH tests for parallel HT load; configuration parameters include
LOAD_THREADS_NUM, ROWS_NUM and JOIN_TYPE. A single thread simulates the default
load behaviour, and ROWS_NUM < 1M defaults to a single thread for simplicity.
Two or more load threads evaluate the benefit of loading the HT in parallel.
Change-Id: I1e0f07bf93ae57ff54022dc145fc9a7defd44782
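
For readers who want to try the new parameters, a minimal JMH runner sketch follows
(the RunHTLoadBench class name and the parameter values are illustrative, not part
of this commit):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class RunHTLoadBench {
      public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include("VectorFastHTLongKeyBench")  // or VectorFastHTBytesKeyBench / VectorFastHTMultiKeyBench
            .param("LOAD_THREADS_NUM", "4")       // 0 benchmarks the legacy single-threaded loader
            .param("ROWS_NUM", "2000000")
            .param("JOIN_TYPE", "INNER")          // INNER_BIG_ONLY and LEFT_SEMI are also accepted
            .forks(1)
            .build();
        new Runner(opt).run();
      }
    }

The same overrides can be passed on the JMH command line from itests/hive-jmh, e.g.
"java -jar target/benchmarks.jar VectorFastHTLongKeyBench -p LOAD_THREADS_NUM=4 -p ROWS_NUM=2000000 -p JOIN_TYPE=INNER".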
---
itests/hive-jmh/pom.xml | 22 +++
.../mapjoin/load/AbstractHTLoadBench.java | 174 +++++++++++++++++++
.../vectorization/mapjoin/load/BytesKeyBase.java | 132 ++++++++++++++
.../LegacyVectorMapJoinFastHashTableLoader.java | 189 +++++++++++++++++++++
.../vectorization/mapjoin/load/LongKeyBase.java | 80 +++++++++
.../vectorization/mapjoin/load/MultiKeyBase.java | 89 ++++++++++
.../mapjoin/load/VectorFastHTBytesKeyBench.java | 68 ++++++++
.../mapjoin/load/VectorFastHTLongKeyBench.java | 68 ++++++++
.../mapjoin/load/VectorFastHTMultiKeyBench.java | 68 ++++++++
.../fast/VectorMapJoinFastHashTableLoader.java | 13 ++
10 files changed, 903 insertions(+)
diff --git a/itests/hive-jmh/pom.xml b/itests/hive-jmh/pom.xml
index de083df..6a2121b 100644
--- a/itests/hive-jmh/pom.xml
+++ b/itests/hive-jmh/pom.xml
@@ -67,6 +67,10 @@
<classifier>tests</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
@@ -74,6 +78,24 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java
new file mode 100644
index 0000000..45a8706
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/AbstractHTLoadBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.HashTableLoader;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.CreateMapJoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestData;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.MapJoinPlanVariation;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.SmallTableGenerationParameters;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+
+@BenchmarkMode(Mode.AverageTime)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public abstract class AbstractHTLoadBench {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(AbstractHTLoadBench.class.getName());
+
+ protected VectorMapJoinVariation vectorMapJoinVariation;
+ protected MapJoinTestImplementation mapJoinImplementation;
+ protected CreateMapJoinResult createMapJoinResult;
+
+ protected int rowCount;
+ protected MapJoinTestData testData;
+ protected TezContext mockTezContext;
+ protected MapJoinTestDescription testDesc;
+ protected CustomKeyValueReader customKeyValueReader;
+
+
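+  // 0 benchmarks the legacy single-threaded loader; values >= 1 exercise the parallel VectorMapJoinFastHashTableLoader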
+ @Param({"0", "1", "2", "4", "8", "16", "32", "64"})
+ static int LOAD_THREADS_NUM;
+
+ @Param({"10000", "2000000"})
+ static int ROWS_NUM;
+
+ // MultiSet, HashSet, HashMap
+ @Param({"INNER_BIG_ONLY", "LEFT_SEMI", "INNER"})
+ static String JOIN_TYPE;
+
+ @Benchmark
+ public void hashTableLoadBench() throws Exception {
+ /* Mocking TezContext */
+ this.mockTezContext = PowerMockito.mock(TezContext.class);
+    ProcessorContext mockProcessorContest = PowerMockito.mock(ProcessorContext.class);
+    AbstractLogicalInput mockLogicalInput = PowerMockito.mock(AbstractLogicalInput.class);
+    TezCounters mockTezCounters = PowerMockito.mock(TezCounters.class);
+    TezCounter mockTezCounter = PowerMockito.mock(TezCounter.class);
+    InputContext mockInputContext = PowerMockito.mock(InputContext.class);
+    // Make sure the KEY estimation is correct to have properly sized HT
+    PowerMockito.when(mockTezCounter.getValue()).thenReturn((long) rowCount);
+    PowerMockito.when(mockTezContext.getInput(any())).thenReturn(mockLogicalInput);
+    PowerMockito.when(mockLogicalInput.getContext()).thenReturn(mockInputContext);
+    PowerMockito.when(mockInputContext.getCounters()).thenReturn(mockTezCounters);
+    PowerMockito.when(mockTezCounters.findCounter(anyString(), anyString())).thenReturn(mockTezCounter);
+    PowerMockito.when(mockTezContext.getTezProcessorContext()).thenReturn(mockProcessorContest);
+    PowerMockito.when(mockTezContext.getTezProcessorContext().getCounters()).thenReturn(mockTezCounters);
+    PowerMockito.when(mockTezContext.getTezProcessorContext().getCounters().findCounter(anyString(), anyString()))
+        .thenReturn(mockTezCounter);
+    // Replace streaming Tez-Input with our custom Iterator
+    PowerMockito.when(mockTezContext.getInput(any())).thenReturn(mockLogicalInput);
+    PowerMockito.when(mockLogicalInput.getReader()).thenReturn(customKeyValueReader);
+    /* Mocking Done */
+    HashTableLoader ht = LOAD_THREADS_NUM == 0 ?
+        new LegacyVectorMapJoinFastHashTableLoader(mockTezContext, testDesc.hiveConf, createMapJoinResult.mapJoinOperator) :
+        new VectorMapJoinFastHashTableLoader(mockTezContext, testDesc.hiveConf, createMapJoinResult.mapJoinOperator);
+    ht.load(new MapJoinTableContainer[createMapJoinResult.mapJoinOperator.getConf().getTagLength()], null);
+ }
+
+ // Common Setup
+  protected void setupMapJoinHT(HiveConf hiveConf, long seed, int rowCount,
+      VectorMapJoinVariation vectorMapJoinVariation, MapJoinTestImplementation mapJoinImplementation,
+      String[] bigTableColumnNames, TypeInfo[] bigTableTypeInfos, int[] bigTableKeyColumnNums,
+      TypeInfo[] smallTableValueTypeInfos, int[] smallTableRetainKeyColumnNums,
+      SmallTableGenerationParameters smallTableGenerationParameters) throws Exception {
+
+    hiveConf.set(HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS.varname, LOAD_THREADS_NUM + "");
+    LOG.info("Number of threads: " + hiveConf.get(HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS.varname));
+
+
+ this.rowCount = rowCount;
+
+ this.vectorMapJoinVariation = vectorMapJoinVariation;
+ this.mapJoinImplementation = mapJoinImplementation;
+    this.testDesc = new MapJoinTestDescription(hiveConf, vectorMapJoinVariation, bigTableColumnNames, bigTableTypeInfos,
+        bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums, smallTableGenerationParameters,
+        MapJoinPlanVariation.DYNAMIC_PARTITION_HASH_JOIN);
+    this.testData = new MapJoinTestData(1, testDesc, seed);
+
+    MapJoinDesc mapJoinDesc = MapJoinTestConfig.createMapJoinDesc(testDesc);
+    this.createMapJoinResult = MapJoinTestConfig.createMapJoinImplementation(mapJoinImplementation, testDesc, testData,
+        mapJoinDesc, /* shareMapJoinTableContainer */ null);
+ }
+
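+  /**
+   * Minimal in-memory KeyValueReader that replays pre-generated key/value pairs,
+   * standing in for the streaming Tez shuffle input during the benchmark.
+   */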
+ static class CustomKeyValueReader extends KeyValueReader {
+ private BytesWritable[] keys;
+ private BytesWritable[] values;
+ private int idx = -1;
+
+ public CustomKeyValueReader(BytesWritable[] k, BytesWritable[] v) {
+ this.keys = k;
+ this.values = v;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (++idx >= keys.length)
+ return false;
+ return true;
+ }
+
+ @Override
+ public Object getCurrentKey() throws IOException {
+ return keys[idx];
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException {
+ return values[idx];
+ }
+
+ void reset() {
+ this.idx = -1;
+ }
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java
new file mode 100644
index 0000000..a4ddd9d
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/BytesKeyBase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.RandomTypeUtil;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class BytesKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+    long seed = 2543;
+    int rowCount = rows;
+    HiveConf hiveConf = new HiveConf();
+    int[] bigTableKeyColumnNums = new int[] { 0 };
+    String[] bigTableColumnNames = new String[] { "b1" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] { TypeInfoFactory.stringTypeInfo };
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] { TypeInfoFactory.dateTypeInfo, TypeInfoFactory.timestampTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+ this.customKeyValueReader = generateByteKVPairs(rowCount, seed);
+ }
+
+  private static CustomKeyValueReader generateByteKVPairs(int rows, long seed) throws IOException {
+ LOG.info("Data GEN for: " + rows);
+ Random random = new Random(seed);
+ BytesWritable[] keys = new BytesWritable[rows];
+ BytesWritable[] values = new BytesWritable[rows];
+ for (int i = 0; i < rows; i++) {
+ keys[i] = new BytesWritable();
+ values[i] = new BytesWritable();
+ }
+ long startTime = System.currentTimeMillis();
+
+    BinarySortableSerializeWrite serializeWrite = new BinarySortableSerializeWrite(1);
+ ByteStream.Output output = new ByteStream.Output();
+ int str_len = 20;
+
+ for (int i = 0; i < rows; i++) {
+ output.reset();
+ Text k = new Text(RandomTypeUtil.getRandString(random, null, str_len));
+ serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, k);
+      keys[i].set(output.getData(), 0, output.getLength());
+
+      output.reset();
+      Text v = new Text(RandomTypeUtil.getRandString(random, null, str_len * 2));
+      serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, v);
+      values[i].set(output.getData(), 0, output.getLength());
+    }
+    LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+ return new CustomKeyValueReader(keys, values);
+ }
+
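+  // Decodes a BinarySortable-encoded single string key back to a String, e.g. for debugging the generated keys.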
+  public static String deserializeBinary(BytesWritable currentKey) throws HiveException {
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+ boolean[] columnSortOrderIsDesc = new boolean[1];
+ Arrays.fill(columnSortOrderIsDesc, false);
+
+ byte[] columnNullMarker = new byte[1];
+ Arrays.fill(columnNullMarker, BinarySortableSerDe.ZERO);
+ byte[] columnNotNullMarker = new byte[1];
+ Arrays.fill(columnNotNullMarker, BinarySortableSerDe.ONE);
+
+ BinarySortableDeserializeRead keyBinarySortableDeserializeRead =
+ new BinarySortableDeserializeRead(
+ primitiveTypeInfos,
+ /* useExternalBuffer */ false,
+ columnSortOrderIsDesc,
+ columnNullMarker,
+ columnNotNullMarker);
+ byte[] keyBytes = currentKey.getBytes();
+ int keyLength = currentKey.getLength();
+ keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+ try {
+ if (!keyBinarySortableDeserializeRead.readNextField()) {
+ return null;
+ }
+ } catch (Exception e) {
+ throw new HiveException("DeserializeRead details: " +
+ keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+ }
+
+
+ byte[] stringBytes = Arrays.copyOfRange(
+ keyBinarySortableDeserializeRead.currentBytes,
+ keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesStart + keyBinarySortableDeserializeRead.currentBytesLength);
+ Text text = new Text(stringBytes);
+ String string = text.toString();
+ return string;
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
new file mode 100644
index 0000000..7af9380
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
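+/**
+ * Single-threaded hash table loader, kept in the benchmark module as the baseline
+ * (LOAD_THREADS_NUM = 0) for comparison against the parallel loader.
+ */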
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class);
+ private Configuration hconf;
+ protected MapJoinDesc desc;
+ private TezContext tezContext;
+ private String cacheKey;
+ private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc) joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(
+        HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+ }
+
+ @Override
+ public void init(ExecMapperContext context, MapredContext mrContext,
+ Configuration hconf, MapJoinOperator joinOp) {
+ this.tezContext = (TezContext) mrContext;
+ this.hconf = hconf;
+ this.desc = joinOp.getConf();
+ this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+ }
+
+ @Override
+ public void load(MapJoinTableContainer[] mapJoinTables,
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+ throws HiveException {
+
+ Map<Integer, String> parentToInput = desc.getParentToInput();
+ Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+ MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+ boolean doMemCheck = false;
+ long effectiveThreshold = 0;
+ if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // Hash table loading happens on the server side; LlapDecider could kick some fragments out to run outside of LLAP.
+      // Flip the flag at runtime in case we are running outside of LLAP.
+ if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+ memoryMonitorInfo.setLlap(false);
+ }
+ if (memoryMonitorInfo.doMemoryMonitoring()) {
+ doMemCheck = true;
+ LOG.info("Memory monitoring for hash table loader enabled. {}",
memoryMonitorInfo);
+ }
+ }
+
+ if (!doMemCheck) {
+ LOG.info("Not doing hash table memory monitoring. {}",
memoryMonitorInfo);
+ }
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == desc.getPosBigTable()) {
+ continue;
+ }
+
+ long numEntries = 0;
+ String inputName = parentToInput.get(pos);
+ LogicalInput input = tezContext.getInput(inputName);
+
+ try {
+ input.start();
+ tezContext.getTezProcessorContext().waitForAnyInputReady(
+ Collections.<Input> singletonList(input));
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+
+ try {
+ KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+ Long keyCountObj = parentKeyCounts.get(pos);
+ long estKeyCount = (keyCountObj == null) ? -1 : keyCountObj;
+
+ long inputRecords = -1;
+ try {
+ //TODO : Need to use class instead of string.
+ // https://issues.apache.org/jira/browse/HIVE-23981
+          inputRecords = ((AbstractLogicalInput) input).getContext().getCounters().
+              findCounter("org.apache.tez.common.counters.TaskCounter",
+                  "APPROXIMATE_INPUT_RECORDS").getValue();
+        } catch (Exception e) {
+          LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
+ }
+ long keyCount = Math.max(estKeyCount, inputRecords);
+
+ VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
+ new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
+
+ LOG.info("Loading hash table for input: {} cacheKey: {}
tableContainer: {} smallTablePos: {} " +
+ "estKeyCount : {} keyCount : {}", inputName, cacheKey,
+ vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos,
estKeyCount, keyCount);
+
+ vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes
here.
+ long startTime = System.currentTimeMillis();
+ while (kvReader.next()) {
+          vectorMapJoinFastTableContainer.putRow((BytesWritable) kvReader.getCurrentKey(),
+ (BytesWritable)kvReader.getCurrentValue());
+ numEntries++;
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+ LOG.error(msg);
+ throw new MapJoinMemoryExhaustionError(msg);
+ } else {
+ LOG.info(
+ "Checking hash table loader memory usage for input: {}
numEntries: {} "
+ + "estimatedMemoryUsage: {} effectiveThreshold: {}",
+ inputName, numEntries, estMemUsage, effectiveThreshold);
+ }
+ }
+ }
+ long delta = System.currentTimeMillis() - startTime;
+ htLoadCounter.increment(delta);
+
+ vectorMapJoinFastTableContainer.seal();
+ mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+ if (doMemCheck) {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {}
numEntries: {} " +
+ "estimatedMemoryUsage: {} Load Time : {} ", inputName,
cacheKey, numEntries,
+ vectorMapJoinFastTableContainer.getEstimatedMemorySize(), delta);
+ } else {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {}
numEntries: {} Load Time : {} ",
+ inputName, cacheKey, numEntries, delta);
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java
new file mode 100644
index 0000000..7bbaa9c
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LongKeyBase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class LongKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+ long seed = 2543;
+ int rowCount = rows;
+ HiveConf hiveConf = new HiveConf();
+ int[] bigTableKeyColumnNums = new int[] { 0 };
+ String[] bigTableColumnNames = new String[] { "number1" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] { TypeInfoFactory.longTypeInfo };
+ int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] { TypeInfoFactory.dateTypeInfo, TypeInfoFactory.stringTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+ this.customKeyValueReader = generateLongKVPairs(rowCount, seed);
+ }
+
+  private static CustomKeyValueReader generateLongKVPairs(int rows, long seed) throws IOException {
+ LOG.info("Data GEN for: " + rows);
+ Random random = new Random(seed);
+ BytesWritable[] keys = new BytesWritable[rows];
+ BytesWritable[] values = new BytesWritable[rows];
+ BinarySortableSerializeWrite bsw = new BinarySortableSerializeWrite(1);
+ long startTime = System.currentTimeMillis();
+ ByteStream.Output outp;
+ BytesWritable key;
+ BytesWritable value;
+ for (int i = 0; i < rows; i++) {
+ outp = new ByteStream.Output();
+ bsw.set(outp);
+ long k = random.nextInt(rows);
+ bsw.writeLong(k);
+ key = new BytesWritable(outp.getData(), outp.getLength());
+ outp = new ByteStream.Output();
+ bsw.reset();
+ bsw.writeLong(random.nextInt(rows * 2));
+ value = new BytesWritable(outp.getData(), outp.getLength());
+ keys[i] = key;
+ values[i] = value;
+ }
+ LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+ return new CustomKeyValueReader(keys, values);
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java
new file mode 100644
index 0000000..be6eae7
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.RandomTypeUtil;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class MultiKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+ long seed = 2543;
+ int rowCount = rows;
+ HiveConf hiveConf = new HiveConf();
+ int[] bigTableKeyColumnNums = new int[] { 0, 1, 2};
+ String[] bigTableColumnNames = new String[] { "b1", "b2", "b3" };
+ TypeInfo[] bigTableTypeInfos = new TypeInfo[] {
+ TypeInfoFactory.intTypeInfo,
+ TypeInfoFactory.longTypeInfo,
+ TypeInfoFactory.stringTypeInfo
+ };
+ int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos = new TypeInfo[] { TypeInfoFactory.stringTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+ this.customKeyValueReader = generateByteKVPairs(rowCount, seed);
+ }
+
+  private static CustomKeyValueReader generateByteKVPairs(int rows, long seed) throws IOException {
+ LOG.info("Data GEN for: " + rows);
+ Random random = new Random(seed);
+ BytesWritable[] keys = new BytesWritable[rows];
+ BytesWritable[] values = new BytesWritable[rows];
+ for (int i = 0; i < rows; i++) {
+ keys[i] = new BytesWritable();
+ values[i] = new BytesWritable();
+ }
+    BinarySortableSerializeWrite serializeWrite = new BinarySortableSerializeWrite(1);
+ ByteStream.Output output = new ByteStream.Output();
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < rows; i++) {
+ output.reset();
+ serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.timestampTypeInfo,
+          new TimestampWritableV2(RandomTypeUtil.getRandTimestamp(random)));
+ keys[i].set(output.getData(), 0, output.getLength());
+
+ output.reset();
+ Text v = new Text(RandomTypeUtil.getRandString(random, null, 20*2));
+ serializeWrite.set(output);
+      VerifyFastRow.serializeWrite(serializeWrite, TypeInfoFactory.stringTypeInfo, v);
+ values[i].set(output.getData(), 0, output.getLength());
+ }
+ LOG.info("Data GEN done after {} sec",
+        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
+ return new CustomKeyValueReader(keys, values);
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java
new file mode 100644
index 0000000..9856bb3
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ * java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTBytesKeyBench
+ *
+ * {HASH_MAP, HASH_SET, HASH_MULTISET}
+ * X
+ * {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTBytesKeyBench {
+
+ public static class FastHashVectorBench extends BytesKeyBase {
+ @Setup
+ public void setup() throws Exception {
+ LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+ }
+
+ @TearDown(Level.Invocation)
+ public void doTearDown() {
+ LOG.info("Do TearDown");
+ customKeyValueReader.reset();
+ }
+ }
+
+ /**
+ * For output log check: itests/target/tmp/log/hive-jmh.log
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + VectorFastHTBytesKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) // number of times the actual iteration should take place
+ .forks(1)
+ .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+ .build();
+ new Runner(opt).run();
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java
new file mode 100644
index 0000000..ad1c2c7
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf -Drat.skip=true" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ * java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTLongKeyBench
+ *
+ * {HASH_MAP, HASH_SET, HASH_MULTISET}
+ * X
+ * {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTLongKeyBench {
+
+ public static class FastHashVectorBench extends LongKeyBase {
+ @Setup
+ public void setup() throws Exception {
+ LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+ }
+
+ @TearDown(Level.Invocation)
+ public void doTearDown() {
+ LOG.info("Do TearDown");
+ customKeyValueReader.reset();
+ }
+ }
+
+ /**
+ * For output log check: itests/target/tmp/log/hive-jmh.log
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + VectorFastHTLongKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) // number of times the actual iteration should take place
+ .forks(1)
+ .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+ .build();
+ new Runner(opt).run();
+ }
+}
diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java
new file mode 100644
index 0000000..a36f15c
--- /dev/null
+++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTMultiKeyBench.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ * java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTMultiKeyBench
+ *
+ * {HASH_MAP, HASH_SET, HASH_MULTISET}
+ * X
+ * {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTMultiKeyBench {
+
+ public static class FastHashVectorBench extends MultiKeyBase {
+ @Setup
+ public void setup() throws Exception {
+ LOG.info("Do Setup");
+      doSetup(VectorMapJoinVariation.valueOf(JOIN_TYPE), MapJoinTestImplementation.NATIVE_VECTOR_FAST, ROWS_NUM);
+ }
+
+ @TearDown(Level.Invocation)
+ public void doTearDown() {
+ LOG.info("Do TearDown");
+ customKeyValueReader.reset();
+ }
+ }
+
+ /**
+ * For output log check: itests/target/tmp/log/hive-jmh.log
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + VectorFastHTMultiKeyBench.class.getSimpleName() + ".*")
+        .warmupIterations(4) // number of times the warmup iteration should take place
+        .measurementIterations(4) // number of times the actual iteration should take place
+ .forks(1)
+ .jvmArgs("-Xmx8g", "-Xms8g", "-XX:MaxDirectMemorySize=512M")
+ .build();
+ new Runner(opt).run();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 38eccb1..0aa4736 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -78,6 +78,19 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
   private BlockingQueue<HashTableElementBatch>[] loadBatchQueues;
   private static final HashTableElementBatch DONE_SENTINEL = new HashTableElementBatch();
+ public VectorMapJoinFastHashTableLoader() {
+
+ }
+
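+  // Allows direct construction (e.g. from the hive-jmh load benchmarks) without going through init().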
+  public VectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(
+        HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get(Operator.CONTEXT_NAME_KEY, ""));
+ }
+
@Override
public void init(ExecMapperContext context, MapredContext mrContext,
Configuration hconf, MapJoinOperator joinOp) {