This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 25.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/25.0.0 by this push:
     new 2effa54459 Remove stray reference to fix OOM while merging sketches (#13475) (#13529)
2effa54459 is described below

commit 2effa544597db8fb33a525e9be31de184b33b3d7
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Dec 8 12:44:14 2022 +0530

    Remove stray reference to fix OOM while merging sketches (#13475) (#13529)
    
    * Remove stray reference to fix OOM while merging sketches
    
    * Update future to add result from executor service
    
    * Update tests and address review comments
    
    * Address review comments
    
    * Moved mock
    
    * Close threadpool on teardown
    
    * Remove worker task cancel
---
 .../apache/druid/msq/exec/WorkerSketchFetcher.java | 36 ++++----
 .../druid/msq/exec/WorkerSketchFetcherTest.java    | 99 +---------------------
 2 files changed, 19 insertions(+), 116 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index c4118a9d38..dc6f219905 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -59,7 +59,11 @@ public class WorkerSketchFetcher implements AutoCloseable
   private final WorkerClient workerClient;
   private final ExecutorService executorService;
 
-  public WorkerSketchFetcher(WorkerClient workerClient, ClusterStatisticsMergeMode clusterStatisticsMergeMode, int statisticsMaxRetainedBytes)
+  public WorkerSketchFetcher(
+      WorkerClient workerClient,
+      ClusterStatisticsMergeMode clusterStatisticsMergeMode,
+      int statisticsMaxRetainedBytes
+  )
   {
     this.workerClient = workerClient;
     this.clusterStatisticsMergeMode = clusterStatisticsMergeMode;
@@ -86,14 +90,14 @@ public class WorkerSketchFetcher implements AutoCloseable
         return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
       case AUTO:
         if (clusterBy.getBucketByCount() == 0) {
-          log.debug("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId());
+          log.info("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId());
           // If there is no time clustering, there is no scope for sequential merge
           return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
         } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD || completeKeyStatisticsInformation.getBytesRetained() > BYTES_THRESHOLD) {
-          log.debug("Query [%s] AUTO mode: chose SEQUENTIAL mode to merge key statistics", stageDefinition.getId().getQueryId());
+          log.info("Query [%s] AUTO mode: chose SEQUENTIAL mode to merge key statistics", stageDefinition.getId().getQueryId());
           return sequentialTimeChunkMerging(completeKeyStatisticsInformation, stageDefinition, workerTaskIds);
         }
-        log.debug("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId());
+        log.info("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId());
         return inMemoryFullSketchMerging(stageDefinition, workerTaskIds);
       default:
         throw new IllegalStateException("No fetching strategy found for mode: " + clusterStatisticsMergeMode);
@@ -128,12 +132,6 @@ public class WorkerSketchFetcher implements AutoCloseable
                 stageDefinition.getId().getQueryId(),
                 stageDefinition.getStageNumber()
             );
-        partitionFuture.whenComplete((result, exception) -> {
-          if (exception != null || (result != null && result.isError())) {
-            snapshotFuture.cancel(true);
-          }
-        });
-
         try {
           ClusterByStatisticsSnapshot clusterByStatisticsSnapshot = snapshotFuture.get();
           if (clusterByStatisticsSnapshot == null) {
@@ -151,12 +149,15 @@ public class WorkerSketchFetcher implements AutoCloseable
         }
         catch (Exception e) {
           synchronized (mergedStatisticsCollector) {
-            partitionFuture.completeExceptionally(e);
-            mergedStatisticsCollector.clear();
+            if (!partitionFuture.isDone()) {
+              partitionFuture.completeExceptionally(e);
+              mergedStatisticsCollector.clear();
+            }
           }
         }
       });
     });
+
     return partitionFuture;
   }
 
@@ -247,11 +248,6 @@ public class WorkerSketchFetcher implements AutoCloseable
                     stageDefinition.getStageNumber(),
                     timeChunk
                 );
-            partitionFuture.whenComplete((result, exception) -> {
-              if (exception != null || (result != null && result.isError())) {
-                snapshotFuture.cancel(true);
-              }
-            });
 
             try {
               ClusterByStatisticsSnapshot snapshotForTimeChunk = snapshotFuture.get();
@@ -289,8 +285,10 @@ public class WorkerSketchFetcher implements AutoCloseable
             }
             catch (Exception e) {
               synchronized (mergedStatisticsCollector) {
-                partitionFuture.completeExceptionally(e);
-                mergedStatisticsCollector.clear();
+                if (!partitionFuture.isDone()) {
+                  partitionFuture.completeExceptionally(e);
+                  mergedStatisticsCollector.clear();
+                }
               }
             }
           });
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
index 54c9a792e5..83fb73043b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.ClusterByPartition;
 import org.apache.druid.frame.key.ClusterByPartitions;
