This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 271f90f Add segment pruning for hash based shard spec (#9810)
271f90f is described below
commit 271f90f2052ad4288e4981e12fea8c5d0786a601
Author: Jian Wang <[email protected]>
AuthorDate: Thu Jul 30 18:44:26 2020 -0700
Add segment pruning for hash based shard spec (#9810)
* Add segment pruning for hash based partitioning
* Update doc
* Add additional test
* Address comments
* Fix unit test failure
Co-authored-by: Jian Wang <[email protected]>
---
.../partition/HashBasedNumberedShardSpec.java | 114 +++++++++++++++++-
.../partition/HashBasedNumberedShardSpecTest.java | 57 +++++++++
docs/ingestion/native-batch.md | 2 +-
.../druid/client/CachingClusteredClientTest.java | 132 +++++++++++++++++++++
4 files changed, 302 insertions(+), 3 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 721eb67..a1ec5a9 100644
---
a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++
b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -26,16 +26,24 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
@@ -91,9 +99,38 @@ public class HashBasedNumberedShardSpec extends
NumberedShardSpec
}
@Override
+ public List<String> getDomainDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
- return (((long) hash(timestamp, inputRow)) - bucketId) % numBuckets == 0;
+ return getBucketIndex(hash(timestamp, inputRow), numBuckets) == bucketId %
numBuckets;
+ }
+
+ /**
+ * Check if the current segment possibly holds records if the values of
dimensions in {@link #partitionDimensions}
+ * are of {@code partitionDimensionsValues}
+ *
+ * @param partitionDimensionsValues An instance of values of dimensions in
{@link #partitionDimensions}
+ *
+ * @return Whether the current segment possibly holds records for the given
values of partition dimensions
+ */
+ private boolean isInChunk(Map<String, String> partitionDimensionsValues)
+ {
+ assert !partitionDimensions.isEmpty();
+ List<Object> groupKey = Lists.transform(
+ partitionDimensions,
+ o -> Collections.singletonList(partitionDimensionsValues.get(o))
+ );
+ try {
+ return getBucketIndex(hash(jsonMapper, groupKey), numBuckets) ==
bucketId % numBuckets;
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -152,7 +189,7 @@ public class HashBasedNumberedShardSpec extends
NumberedShardSpec
)
{
return (long timestamp, InputRow row) -> {
- int index = Math.abs(hash(jsonMapper, partitionDimensions, timestamp,
row) % numBuckets);
+ int index = getBucketIndex(hash(jsonMapper, partitionDimensions,
timestamp, row), numBuckets);
return shardSpecs.get(index);
};
}
@@ -192,4 +229,77 @@ public class HashBasedNumberedShardSpec extends
NumberedShardSpec
", partitionDimensions=" + partitionDimensions +
'}';
}
+
+ @Override
+ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ // If no partitionDimensions are specified during ingestion, hash is based
on all dimensions plus the truncated
+ // input timestamp according to QueryGranularity instead of just
partitionDimensions. Since we don't store in shard
+ // specs the truncated timestamps of the events that fall into the shard
after ingestion, there's no way to recover
+ // the hash during ingestion, bypass this case
+ if (partitionDimensions.isEmpty()) {
+ return true;
+ }
+
+ // One possible optimization is to move the conversion from range set to
point set to the function signature and
+ // cache it in the caller of this function if there are repetitive calls
of the same domain
+ Map<String, Set<String>> domainSet = new HashMap<>();
+ for (String p : partitionDimensions) {
+ RangeSet<String> domainRangeSet = domain.get(p);
+ if (domainRangeSet == null || domainRangeSet.isEmpty()) {
+ return true;
+ }
+
+ for (Range<String> v : domainRangeSet.asRanges()) {
+ // If there are range values, simply bypass, because we can't hash
range values
+ if (v.isEmpty() || !v.hasLowerBound() || !v.hasUpperBound() ||
+ v.lowerBoundType() != BoundType.CLOSED || v.upperBoundType() !=
BoundType.CLOSED ||
+ !v.lowerEndpoint().equals(v.upperEndpoint())) {
+ return true;
+ }
+ domainSet.computeIfAbsent(p, k -> new
HashSet<>()).add(v.lowerEndpoint());
+ }
+ }
+
+ return !domainSet.isEmpty() && chunkPossibleInDomain(domainSet, new
HashMap<>());
+ }
+
+ /**
+ * Recursively enumerate all possible combinations of values for dimensions
in {@link #partitionDimensions} based on
+ * {@code domainSet}, test if any combination matches the current segment
+ *
+ * @param domainSet The set where values of dimensions in
{@link #partitionDimensions} are
+ * drawn from
+ * @param partitionDimensionsValues A map from dimensions in {@link
#partitionDimensions} to their values drawn from
+ * {@code domainSet}
+ *
+ * @return Whether the current segment possibly holds records for the
provided domain. Return false if and only if
+ * none of the combinations matches this segment
+ */
+ private boolean chunkPossibleInDomain(
+ Map<String, Set<String>> domainSet,
+ Map<String, String> partitionDimensionsValues
+ )
+ {
+ int curIndex = partitionDimensionsValues.size();
+ if (curIndex == partitionDimensions.size()) {
+ return isInChunk(partitionDimensionsValues);
+ }
+
+ String dimension = partitionDimensions.get(curIndex);
+ for (String e : domainSet.get(dimension)) {
+ partitionDimensionsValues.put(dimension, e);
+ if (chunkPossibleInDomain(domainSet, partitionDimensionsValues)) {
+ return true;
+ }
+ partitionDimensionsValues.remove(dimension);
+ }
+
+ return false;
+ }
+
+ private static int getBucketIndex(int hash, int numBuckets)
+ {
+ return Math.abs(hash % numBuckets);
+ }
}
diff --git
a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
index 624f3d3..885b1fd 100644
---
a/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
+++
b/core/src/test/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpecTest.java
@@ -24,6 +24,9 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
@@ -37,6 +40,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -216,6 +220,59 @@ public class HashBasedNumberedShardSpecTest
Assert.assertFalse(shardSpec.sharePartitionSpace(new
NumberedOverwritePartialShardSpec(0, 2, 1)));
}
+ @Test
+ public void testPossibleInDomain()
+ {
+ final RangeSet<String> rangeSet = TreeRangeSet.create();
+ rangeSet.add(Range.closed("123", "123"));
+ final Map<String, RangeSet<String>> domain = ImmutableMap.of("visitor_id",
rangeSet);
+
+ // Without partition info
+ HashBasedNumberedShardSpec shardSpec = new HashBasedNumberedShardSpec(
+ 0,
+ 1,
+ 0,
+ 1,
+ ImmutableList.of(),
+ objectMapper
+ );
+ Assert.assertTrue(shardSpec.possibleInDomain(domain));
+
+ // With partition info and matching partition dimensions
+ final int numBuckets = 3;
+ List<HashBasedNumberedShardSpec> shardSpecs = ImmutableList.of(
+ new HashBasedNumberedShardSpec(
+ 0,
+ numBuckets,
+ 0,
+ numBuckets,
+ ImmutableList.of("visitor_id"),
+ objectMapper
+ ),
+ new HashBasedNumberedShardSpec(
+ 1,
+ numBuckets,
+ 1,
+ numBuckets,
+ ImmutableList.of("visitor_id"),
+ objectMapper
+ ),
+ new HashBasedNumberedShardSpec(
+ 2,
+ numBuckets,
+ 2,
+ numBuckets,
+ ImmutableList.of("visitor_id"),
+ objectMapper
+ )
+ );
+ Assert.assertEquals(1, shardSpecs.stream().filter(s ->
s.possibleInDomain(domain)).count());
+
+ // Partition dimensions not match
+    final Map<String, RangeSet<String>> domain1 =
ImmutableMap.of("visitor_id_1", rangeSet);
+ Assert.assertEquals(shardSpecs.size(), shardSpecs.stream().filter(s ->
s.possibleInDomain(domain1)).count());
+ }
+
public boolean assertExistsInOneSpec(List<ShardSpec> specs, InputRow row)
{
for (ShardSpec spec : specs) {
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index f1c22e2..8538145 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -255,7 +255,7 @@ The three `partitionsSpec` types have different
characteristics.
| PartitionsSpec | Ingestion speed | Partitioning method | Supported rollup
mode | Segment pruning at query time |
|----------------|-----------------|---------------------|-----------------------|-------------------------------|
| `dynamic` | Fastest | Partitioning based on number of rows in segment. |
Best-effort rollup | N/A |
-| `hashed` | Moderate | Partitioning based on the hash value of partition
dimensions. This partitioning may reduce your datasource size and query latency
by improving data locality. See [Partitioning](./index.md#partitioning) for
more details. | Perfect rollup | N/A |
+| `hashed` | Moderate | Partitioning based on the hash value of partition
dimensions. This partitioning may reduce your datasource size and query latency
by improving data locality. See [Partitioning](./index.md#partitioning) for
more details. | Perfect rollup | The broker can use the partition information
to prune segments early to speed up queries if `partitionDimensions` is
explicitly specified during ingestion. Since the broker knows how to hash
`partitionDimensions` values to locat [...]
| `single_dim` | Slowest | Range partitioning based on the value of the
partition dimension. Segment sizes may be skewed depending on the partition key
distribution. This may reduce your datasource size and query latency by
improving data locality. See [Partitioning](./index.md#partitioning) for more
details. | Perfect rollup | The broker can use the partition information to
prune segments early to speed up queries. Since the broker knows the range of
`partitionDimension` values in each [...]
The recommended use case for each partitionsSpec is:
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 452dd8c..177a5a0 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -118,6 +118,7 @@ import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
@@ -125,6 +126,7 @@ import
org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -1541,6 +1543,136 @@ public class CachingClusteredClientTest
Assert.assertEquals(expected, ((TimeseriesQuery)
capture.getValue().getQuery()).getQuerySegmentSpec());
}
+ @Test
+ public void testHashBasedPruning()
+ {
+ DimFilter filter = new AndDimFilter(
+ new SelectorDimFilter("dim1", "a", null),
+ new BoundDimFilter("dim2", "e", "zzz", true, true, false, null,
StringComparators.LEXICOGRAPHIC),
+ // Equivalent filter of dim3 below is InDimFilter("dim3",
Arrays.asList("c"), null)
+ new AndDimFilter(
+ new InDimFilter("dim3", Arrays.asList("a", "c", "e", "g"), null),
+ new BoundDimFilter("dim3", "aaa", "ddd", false, false, false,
null, StringComparators.LEXICOGRAPHIC)
+ )
+ );
+
+ final Druids.TimeseriesQueryBuilder builder =
Druids.newTimeseriesQueryBuilder()
+
.dataSource(DATA_SOURCE)
+ .filters(filter)
+
.granularity(GRANULARITY)
+ .intervals(SEG_SPEC)
+ .context(CONTEXT)
+
.intervals("2011-01-05/2011-01-10")
+
.aggregators(RENAMED_AGGS)
+
.postAggregators(RENAMED_POST_AGGS)
+ .randomQueryId();
+
+ TimeseriesQuery query = builder.build();
+
+ QueryRunner runner = new
FinalizeResultsQueryRunner(getDefaultQueryRunner(), new
TimeseriesQueryQueryToolChest());
+
+ final Interval interval1 = Intervals.of("2011-01-06/2011-01-07");
+ final Interval interval2 = Intervals.of("2011-01-07/2011-01-08");
+ final Interval interval3 = Intervals.of("2011-01-08/2011-01-09");
+
+ final DruidServer lastServer = servers[random.nextInt(servers.length)];
+ List<String> partitionDimensions1 = ImmutableList.of("dim1");
+ ServerSelector selector1 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 0, 6);
+ ServerSelector selector2 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 1, 6);
+ ServerSelector selector3 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 2, 6);
+ ServerSelector selector4 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 3, 6);
+ ServerSelector selector5 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 4, 6);
+ ServerSelector selector6 = makeMockHashBasedSelector(lastServer,
partitionDimensions1, 5, 6);
+
+ List<String> partitionDimensions2 = ImmutableList.of("dim2");
+ ServerSelector selector7 = makeMockHashBasedSelector(lastServer,
partitionDimensions2, 0, 3);
+ ServerSelector selector8 = makeMockHashBasedSelector(lastServer,
partitionDimensions2, 1, 3);
+ ServerSelector selector9 = makeMockHashBasedSelector(lastServer,
partitionDimensions2, 2, 3);
+
+ List<String> partitionDimensions3 = ImmutableList.of("dim1", "dim3");
+ ServerSelector selector10 = makeMockHashBasedSelector(lastServer,
partitionDimensions3, 0, 4);
+ ServerSelector selector11 = makeMockHashBasedSelector(lastServer,
partitionDimensions3, 1, 4);
+ ServerSelector selector12 = makeMockHashBasedSelector(lastServer,
partitionDimensions3, 2, 4);
+ ServerSelector selector13 = makeMockHashBasedSelector(lastServer,
partitionDimensions3, 3, 4);
+
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 6,
selector1));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(1, 6,
selector2));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(2, 6,
selector3));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(3, 6,
selector4));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(4, 6,
selector5));
+ timeline.add(interval1, "v", new NumberedPartitionChunk<>(5, 6,
selector6));
+
+ timeline.add(interval2, "v", new NumberedPartitionChunk<>(0, 3,
selector7));
+ timeline.add(interval2, "v", new NumberedPartitionChunk<>(1, 3,
selector8));
+ timeline.add(interval2, "v", new NumberedPartitionChunk<>(2, 3,
selector9));
+
+ timeline.add(interval3, "v", new NumberedPartitionChunk<>(0, 3,
selector10));
+ timeline.add(interval3, "v", new NumberedPartitionChunk<>(1, 3,
selector11));
+ timeline.add(interval3, "v", new NumberedPartitionChunk<>(2, 3,
selector12));
+ timeline.add(interval3, "v", new NumberedPartitionChunk<>(2, 3,
selector13));
+
+ final Capture<QueryPlus> capture = Capture.newInstance();
+ final Capture<ResponseContext> contextCap = Capture.newInstance();
+
+ QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
+ EasyMock.expect(mockRunner.run(EasyMock.capture(capture),
EasyMock.capture(contextCap)))
+ .andReturn(Sequences.empty())
+ .anyTimes();
+ EasyMock.expect(serverView.getQueryRunner(lastServer))
+ .andReturn(mockRunner)
+ .anyTimes();
+ EasyMock.replay(serverView);
+ EasyMock.replay(mockRunner);
+
+    List<SegmentDescriptor> expectedDescriptors = new ArrayList<>();
+    // Narrow down to 1 chunk
+    expectedDescriptors.add(new SegmentDescriptor(interval1, "v", 3));
+    // Can't filter out any chunks
+    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 0));
+    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 1));
+    expectedDescriptors.add(new SegmentDescriptor(interval2, "v", 2));
+    // Narrow down to 1 chunk
+    expectedDescriptors.add(new SegmentDescriptor(interval3, "v", 2));
+
+    MultipleSpecificSegmentSpec expected = new
MultipleSpecificSegmentSpec(expectedDescriptors);
+
+ runner.run(QueryPlus.wrap(query)).toList();
+ Assert.assertEquals(expected, ((TimeseriesQuery)
capture.getValue().getQuery()).getQuerySegmentSpec());
+ }
+
+ private ServerSelector makeMockHashBasedSelector(
+ DruidServer server,
+ List<String> partitionDimensions,
+ int partitionNum,
+ int partitions
+ )
+ {
+ final DataSegment segment = new DataSegment(
+ SegmentId.dummy(DATA_SOURCE),
+ null,
+ null,
+ null,
+ new HashBasedNumberedShardSpec(
+ partitionNum,
+ partitions,
+ partitionNum,
+ partitions,
+ partitionDimensions,
+ ServerTestHelper.MAPPER
+ ),
+ null,
+ 9,
+ 0L
+ );
+
+ ServerSelector selector = new ServerSelector(
+ segment,
+ new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy())
+ );
+ selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null),
segment);
+ return selector;
+ }
+
private ServerSelector makeMockSingleDimensionSelector(
DruidServer server,
String dimension,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]