This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0080e33  Fix cardinality estimation (#10762)
0080e33 is described below

commit 0080e333cc193ef538b57322f2cf256866ed7f66
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Fri Jan 29 04:36:10 2021 +0530

    Fix cardinality estimation (#10762)
    
    * Fix cardinality estimation
    
    * Add unit test
    
    * code coverage
    
    * fix typo
---
 .../parallel/PartialDimensionCardinalityTask.java  | 17 ++++-----
 .../parallel/ParallelIndexTestingFactory.java      | 13 +++++++
 .../PartialDimensionCardinalityTaskTest.java       | 40 ++++++++++++++++++++++
 .../apache/druid/segment/indexing/DataSchema.java  | 14 ++++++++
 .../druid/segment/indexing/DataSchemaTest.java     | 31 +++++++++++++++++
 5 files changed, 104 insertions(+), 11 deletions(-)
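
The gist of the fix: PartialDimensionCardinalityTask previously built the per-interval HLL group key from the configured partition dimensions, so rows that differed only in non-partition dimensions collapsed into a single key, and the task estimated the number of distinct hash buckets rather than the number of distinct rows. The following is a minimal, self-contained model of that effect, using exact sets in place of the HLL sketch (class and variable names are illustrative, not Druid's):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class GroupKeyCardinalityDemo
    {
      public static void main(String[] args)
      {
        List<Map<String, String>> rows = List.of(
            Map.of("dim1", "a", "dim2", "1"),
            Map.of("dim1", "a", "dim2", "2"),
            Map.of("dim1", "b", "dim2", "3"),
            Map.of("dim1", "b", "dim2", "4")
        );

        Set<List<String>> byPartitionDim = new HashSet<>(); // old behavior: key on "dim1" only
        Set<List<String>> byAllDims = new HashSet<>();      // new behavior: key on the whole row
        for (Map<String, String> row : rows) {
          byPartitionDim.add(List.of(row.get("dim1")));
          byAllDims.add(List.of(row.get("dim1"), row.get("dim2")));
        }

        System.out.println(byPartitionDim.size()); // 2 -- underestimates row cardinality
        System.out.println(byAllDims.size());      // 4 -- matches the distinct rows
      }
    }

With too low an estimate, hashed partitioning can end up creating fewer segments than the data warrants.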

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 48be3e7..d39822e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -44,12 +44,12 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.HashPartitioner;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -146,11 +146,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
     HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
     Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
 
-    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
-    if (partitionDimensions == null) {
-      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
-    }
-
     InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
         ingestionSchema.getDataSchema().getParser()
     );
@@ -179,8 +174,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
     ) {
       Map<Interval, byte[]> cardinalities = determineCardinalities(
           inputRowIterator,
-          granularitySpec,
-          partitionDimensions
+          granularitySpec
       );
 
       sendReport(
@@ -194,8 +188,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
 
   private Map<Interval, byte[]> determineCardinalities(
       CloseableIterator<InputRow> inputRowIterator,
-      GranularitySpec granularitySpec,
-      List<String> partitionDimensions
+      GranularitySpec granularitySpec
   )
   {
     Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
@@ -218,8 +211,10 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
           interval,
           (intervalKey) -> DimensionCardinalityReport.createHllSketchForReport()
       );
+      // For cardinality estimation, we want to consider unique rows instead of unique hash buckets and therefore
+      // we do not use partition dimensions in computing the group key
       List<Object> groupKey = HashPartitioner.extractKeys(
-          partitionDimensions,
+          Collections.emptyList(),
           queryGranularity.bucketStart(timestamp).getMillis(),
           inputRow
       );
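
Why passing Collections.emptyList() counts rows: the new comment above implies that HashPartitioner.extractKeys falls back to all of a row's dimensions when no partition dimensions are given, so the group key identifies a unique row within the query-granularity bucket. A hedged sketch of that fallback, assuming Druid's InputRow accessors and java.util imports (the real HashPartitioner.extractKeys may differ in detail):

    // Hypothetical outline, not Druid's actual implementation.
    static List<Object> extractKeysSketch(List<String> partitionDimensions, long bucketStartMillis, InputRow row)
    {
      List<Object> key = new ArrayList<>();
      key.add(bucketStartMillis);
      // Empty list => fall back to every dimension of the row.
      List<String> dims = partitionDimensions.isEmpty() ? row.getDimensions() : partitionDimensions;
      for (String dim : dims) {
        key.add(row.getDimension(dim)); // multi-value dimensions come back as a List<String>
      }
      return key;
    }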
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index e663022..0e6460d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -53,6 +53,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -277,6 +278,18 @@ class ParallelIndexTestingFactory
     }
   }
 
+  static String createRowFromMap(long timestamp, Map<String, Object> fields)
+  {
+    HashMap<String, Object> row = new HashMap<>(fields);
+    row.put(SCHEMA_TIME, timestamp);
+    try {
+      return NESTED_OBJECT_MAPPER.writeValueAsString(row);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   static InputFormat getInputFormat()
   {
     return new JsonInputFormat(null, null, null);
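
The new createRowFromMap helper just merges the timestamp into the field map under the SCHEMA_TIME column and serializes the result with Jackson. A standalone equivalent, with "ts" standing in for whatever SCHEMA_TIME resolves to in this factory:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.HashMap;
    import java.util.Map;

    public class RowJsonDemo
    {
      public static void main(String[] args) throws Exception
      {
        Map<String, Object> row = new HashMap<>(Map.of("dim1", "a", "dim2", "1"));
        row.put("ts", 0L); // "ts" is a placeholder for SCHEMA_TIME
        // Prints something like {"dim1":"a","dim2":"1","ts":0}
        System.out.println(new ObjectMapper().writeValueAsString(row));
      }
    }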
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 0d65f86..835172d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -21,12 +21,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.memory.Memory;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -265,6 +267,38 @@ public class PartialDimensionCardinalityTaskTest
     }
 
     @Test
+    public void sendsCorrectReportWhenNonEmptyPartitionDimension()
+    {
+      InputSource inlineInputSource = new InlineInputSource(
+          ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "1")) + "\n" +
+          ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "2")) + "\n" +
+          ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "3")) + "\n" +
+          ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "4"))
+      );
+      HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, null,
+                                                                     Collections.singletonList("dim1")
+      );
+      ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
+          .partitionsSpec(partitionsSpec)
+          .build();
+
+      PartialDimensionCardinalityTaskBuilder taskBuilder = new PartialDimensionCardinalityTaskBuilder()
+          .inputSource(inlineInputSource)
+          .tuningConfig(tuningConfig)
+          .withDimensions(Arrays.asList("dim1", "dim2"));
+
+      DimensionCardinalityReport report = runTask(taskBuilder);
+
+      Assert.assertEquals(ParallelIndexTestingFactory.ID, report.getTaskId());
+      Map<Interval, byte[]> intervalToCardinalities = report.getIntervalToCardinalities();
+      byte[] hllSketchBytes = Iterables.getOnlyElement(intervalToCardinalities.values());
+      HllSketch hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
+      Assert.assertNotNull(hllSketch);
+      Assert.assertEquals(4L, (long) hllSketch.getEstimate());
+
+    }
+
+    @Test
     public void sendsCorrectReportWithMultipleIntervalsInData()
     {
       // Segment granularity is DAY, query granularity is HOUR
@@ -368,6 +402,12 @@ public class PartialDimensionCardinalityTaskTest
       return this;
     }
 
+    PartialDimensionCardinalityTaskBuilder withDimensions(List<String> dims)
+    {
+      this.dataSchema = dataSchema.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims)));
+      return this;
+    }
+
 
     PartialDimensionCardinalityTask build()
     {
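
Note what the new sendsCorrectReportWhenNonEmptyPartitionDimension test pins down: four rows with only two distinct values of the partition dimension "dim1" must still report a cardinality of 4. The two outcomes can be reproduced directly with the same DataSketches HLL API the test uses to read the report (a standalone sketch, assuming datasketches-hll on the classpath):

    import org.apache.datasketches.hll.HllSketch;

    public class HllEstimateDemo
    {
      public static void main(String[] args)
      {
        HllSketch rowKeys = new HllSketch();
        for (String key : new String[]{"a|1", "a|2", "b|3", "b|4"}) {
          rowKeys.update(key); // one update per distinct row
        }
        System.out.println(Math.round(rowKeys.getEstimate())); // 4

        HllSketch bucketKeys = new HllSketch();
        for (String key : new String[]{"a", "b"}) {
          bucketKeys.update(key); // one update per distinct partition key
        }
        System.out.println(Math.round(bucketKeys.getEstimate())); // 2, the pre-fix behavior
      }
    }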
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 3fc0fa2..f36f754 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -300,6 +300,20 @@ public class DataSchema
     );
   }
 
+  public DataSchema withDimensionsSpec(DimensionsSpec dimensionsSpec)
+  {
+    return new DataSchema(
+        dataSource,
+        timestampSpec,
+        dimensionsSpec,
+        aggregators,
+        granularitySpec,
+        transformSpec,
+        parserMap,
+        objectMapper
+    );
+  }
+
   @Override
   public String toString()
   {
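
withDimensionsSpec follows the usual immutable "wither" shape: it constructs a new DataSchema that swaps in the given DimensionsSpec and carries every other field over unchanged. A hedged usage fragment (not a full program; "schema" is assumed to be an existing DataSchema, and the calls mirror the withDimensions test helper added earlier in this patch):

    // Requires java.util.Arrays and org.apache.druid.data.input.impl.DimensionsSpec.
    DataSchema updated = schema.withDimensionsSpec(
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2")))
    );
    // "updated" shares dataSource, timestampSpec, aggregators, granularitySpec,
    // transformSpec, and parserMap with "schema".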
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 13bf27d..83fc9c6 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -52,6 +53,8 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -60,6 +63,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class DataSchemaTest extends InitializedNullHandlingTest
 {
@@ -546,4 +550,31 @@ public class DataSchemaTest extends InitializedNullHandlingTest
     Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
     Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
   }
+
+  @Test
+  public void testWithDimensionSpec()
+  {
+    TimestampSpec tsSpec = Mockito.mock(TimestampSpec.class);
+    GranularitySpec gSpec = Mockito.mock(GranularitySpec.class);
+    DimensionsSpec oldDimSpec = Mockito.mock(DimensionsSpec.class);
+    DimensionsSpec newDimSpec = Mockito.mock(DimensionsSpec.class);
+    AggregatorFactory aggFactory = Mockito.mock(AggregatorFactory.class);
+    TransformSpec transSpec = Mockito.mock(TransformSpec.class);
+    Map<String, Object> parserMap = Mockito.mock(Map.class);
+    Mockito.when(newDimSpec.withDimensionExclusions(ArgumentMatchers.any(Set.class))).thenReturn(newDimSpec);
+
+    DataSchema oldSchema = new DataSchema("dataSource", tsSpec, oldDimSpec,
+                                          new AggregatorFactory[]{aggFactory}, gSpec,
+                                          transSpec, parserMap, jsonMapper
+    );
+    DataSchema newSchema = oldSchema.withDimensionsSpec(newDimSpec);
+    Assert.assertSame(oldSchema.getDataSource(), newSchema.getDataSource());
+    Assert.assertSame(oldSchema.getTimestampSpec(), newSchema.getTimestampSpec());
+    Assert.assertSame(newDimSpec, newSchema.getDimensionsSpec());
+    Assert.assertSame(oldSchema.getAggregators(), newSchema.getAggregators());
+    Assert.assertSame(oldSchema.getGranularitySpec(), newSchema.getGranularitySpec());
+    Assert.assertSame(oldSchema.getTransformSpec(), newSchema.getTransformSpec());
+    Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
+
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
