deniskuzZ commented on code in PR #5845:
URL: https://github.com/apache/hive/pull/5845#discussion_r2132879484


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java:
##########
@@ -148,19 +170,29 @@ public void resetBeforeOffer(HashTableElementBatch elementBatch) {
     }
   }
 
-  private void submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+  @VisibleForTesting
+  protected List<Future<?>> submitQueueDrainThreadsForTest(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+          throws IOException, InterruptedException, SerDeException {
+    return submitQueueDrainThreads(vectorMapJoinFastTableContainer);
+  }
+
+  private List<Future<?>> submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
       throws InterruptedException, IOException, SerDeException {
+    List<Future<?>> futures = new ArrayList<>();
     for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
       int finalPartitionId = partitionId;
-      this.loadExecService.submit(() -> {
+      Future<?> future = this.loadExecService.submit(() -> {

Review Comment:
   Please use `CompletableFuture` instead of raw `Future<?>` handles, for example:
   ````
   List<CompletableFuture<Void>> loaderTasks = new ArrayList<>();
   ...................
   
   CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(() -> {
     try {
       LOG.info("Partition id {} with Queue size {}", finalPartitionId, loadBatchQueues[finalPartitionId].size());
       drainAndLoadForPartition(finalPartitionId, vectorMapJoinFastTableContainer);
     } catch (IOException | InterruptedException | SerDeException | HiveException e) {
       throw new RuntimeException("Failed to start HT Load thread", e);
     }
   }, loadExecService);
   
   .....................
   
   loaderTasks.add(asyncTask);
   
   try {
     CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
       .get(2, TimeUnit.MINUTES);
   } catch (TimeoutException e) {
     throw new HiveException("Failed to complete the hash table loader. Loading timed out.");
   } catch (ExecutionException e) {
     throw new HiveException("One of the loader threads failed", e.getCause());
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
   } finally {
     if (loadExecService != null && !loadExecService.isTerminated()) {
       loadExecService.shutdownNow();
     }
   }
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to