This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 37853f8de4 ConcurrentGrouper: Add mergeThreadLocal option, fix bug
around the switch to spilling. (#12513)
37853f8de4 is described below
commit 37853f8de4ba801a5f6c429ccceeb58b58762109
Author: Gian Merlino <[email protected]>
AuthorDate: Sat May 21 10:28:54 2022 -0700
ConcurrentGrouper: Add mergeThreadLocal option, fix bug around the switch
to spilling. (#12513)
* ConcurrentGrouper: Add option to always slice up merge buffers
thread-locally.
Normally, the ConcurrentGrouper shares merge buffers across processing
threads until spilling starts, and then switches to a thread-local model.
This minimizes memory use and reduces the likelihood of spilling, which is
good, but it creates thread contention. The new mergeThreadLocal option
causes a query to start in thread-local mode immediately, and allows us
to experiment with the relative performance of the two modes.
* Fix grammar in docs.
* Fix race in ConcurrentGrouper.
* Fix issue with timeouts.
* Remove unused import.
* Add "tradeoff" to dictionary.
---
docs/querying/groupbyquery.md | 1 +
.../druid/query/groupby/GroupByQueryConfig.java | 10 ++++
.../groupby/epinephelinae/ConcurrentGrouper.java | 68 ++++++++++++++++------
.../groupby/epinephelinae/SpillingGrouper.java | 5 ++
.../epinephelinae/ConcurrentGrouperTest.java | 26 ++++++---
website/.spelling | 1 +
6 files changed, 85 insertions(+), 26 deletions(-)
diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md
index 9554f1e86e..41a52f5347 100644
--- a/docs/querying/groupbyquery.md
+++ b/docs/querying/groupbyquery.md
@@ -441,6 +441,7 @@ Supported query contexts:
|`forceHashAggregation`|Overrides the value of
`druid.query.groupBy.forceHashAggregation`|None|
|`intermediateCombineDegree`|Overrides the value of
`druid.query.groupBy.intermediateCombineDegree`|None|
|`numParallelCombineThreads`|Overrides the value of
`druid.query.groupBy.numParallelCombineThreads`|None|
+|`mergeThreadLocal`|Whether merge buffers should always be split into
thread-local buffers. Setting this to `true` reduces thread contention, but
uses memory less efficiently. This tradeoff is beneficial when memory is
plentiful. |false|
|`sortByDimsFirst`|Sort the results first by dimension values and then by
timestamp.|false|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping
key, the Broker will push limit application down to the Historical processes.
When the sorting order uses fields that are not in the grouping key, applying
this optimization can result in approximate results with unknown accuracy, so
this optimization is disabled by default in that case. Enabling this context
flag turns on limit push down for limit/orderbys that contain non-grouping key
columns.|false|
|`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes
(historicals, peons) then limit results during segment scan. This context value
can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index bba6433391..3518c0e4fb 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -48,6 +48,7 @@ public class GroupByQueryConfig
private static final String CTX_KEY_FORCE_HASH_AGGREGATION =
"forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE =
"intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS =
"numParallelCombineThreads";
+ private static final String CTX_KEY_MERGE_THREAD_LOCAL = "mergeThreadLocal";
@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
@@ -102,6 +103,9 @@ public class GroupByQueryConfig
@JsonProperty
private int numParallelCombineThreads = 1;
+ @JsonProperty
+ private boolean mergeThreadLocal = false;
+
@JsonProperty
private boolean vectorize = true;
@@ -201,6 +205,11 @@ public class GroupByQueryConfig
return numParallelCombineThreads;
}
+ public boolean isMergeThreadLocal()
+ {
+ return mergeThreadLocal;
+ }
+
public boolean isVectorize()
{
return vectorize;
@@ -282,6 +291,7 @@ public class GroupByQueryConfig
CTX_KEY_NUM_PARALLEL_COMBINE_THREADS,
getNumParallelCombineThreads()
);
+ newConfig.mergeThreadLocal =
query.getContextBoolean(CTX_KEY_MERGE_THREAD_LOCAL, isMergeThreadLocal());
newConfig.vectorize = query.getContextBoolean(QueryContexts.VECTORIZE_KEY,
isVectorize());
newConfig.enableMultiValueUnnesting = query.getContextBoolean(
CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 2ae96e8542..827583dccb 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -93,6 +93,7 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
private final long maxDictionarySizeForCombiner;
@Nullable
private final ParallelCombiner<KeyType> parallelCombiner;
+ private final boolean mergeThreadLocal;
private volatile boolean initialized = false;
@@ -135,7 +136,8 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
hasQueryTimeout,
queryTimeoutAt,
groupByQueryConfig.getIntermediateCombineDegree(),
- groupByQueryConfig.getNumParallelCombineThreads()
+ groupByQueryConfig.getNumParallelCombineThreads(),
+ groupByQueryConfig.isMergeThreadLocal()
);
}
@@ -159,7 +161,8 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
final boolean hasQueryTimeout,
final long queryTimeoutAt,
final int intermediateCombineDegree,
- final int numParallelCombineThreads
+ final int numParallelCombineThreads,
+ final boolean mergeThreadLocal
)
{
Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
@@ -207,6 +210,8 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
} else {
this.parallelCombiner = null;
}
+
+ this.mergeThreadLocal = mergeThreadLocal;
}
@Override
@@ -237,6 +242,10 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
);
grouper.init();
groupers.add(grouper);
+
+ if (mergeThreadLocal) {
+ grouper.setSpillingAllowed(true);
+ }
}
initialized = true;
@@ -262,29 +271,46 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
throw new ISE("Grouper is closed");
}
- if (!spilling) {
- final SpillingGrouper<KeyType> hashBasedGrouper =
groupers.get(grouperNumberForKeyHash(keyHash));
+ final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
+
+ if (mergeThreadLocal) {
+ // Always thread-local grouping: expect to get more memory use, but no
thread contention.
+ return tlGrouper.aggregate(key, keyHash);
+ } else if (spilling) {
+ // Switch to thread-local grouping after spilling starts. No thread
contention.
+ synchronized (tlGrouper) {
+ tlGrouper.setSpillingAllowed(true);
+ return tlGrouper.aggregate(key, keyHash);
+ }
+ } else {
+ // Use keyHash to find a grouper prior to spilling.
+ // There is potential here for thread contention, but it reduces memory
use.
+ final SpillingGrouper<KeyType> subGrouper =
groupers.get(grouperNumberForKeyHash(keyHash));
+
+ synchronized (subGrouper) {
+ if (subGrouper.isSpillingAllowed() && subGrouper != tlGrouper) {
+ // Another thread already started treating this grouper as its
thread-local grouper. So, switch to ours.
+ // Fall through to release the lock on subGrouper and do the
aggregation with tlGrouper.
+ } else {
+ final AggregateResult aggregateResult = subGrouper.aggregate(key,
keyHash);
- synchronized (hashBasedGrouper) {
- if (!spilling) {
- final AggregateResult aggregateResult =
hashBasedGrouper.aggregate(key, keyHash);
if (aggregateResult.isOk()) {
return AggregateResult.ok();
} else {
// Expecting all-or-nothing behavior.
assert aggregateResult.getCount() == 0;
spilling = true;
+
+ // Fall through to release the lock on subGrouper and do the
aggregation with tlGrouper.
}
}
}
- }
-
- // At this point we know spilling = true
- final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
- synchronized (tlGrouper) {
- tlGrouper.setSpillingAllowed(true);
- return tlGrouper.aggregate(key, keyHash);
+ synchronized (tlGrouper) {
+ assert spilling;
+ tlGrouper.setSpillingAllowed(true);
+ return tlGrouper.aggregate(key, keyHash);
+ }
}
}
@@ -318,7 +344,7 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
getGroupersIterator(sorted);
if (sorted) {
- final boolean fullyCombined = !spilling;
+ final boolean fullyCombined = !spilling && !mergeThreadLocal;
// Parallel combine is used only when data is not fully merged.
if (!fullyCombined && parallelCombiner != null) {
@@ -398,8 +424,16 @@ public class ConcurrentGrouper<KeyType> implements
Grouper<KeyType>
ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future =
Futures.allAsList(futures);
try {
- final long timeout = queryTimeoutAt - System.currentTimeMillis();
- return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) :
future.get();
+ if (!hasQueryTimeout) {
+ return future.get();
+ } else {
+ final long timeout = queryTimeoutAt - System.currentTimeMillis();
+ if (timeout > 0) {
+ return future.get(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ throw new TimeoutException();
+ }
+ }
}
catch (InterruptedException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 44b44d70df..86365fef12 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -241,6 +241,11 @@ public class SpillingGrouper<KeyType> implements
Grouper<KeyType>
return new ArrayList<>(mergedDictionary);
}
+ public boolean isSpillingAllowed()
+ {
+ return spillingAllowed;
+ }
+
public void setSpillingAllowed(final boolean spillingAllowed)
{
this.spillingAllowed = spillingAllowed;
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index 49951cdcb9..6ab2f1a7bf 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryTimeoutException;
@@ -61,7 +62,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,10 +78,11 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
private final Supplier<ByteBuffer> bufferSupplier;
private final int concurrencyHint;
private final int parallelCombineThreads;
- private final ExecutorService exec = Executors.newFixedThreadPool(8);
+ private final ExecutorService exec;
+ private final boolean mergeThreadLocal;
private final Closer closer = Closer.create();
- @Parameters(name = "bufferSize={0}, concurrencyHint={1},
parallelCombineThreads={2}")
+ @Parameters(name = "bufferSize={0}, concurrencyHint={1},
parallelCombineThreads={2}, mergeThreadLocal={3}")
public static Collection<Object[]> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
@@ -89,8 +90,10 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) {
for (final int concurrencyHint : new int[]{1, 8}) {
for (final int parallelCombineThreads : new int[]{0, 8}) {
- if (parallelCombineThreads <= concurrencyHint) {
- constructors.add(new Object[]{bufferSize, concurrencyHint,
parallelCombineThreads});
+ for (final boolean mergeThreadLocal : new boolean[]{true, false}) {
+ if (parallelCombineThreads <= concurrencyHint) {
+ constructors.add(new Object[]{bufferSize, concurrencyHint,
parallelCombineThreads, mergeThreadLocal});
+ }
}
}
}
@@ -115,11 +118,13 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
public ConcurrentGrouperTest(
int bufferSize,
int concurrencyHint,
- int parallelCombineThreads
+ int parallelCombineThreads,
+ boolean mergeThreadLocal
)
{
this.concurrencyHint = concurrencyHint;
this.parallelCombineThreads = parallelCombineThreads;
+ this.mergeThreadLocal = mergeThreadLocal;
this.bufferSupplier = new Supplier<ByteBuffer>()
{
private final AtomicBoolean called = new AtomicBoolean(false);
@@ -135,6 +140,7 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
return buffer;
}
};
+ this.exec = Execs.multiThreaded(concurrencyHint,
"ConcurrentGrouperTest-%d");
}
@Test()
@@ -165,7 +171,8 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
false,
0,
4,
- parallelCombineThreads
+ parallelCombineThreads,
+ mergeThreadLocal
);
closer.register(grouper);
grouper.init();
@@ -195,7 +202,7 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
final CloseableIterator<Entry<LongKey>> iterator =
closer.register(grouper.iterator(true));
- if (parallelCombineThreads > 1 && temporaryStorage.currentSize() > 0) {
+ if (parallelCombineThreads > 1 && (mergeThreadLocal ||
temporaryStorage.currentSize() > 0)) {
// Parallel combiner configured, and expected to actually be used due to
thread-local merge (either explicitly
// configured, or due to spilling).
Assert.assertTrue(TEST_RESOURCE_HOLDER.taken);
@@ -234,7 +241,8 @@ public class ConcurrentGrouperTest extends
InitializedNullHandlingTest
true,
1,
4,
- parallelCombineThreads
+ parallelCombineThreads,
+ mergeThreadLocal
);
closer.register(grouper);
grouper.init();
diff --git a/website/.spelling b/website/.spelling
index f0b5b0370c..fd16c89754 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1580,6 +1580,7 @@ pre-existing
pushdown
row1
subtotalsSpec
+tradeoff
unnested
unnesting
- ../docs/querying/having.md
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]