@@ -46,7 +45,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -56,7 +54,6 @@ import static org.easymock.EasyMock.mock;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -107,52 +104,8 @@ public class WorkerSketchFetcherTest
   public void tearDown() throws Exception
   {
     mocks.close();
-  }
-
-  @Test
-  public void test_submitFetcherTask_parallelFetch_workerThrowsException_shouldCancelOtherTasks() throws Exception
-  {
-    // Store futures in a queue
-    final Queue<ListenableFuture<ClusterByStatisticsSnapshot>> futureQueue = new ConcurrentLinkedQueue<>();
-    final List<String> workerIds = ImmutableList.of("0", "1", "2", "3");
-    final CountDownLatch latch = new CountDownLatch(workerIds.size());
-
-    target = spy(new WorkerSketchFetcher(workerClient, ClusterStatisticsMergeMode.PARALLEL, 300_000_000));
-
-    // When fetching snapshots, return a mock and add future to queue
-    doAnswer(invocation -> {
-      ListenableFuture<ClusterByStatisticsSnapshot> snapshotListenableFuture =
-          spy(Futures.immediateFuture(mock(ClusterByStatisticsSnapshot.class)));
-      futureQueue.add(snapshotListenableFuture);
-      latch.countDown();
-      latch.await();
-      return snapshotListenableFuture;
-    }).when(workerClient).fetchClusterByStatisticsSnapshot(any(), any(), anyInt());
-
-    // Cause a worker to fail instead of returning the result
-    doAnswer(invocation -> {
-      latch.countDown();
-      latch.await();
-      return Futures.immediateFailedFuture(new InterruptedException("interrupted"));
-    }).when(workerClient).fetchClusterByStatisticsSnapshot(eq("2"), any(), anyInt());
-
-    CompletableFuture<Either<Long, ClusterByPartitions>> eitherCompletableFuture = target.submitFetcherTask(
-        completeKeyStatisticsInformation,
-        workerIds,
-        stageDefinition
-    );
-
-    // Assert that the final result is failed and all other task futures are also cancelled.
-    Assert.assertThrows(CompletionException.class, eitherCompletableFuture::join);
-    Thread.sleep(1000);
-
-    Assert.assertTrue(eitherCompletableFuture.isCompletedExceptionally());
-    // Verify that the statistics collector was cleared due to the error.
-    verify(mergedClusterByStatisticsCollector1, times(1)).clear();
-    // Verify that other task futures were requested to be cancelled.
-    Assert.assertFalse(futureQueue.isEmpty());
-    for (ListenableFuture<ClusterByStatisticsSnapshot> snapshotFuture : futureQueue) {
-      verify(snapshotFuture, times(1)).cancel(eq(true));
+    if (target != null) {
+      target.close();
     }
   }
 
@@ -194,54 +147,6 @@ public class WorkerSketchFetcherTest
     Assert.assertEquals(expectedPartitions1, eitherCompletableFuture.get().valueOrThrow());
   }
 
-  @Test
-  public void test_submitFetcherTask_sequentialFetch_workerThrowsException_shouldCancelOtherTasks() throws Exception
-  {
-    // Store futures in a queue
-    final Queue<ListenableFuture<ClusterByStatisticsSnapshot>> futureQueue = new ConcurrentLinkedQueue<>();
-
-    SortedMap<Long, Set<Integer>> timeSegmentVsWorkerMap = ImmutableSortedMap.of(1L, ImmutableSet.of(0, 1, 2), 2L, ImmutableSet.of(0, 1, 4));
-    doReturn(timeSegmentVsWorkerMap).when(completeKeyStatisticsInformation).getTimeSegmentVsWorkerMap();
-
-    final CyclicBarrier barrier = new CyclicBarrier(3);
-    target = spy(new WorkerSketchFetcher(workerClient, ClusterStatisticsMergeMode.SEQUENTIAL, 300_000_000));
-
-    // When fetching snapshots, return a mock and add future to queue
-    doAnswer(invocation -> {
-      ListenableFuture<ClusterByStatisticsSnapshot> snapshotListenableFuture =
-          spy(Futures.immediateFuture(mock(ClusterByStatisticsSnapshot.class)));
-      futureQueue.add(snapshotListenableFuture);
-      barrier.await();
-      return snapshotListenableFuture;
-    }).when(workerClient).fetchClusterByStatisticsSnapshotForTimeChunk(anyString(), anyString(), anyInt(), anyLong());
-
-    // Cause a worker in the second time chunk to fail instead of returning the result
-    doAnswer(invocation -> {
-      barrier.await();
-      return Futures.immediateFailedFuture(new InterruptedException("interrupted"));
-    }).when(workerClient).fetchClusterByStatisticsSnapshotForTimeChunk(eq("4"), any(), anyInt(), eq(2L));
-
-    CompletableFuture<Either<Long, ClusterByPartitions>> eitherCompletableFuture = target.submitFetcherTask(
-        completeKeyStatisticsInformation,
-        ImmutableList.of("0", "1", "2", "3", "4"),
-        stageDefinition
-    );
-
-    // Assert that the final result is failed and all other task futures are also cancelled.
-    Assert.assertThrows(CompletionException.class, eitherCompletableFuture::join);
-    Thread.sleep(1000);
-
-    Assert.assertTrue(eitherCompletableFuture.isCompletedExceptionally());
-    // Verify that the correct statistics collector was cleared due to the error.
-    verify(mergedClusterByStatisticsCollector1, times(0)).clear();
-    verify(mergedClusterByStatisticsCollector2, times(1)).clear();
-    // Verify that other task futures were requested to be cancelled.
-    Assert.assertFalse(futureQueue.isEmpty());
-    for (ListenableFuture<ClusterByStatisticsSnapshot> snapshotFuture : futureQueue) {
-      verify(snapshotFuture, times(1)).cancel(eq(true));
-    }
-  }
-
   @Test
   public void test_submitFetcherTask_sequentialFetch_mergePerformedCorrectly()
       throws ExecutionException, InterruptedException


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to