This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0080e33 Fix cardinality estimation (#10762)
0080e33 is described below
commit 0080e333cc193ef538b57322f2cf256866ed7f66
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Fri Jan 29 04:36:10 2021 +0530
Fix cardinality estimation (#10762)
* Fix cardinality estimation
* Add unit test
* code coverage
* fix typo
---
.../parallel/PartialDimensionCardinalityTask.java | 17 ++++-----
.../parallel/ParallelIndexTestingFactory.java | 13 +++++++
.../PartialDimensionCardinalityTaskTest.java | 40 ++++++++++++++++++++++
.../apache/druid/segment/indexing/DataSchema.java | 14 ++++++++
.../druid/segment/indexing/DataSchemaTest.java | 31 +++++++++++++++++
5 files changed, 104 insertions(+), 11 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 48be3e7..d39822e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -44,12 +44,12 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitioner;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -146,11 +146,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
- List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
- if (partitionDimensions == null) {
- partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
- }
-
InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
);
@@ -179,8 +174,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
) {
Map<Interval, byte[]> cardinalities = determineCardinalities(
inputRowIterator,
- granularitySpec,
- partitionDimensions
+ granularitySpec
);
sendReport(
@@ -194,8 +188,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
private Map<Interval, byte[]> determineCardinalities(
CloseableIterator<InputRow> inputRowIterator,
- GranularitySpec granularitySpec,
- List<String> partitionDimensions
+ GranularitySpec granularitySpec
)
{
Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
@@ -218,8 +211,10 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
interval,
(intervalKey) -> DimensionCardinalityReport.createHllSketchForReport()
);
+ // For cardinality estimation, we want to consider unique rows instead of unique hash buckets and therefore
+ // we do not use partition dimensions in computing the group key
List<Object> groupKey = HashPartitioner.extractKeys(
- partitionDimensions,
+ Collections.emptyList(),
queryGranularity.bucketStart(timestamp).getMillis(),
inputRow
);
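(For intuition, here is a minimal standalone sketch, not Druid code: it assumes only datasketches-java on the classpath, and the class name is illustrative. With "dim1" as the sole partition dimension, the four distinct rows used by the new unit test collapse into only two hash-bucket keys, while keying on all dimensions yields the correct estimate of four.)

    import org.apache.datasketches.hll.HllSketch;

    public class CardinalityExample
    {
      public static void main(String[] args)
      {
        // Each row is {dim1, dim2}; same data as the new unit test below.
        String[][] rows = {{"a", "1"}, {"a", "2"}, {"b", "3"}, {"b", "4"}};

        HllSketch bucketSketch = new HllSketch(); // old key: partition dimension only
        HllSketch rowSketch = new HllSketch();    // new key: all dimensions

        for (String[] row : rows) {
          bucketSketch.update(row[0]);             // "a" or "b" -> 2 distinct keys
          rowSketch.update(row[0] + "|" + row[1]); // full row   -> 4 distinct keys
        }

        System.out.println(bucketSketch.getEstimate()); // ~2.0: counts hash buckets
        System.out.println(rowSketch.getEstimate());    // ~4.0: counts unique rows
      }
    }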
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index e663022..0e6460d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -53,6 +53,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -277,6 +278,18 @@ class ParallelIndexTestingFactory
}
}
+ static String createRowFromMap(long timestamp, Map<String, Object> fields)
+ {
+ HashMap<String, Object> row = new HashMap<>(fields);
+ row.put(SCHEMA_TIME, timestamp);
+ try {
+ return NESTED_OBJECT_MAPPER.writeValueAsString(row);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
static InputFormat getInputFormat()
{
return new JsonInputFormat(null, null, null);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 0d65f86..835172d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -21,12 +21,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -265,6 +267,38 @@ public class PartialDimensionCardinalityTaskTest
}
@Test
+ public void sendsCorrectReportWhenNonEmptyPartitionDimension()
+ {
+ InputSource inlineInputSource = new InlineInputSource(
+ ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "1")) + "\n" +
+ ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "2")) + "\n" +
+ ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "3")) + "\n" +
+ ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "4"))
+ );
+ HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(null,
+ null,
+ Collections.singletonList("dim1")
+ );
+ ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
+ .partitionsSpec(partitionsSpec)
+ .build();
+
+ PartialDimensionCardinalityTaskBuilder taskBuilder = new PartialDimensionCardinalityTaskBuilder()
+ .inputSource(inlineInputSource)
+ .tuningConfig(tuningConfig)
+ .withDimensions(Arrays.asList("dim1", "dim2"));
+
+ DimensionCardinalityReport report = runTask(taskBuilder);
+
+ Assert.assertEquals(ParallelIndexTestingFactory.ID, report.getTaskId());
+ Map<Interval, byte[]> intervalToCardinalities = report.getIntervalToCardinalities();
+ byte[] hllSketchBytes = Iterables.getOnlyElement(intervalToCardinalities.values());
+ HllSketch hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
+ Assert.assertNotNull(hllSketch);
+ Assert.assertEquals(4L, (long) hllSketch.getEstimate());
+
+ }
+
+ @Test
public void sendsCorrectReportWithMultipleIntervalsInData()
{
// Segment granularity is DAY, query granularity is HOUR
@@ -368,6 +402,12 @@ public class PartialDimensionCardinalityTaskTest
return this;
}
+ PartialDimensionCardinalityTaskBuilder withDimensions(List<String> dims)
+ {
+ this.dataSchema = dataSchema.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims)));
+ return this;
+ }
+
PartialDimensionCardinalityTask build()
{
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 3fc0fa2..f36f754 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -300,6 +300,20 @@ public class DataSchema
);
}
+ public DataSchema withDimensionsSpec(DimensionsSpec dimensionsSpec)
+ {
+ return new DataSchema(
+ dataSource,
+ timestampSpec,
+ dimensionsSpec,
+ aggregators,
+ granularitySpec,
+ transformSpec,
+ parserMap,
+ objectMapper
+ );
+ }
+
@Override
public String toString()
{
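(Usage sketch for the new copy method, illustrative only: "schema" stands for any existing DataSchema instance and the dimension names are placeholders. Every field other than the dimensions is carried over unchanged, as the unit test below verifies.)

    DataSchema updated = schema.withDimensionsSpec(
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2")))
    );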
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 13bf27d..83fc9c6 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -52,6 +53,8 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -60,6 +63,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class DataSchemaTest extends InitializedNullHandlingTest
{
@@ -546,4 +550,31 @@ public class DataSchemaTest extends InitializedNullHandlingTest
Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
}
+
+ @Test
+ public void testWithDimensionSpec()
+ {
+ TimestampSpec tsSpec = Mockito.mock(TimestampSpec.class);
+ GranularitySpec gSpec = Mockito.mock(GranularitySpec.class);
+ DimensionsSpec oldDimSpec = Mockito.mock(DimensionsSpec.class);
+ DimensionsSpec newDimSpec = Mockito.mock(DimensionsSpec.class);
+ AggregatorFactory aggFactory = Mockito.mock(AggregatorFactory.class);
+ TransformSpec transSpec = Mockito.mock(TransformSpec.class);
+ Map<String, Object> parserMap = Mockito.mock(Map.class);
+ Mockito.when(newDimSpec.withDimensionExclusions(ArgumentMatchers.any(Set.class))).thenReturn(newDimSpec);
+
+ DataSchema oldSchema = new DataSchema("dataSource", tsSpec, oldDimSpec,
+ new AggregatorFactory[]{aggFactory}, gSpec,
+ transSpec, parserMap, jsonMapper
+ );
+ DataSchema newSchema = oldSchema.withDimensionsSpec(newDimSpec);
+ Assert.assertSame(oldSchema.getDataSource(), newSchema.getDataSource());
+ Assert.assertSame(oldSchema.getTimestampSpec(), newSchema.getTimestampSpec());
+ Assert.assertSame(newDimSpec, newSchema.getDimensionsSpec());
+ Assert.assertSame(oldSchema.getAggregators(), newSchema.getAggregators());
+ Assert.assertSame(oldSchema.getGranularitySpec(), newSchema.getGranularitySpec());
+ Assert.assertSame(oldSchema.getTransformSpec(), newSchema.getTransformSpec());
+ Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]