pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631157229



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 
numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} 
tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), 
pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes 
here.
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new 
BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }

Review comment:
       Let's use an init method for these lines

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 
numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} 
tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), 
pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes 
here.
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new 
BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }

Review comment:
       Let's use an init method for these lines




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to