deniskuzZ commented on code in PR #5845: URL: https://github.com/apache/hive/pull/5845#discussion_r2132879484
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java:
##########
@@ -148,19 +170,29 @@ public void resetBeforeOffer(HashTableElementBatch elementBatch) {
     }
   }
 
-  private void submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+  @VisibleForTesting
+  protected List<Future<?>> submitQueueDrainThreadsForTest(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+      throws IOException, InterruptedException, SerDeException {
+    return submitQueueDrainThreads(vectorMapJoinFastTableContainer);
+  }
+
+  private List<Future<?>> submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
       throws InterruptedException, IOException, SerDeException {
+    List<Future<?>> futures = new ArrayList<>();
     for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
       int finalPartitionId = partitionId;
-      this.loadExecService.submit(() -> {
+      Future<?> future = this.loadExecService.submit(() -> {

Review Comment:
   please use `CompletableFuture` instead
   ````
   List<CompletableFuture<Void>> loaderTasks = new ArrayList<>();

   CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(() -> {
     try {
       LOG.info("Partition id {} with Queue size {}", finalPartitionId,
           loadBatchQueues[finalPartitionId].size());
       drainAndLoadForPartition(finalPartitionId, vectorMapJoinFastTableContainer);
     } catch (IOException | InterruptedException | SerDeException | HiveException e) {
       throw new RuntimeException("Failed to start HT Load threads", e);
     }
   }, loadExecService);
   .....................
   loaderTasks.add(asyncTask);

   try {
     if (loaderTasks != null) {
       CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0])).get(2, TimeUnit.MINUTES);
     }
   } catch (ExecutionException e) {
     throw new HiveException("One of the loader threads failed", e.getCause());
   } catch (TimeoutException e) {
     throw new HiveException("Failed to complete the hash table loader. Loading timed out.");
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
   } finally {
     if (loadExecService != null && !loadExecService.isTerminated()) {
       loadExecService.shutdownNow();
     }
   }
   ````
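
For anyone who wants to try the suggested pattern outside of Hive, here is a minimal, self-contained sketch. It is not the PR's code: `LoaderTasksSketch`, `loadPartition`, `loadAll`, and the `failingPartition` parameter are hypothetical names made up for illustration, `loadPartition` stands in for `drainAndLoadForPartition`, and `IllegalStateException` stands in for `HiveException`. It submits one `CompletableFuture.runAsync` task per partition, waits on `CompletableFuture.allOf(...)` with a timeout, and shuts the executor down in `finally`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class LoaderTasksSketch {

  // Hypothetical stand-in for drainAndLoadForPartition: counts a successful
  // load, or throws a checked exception for the partition chosen to fail.
  static void loadPartition(int partitionId, int failingPartition, AtomicInteger loaded)
      throws IOException {
    if (partitionId == failingPartition) {
      throw new IOException("load failed for partition " + partitionId);
    }
    loaded.incrementAndGet();
  }

  // Starts one async task per partition, waits for all of them, and returns
  // how many partitions loaded. Pass failingPartition = -1 for no failure.
  static int loadAll(int numLoadThreads, int failingPartition, long timeoutSeconds)
      throws InterruptedException {
    ExecutorService loadExecService = Executors.newFixedThreadPool(numLoadThreads);
    AtomicInteger loaded = new AtomicInteger();
    List<CompletableFuture<Void>> loaderTasks = new ArrayList<>();
    try {
      for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
        int finalPartitionId = partitionId;
        loaderTasks.add(CompletableFuture.runAsync(() -> {
          try {
            loadPartition(finalPartitionId, failingPartition, loaded);
          } catch (IOException e) {
            // Checked exceptions cannot escape a Runnable, so wrap them.
            throw new RuntimeException("Failed to load partition " + finalPartitionId, e);
          }
        }, loadExecService));
      }
      // allOf completes once every task has completed; get() rethrows a
      // task failure as ExecutionException.
      CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
          .get(timeoutSeconds, TimeUnit.SECONDS);
      return loaded.get();
    } catch (ExecutionException e) {
      throw new IllegalStateException("One of the loader threads failed", e.getCause());
    } catch (TimeoutException e) {
      throw new IllegalStateException("Loading timed out", e);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and let the caller see the interruption.
      Thread.currentThread().interrupt();
      throw e;
    } finally {
      loadExecService.shutdownNow();
    }
  }
}
```

One deliberate difference from the snippet in the comment: the sketch rethrows `InterruptedException` after restoring the interrupt flag, so an interrupted load is not reported as a success. Whether that is the right behavior for the loader is a judgment call for the PR author.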