This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 37853f8de4 ConcurrentGrouper: Add mergeThreadLocal option, fix bug around the switch to spilling. (#12513)
37853f8de4 is described below

commit 37853f8de4ba801a5f6c429ccceeb58b58762109
Author: Gian Merlino <[email protected]>
AuthorDate: Sat May 21 10:28:54 2022 -0700

    ConcurrentGrouper: Add mergeThreadLocal option, fix bug around the switch to spilling. (#12513)
    
    * ConcurrentGrouper: Add option to always slice up merge buffers thread-locally.
    
    Normally, the ConcurrentGrouper shares merge buffers across processing
    threads until spilling starts, and then switches to a thread-local model.
    This minimizes memory use and reduces likelihood of spilling, which is
    good, but it creates thread contention. The new mergeThreadLocal option
    causes a query to start in thread-local mode immediately, and allows us
    to experiment with the relative performance of the two modes.
    
    * Fix grammar in docs.
    
    * Fix race in ConcurrentGrouper.
    
    * Fix issue with timeouts.
    
    * Remove unused import.
    
    * Add "tradeoff" to dictionary.
---
 docs/querying/groupbyquery.md                      |  1 +
 .../druid/query/groupby/GroupByQueryConfig.java    | 10 ++++
 .../groupby/epinephelinae/ConcurrentGrouper.java   | 68 ++++++++++++++++------
 .../groupby/epinephelinae/SpillingGrouper.java     |  5 ++
 .../epinephelinae/ConcurrentGrouperTest.java       | 26 ++++++---
 website/.spelling                                  |  1 +
 6 files changed, 85 insertions(+), 26 deletions(-)

diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md
index 9554f1e86e..41a52f5347 100644
--- a/docs/querying/groupbyquery.md
+++ b/docs/querying/groupbyquery.md
@@ -441,6 +441,7 @@ Supported query contexts:
 |`forceHashAggregation`|Overrides the value of 
`druid.query.groupBy.forceHashAggregation`|None|
 |`intermediateCombineDegree`|Overrides the value of 
`druid.query.groupBy.intermediateCombineDegree`|None|
 |`numParallelCombineThreads`|Overrides the value of 
`druid.query.groupBy.numParallelCombineThreads`|None|
+|`mergeThreadLocal`|Whether merge buffers should always be split into thread-local buffers. Setting this to `true` reduces thread contention, but uses memory less efficiently. This tradeoff is beneficial when memory is plentiful. |false|
 |`sortByDimsFirst`|Sort the results first by dimension values and then by 
timestamp.|false|
 |`forceLimitPushDown`|When all fields in the orderby are part of the grouping 
key, the Broker will push limit application down to the Historical processes. 
When the sorting order uses fields that are not in the grouping key, applying 
this optimization can result in approximate results with unknown accuracy, so 
this optimization is disabled by default in that case. Enabling this context 
flag turns on limit push down for limit/orderbys that contain non-grouping key 
columns.|false|
 |`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes 
(historicals, peons) then limit results during segment scan. This context value 
can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index bba6433391..3518c0e4fb 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -48,6 +48,7 @@ public class GroupByQueryConfig
   private static final String CTX_KEY_FORCE_HASH_AGGREGATION = 
"forceHashAggregation";
   private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = 
"intermediateCombineDegree";
   private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = 
"numParallelCombineThreads";
+  private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal";
 
   @JsonProperty
   private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@@ -102,6 +103,9 @@ public class GroupByQueryConfig
   @JsonProperty
   private int numParallelCombineThreads = 1;
 
+  @JsonProperty
+  private boolean mergeThreadLocal = false;
+
   @JsonProperty
   private boolean vectorize = true;
 
@@ -201,6 +205,11 @@ public class GroupByQueryConfig
     return numParallelCombineThreads;
   }
 
+  public boolean isMergeThreadLocal()
+  {
+    return mergeThreadLocal;
+  }
+
   public boolean isVectorize()
   {
     return vectorize;
@@ -282,6 +291,7 @@ public class GroupByQueryConfig
         CTX_KEY_NUM_PARALLEL_COMBINE_THREADS,
         getNumParallelCombineThreads()
     );
+    newConfig.mergeThreadLocal = 
query.getContextBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal());
     newConfig.vectorize = query.getContextBoolean(QueryContexts.VECTORIZE_KEY, 
isVectorize());
     newConfig.enableMultiValueUnnesting = query.getContextBoolean(
         CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 2ae96e8542..827583dccb 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -93,6 +93,7 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
   private final long maxDictionarySizeForCombiner;
   @Nullable
   private final ParallelCombiner<KeyType> parallelCombiner;
+  private final boolean mergeThreadLocal;
 
   private volatile boolean initialized = false;
 
@@ -135,7 +136,8 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
         hasQueryTimeout,
         queryTimeoutAt,
         groupByQueryConfig.getIntermediateCombineDegree(),
-        groupByQueryConfig.getNumParallelCombineThreads()
+        groupByQueryConfig.getNumParallelCombineThreads(),
+        groupByQueryConfig.isMergeThreadLocal()
     );
   }
 
@@ -159,7 +161,8 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
       final boolean hasQueryTimeout,
       final long queryTimeoutAt,
       final int intermediateCombineDegree,
-      final int numParallelCombineThreads
+      final int numParallelCombineThreads,
+      final boolean mergeThreadLocal
   )
   {
     Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
@@ -207,6 +210,8 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
     } else {
       this.parallelCombiner = null;
     }
+
+    this.mergeThreadLocal = mergeThreadLocal;
   }
 
   @Override
@@ -237,6 +242,10 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
             );
             grouper.init();
             groupers.add(grouper);
+
+            if (mergeThreadLocal) {
+              grouper.setSpillingAllowed(true);
+            }
           }
 
           initialized = true;
@@ -262,29 +271,46 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
       throw new ISE("Grouper is closed");
     }
 
-    if (!spilling) {
-      final SpillingGrouper<KeyType> hashBasedGrouper = 
groupers.get(grouperNumberForKeyHash(keyHash));
+    final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
+
+    if (mergeThreadLocal) {
+      // Always thread-local grouping: expect to get more memory use, but no thread contention.
+      return tlGrouper.aggregate(key, keyHash);
+    } else if (spilling) {
+      // Switch to thread-local grouping after spilling starts. No thread contention.
+      synchronized (tlGrouper) {
+        tlGrouper.setSpillingAllowed(true);
+        return tlGrouper.aggregate(key, keyHash);
+      }
+    } else {
+      // Use keyHash to find a grouper prior to spilling.
+      // There is potential here for thread contention, but it reduces memory use.
+      final SpillingGrouper<KeyType> subGrouper = 
groupers.get(grouperNumberForKeyHash(keyHash));
+
+      synchronized (subGrouper) {
+        if (subGrouper.isSpillingAllowed() && subGrouper != tlGrouper) {
+          // Another thread already started treating this grouper as its thread-local grouper. So, switch to ours.
+          // Fall through to release the lock on subGrouper and do the aggregation with tlGrouper.
+        } else {
+          final AggregateResult aggregateResult = subGrouper.aggregate(key, 
keyHash);
 
-      synchronized (hashBasedGrouper) {
-        if (!spilling) {
-          final AggregateResult aggregateResult = 
hashBasedGrouper.aggregate(key, keyHash);
           if (aggregateResult.isOk()) {
             return AggregateResult.ok();
           } else {
             // Expecting all-or-nothing behavior.
             assert aggregateResult.getCount() == 0;
             spilling = true;
+
+            // Fall through to release the lock on subGrouper and do the aggregation with tlGrouper.
           }
         }
       }
-    }
-
-    // At this point we know spilling = true
-    final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
 
-    synchronized (tlGrouper) {
-      tlGrouper.setSpillingAllowed(true);
-      return tlGrouper.aggregate(key, keyHash);
+      synchronized (tlGrouper) {
+        assert spilling;
+        tlGrouper.setSpillingAllowed(true);
+        return tlGrouper.aggregate(key, keyHash);
+      }
     }
   }
 
@@ -318,7 +344,7 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
                                                                     
getGroupersIterator(sorted);
 
     if (sorted) {
-      final boolean fullyCombined = !spilling;
+      final boolean fullyCombined = !spilling && !mergeThreadLocal;
 
       // Parallel combine is used only when data is not fully merged.
       if (!fullyCombined && parallelCombiner != null) {
@@ -398,8 +424,16 @@ public class ConcurrentGrouper<KeyType> implements 
Grouper<KeyType>
 
     ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future = 
Futures.allAsList(futures);
     try {
-      final long timeout = queryTimeoutAt - System.currentTimeMillis();
-      return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : 
future.get();
+      if (!hasQueryTimeout) {
+        return future.get();
+      } else {
+        final long timeout = queryTimeoutAt - System.currentTimeMillis();
+        if (timeout > 0) {
+          return future.get(timeout, TimeUnit.MILLISECONDS);
+        } else {
+          throw new TimeoutException();
+        }
+      }
     }
     catch (InterruptedException | CancellationException e) {
       GuavaUtils.cancelAll(true, future, futures);
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 44b44d70df..86365fef12 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -241,6 +241,11 @@ public class SpillingGrouper<KeyType> implements 
Grouper<KeyType>
     return new ArrayList<>(mergedDictionary);
   }
 
+  public boolean isSpillingAllowed()
+  {
+    return spillingAllowed;
+  }
+
   public void setSpillingAllowed(final boolean spillingAllowed)
   {
     this.spillingAllowed = spillingAllowed;
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index 49951cdcb9..6ab2f1a7bf 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.QueryTimeoutException;
@@ -61,7 +62,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -78,10 +78,11 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
   private final Supplier<ByteBuffer> bufferSupplier;
   private final int concurrencyHint;
   private final int parallelCombineThreads;
-  private final ExecutorService exec = Executors.newFixedThreadPool(8);
+  private final ExecutorService exec;
+  private final boolean mergeThreadLocal;
   private final Closer closer = Closer.create();
 
-  @Parameters(name = "bufferSize={0}, concurrencyHint={1}, 
parallelCombineThreads={2}")
+  @Parameters(name = "bufferSize={0}, concurrencyHint={1}, 
parallelCombineThreads={2}, mergeThreadLocal={3}")
   public static Collection<Object[]> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
@@ -89,8 +90,10 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
     for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) {
       for (final int concurrencyHint : new int[]{1, 8}) {
         for (final int parallelCombineThreads : new int[]{0, 8}) {
-          if (parallelCombineThreads <= concurrencyHint) {
-            constructors.add(new Object[]{bufferSize, concurrencyHint, 
parallelCombineThreads});
+          for (final boolean mergeThreadLocal : new boolean[]{true, false}) {
+            if (parallelCombineThreads <= concurrencyHint) {
+              constructors.add(new Object[]{bufferSize, concurrencyHint, 
parallelCombineThreads, mergeThreadLocal});
+            }
           }
         }
       }
@@ -115,11 +118,13 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
   public ConcurrentGrouperTest(
       int bufferSize,
       int concurrencyHint,
-      int parallelCombineThreads
+      int parallelCombineThreads,
+      boolean mergeThreadLocal
   )
   {
     this.concurrencyHint = concurrencyHint;
     this.parallelCombineThreads = parallelCombineThreads;
+    this.mergeThreadLocal = mergeThreadLocal;
     this.bufferSupplier = new Supplier<ByteBuffer>()
     {
       private final AtomicBoolean called = new AtomicBoolean(false);
@@ -135,6 +140,7 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
         return buffer;
       }
     };
+    this.exec = Execs.multiThreaded(concurrencyHint, 
"ConcurrentGrouperTest-%d");
   }
 
   @Test()
@@ -165,7 +171,8 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
         false,
         0,
         4,
-        parallelCombineThreads
+        parallelCombineThreads,
+        mergeThreadLocal
     );
     closer.register(grouper);
     grouper.init();
@@ -195,7 +202,7 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
 
     final CloseableIterator<Entry<LongKey>> iterator = 
closer.register(grouper.iterator(true));
 
-    if (parallelCombineThreads > 1 && temporaryStorage.currentSize() > 0) {
+    if (parallelCombineThreads > 1 && (mergeThreadLocal || 
temporaryStorage.currentSize() > 0)) {
       // Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly
       // configured, or due to spilling).
       Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
@@ -234,7 +241,8 @@ public class ConcurrentGrouperTest extends 
InitializedNullHandlingTest
         true,
         1,
         4,
-        parallelCombineThreads
+        parallelCombineThreads,
+        mergeThreadLocal
     );
     closer.register(grouper);
     grouper.init();
diff --git a/website/.spelling b/website/.spelling
index f0b5b0370c..fd16c89754 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1580,6 +1580,7 @@ pre-existing
 pushdown
 row1
 subtotalsSpec
+tradeoff
 unnested
 unnesting
  - ../docs/querying/having.md


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to