This is an automated email from the ASF dual-hosted git repository.

agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c236227905 Deal with potential cardinality estimate being negative and 
add logging to hash determine partitions phase (#12443)
c236227905 is described below

commit c23622790568e1fca2eeaae308903d3199093ece
Author: Agustin Gonzalez <[email protected]>
AuthorDate: Fri May 20 10:51:06 2022 -0700

    Deal with potential cardinality estimate being negative and add logging to 
hash determine partitions phase (#12443)
    
    * Deal with potential cardinality estimate being negative and add logging
    
    * Fix typo in name
    
    * Refine and minimize logging
    
    * Make it info based on code review
    
    * Create a named constant for the magic number
---
 .../parallel/ParallelIndexSupervisorTask.java      | 46 ++++++++++++-
 .../parallel/DimensionCardinalityReportTest.java   | 80 ++++++++++++++++++++++
 2 files changed, 124 insertions(+), 2 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index fe5c9dc838..02425ff91d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -83,6 +83,7 @@ import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -129,6 +130,16 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
 
   private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. 
See task logs for details.";
 
+  // Sometimes Druid estimates one shard for hash partitioning despite 
conditions
+  // indicating that there ought to be more than one. We have not been able to
+  // reproduce but looking at the code around where the following constant is 
used one
+  // possibility is that the sketch's estimate is negative. If that case 
happens
+  // code has been added to log it and to set the estimate to the value of the
+  // following constant. It is not necessary to parameterize this value since if 
this
+  // happens it is a bug and the new logging may now provide some evidence to 
reproduce
+  // and fix
+  private static final long DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE = 
7L;
+
   private final ParallelIndexIngestionSpec ingestionSchema;
   /**
    * Base name for the {@link SubTaskSpec} ID.
@@ -703,6 +714,10 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
             cardinalityRunner.getReports().values(),
             effectiveMaxRowsPerSegment
         );
+
+        // This is for potential debugging in case we suspect bad estimation 
of cardinalities etc,
+        LOG.debug("intervalToNumShards: %s", intervalToNumShards.toString());
+
       } else {
         intervalToNumShards = CollectionUtils.mapValues(
             mergeCardinalityReports(cardinalityRunner.getReports().values()),
@@ -901,13 +916,40 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   {
     // aggregate all the sub-reports
     Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+    return computeIntervalToNumShards(maxRowsPerSegment, finalCollectors);
+  }
 
+  @Nonnull
+  @VisibleForTesting
+  static Map<Interval, Integer> computeIntervalToNumShards(
+      int maxRowsPerSegment,
+      Map<Interval, Union> finalCollectors
+  )
+  {
     return CollectionUtils.mapValues(
         finalCollectors,
         union -> {
           final double estimatedCardinality = union.getEstimate();
-          // determine numShards based on maxRowsPerSegment and the cardinality
-          final long estimatedNumShards = Math.round(estimatedCardinality / 
maxRowsPerSegment);
+          final long estimatedNumShards;
+          if (estimatedCardinality <= 0) {
+            estimatedNumShards = 
DEFAULT_NUM_SHARDS_WHEN_ESTIMATE_GOES_NEGATIVE;
+            LOG.warn("Estimated cardinality for union of estimates is zero or 
less: %.2f, setting num shards to %d",
+                     estimatedCardinality, estimatedNumShards
+            );
+          } else {
+            // determine numShards based on maxRowsPerSegment and the 
cardinality
+            estimatedNumShards = Math.round(estimatedCardinality / 
maxRowsPerSegment);
+          }
+          LOG.info("estimatedNumShards %d given estimated cardinality %.2f and 
maxRowsPerSegment %d",
+                    estimatedNumShards, estimatedCardinality, maxRowsPerSegment
+          );
+          // We have seen this before in the wild in situations where more 
shards should have been created,
+          // log it if it happens with some information & context
+          if (estimatedNumShards == 1) {
+            LOG.info("estimatedNumShards is ONE (%d) given estimated 
cardinality %.2f and maxRowsPerSegment %d",
+                      estimatedNumShards, estimatedCardinality, 
maxRowsPerSegment
+            );
+          }
           try {
             return Math.max(Math.toIntExact(estimatedNumShards), 1);
           }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
index 876458d917..0c540fa8a2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
@@ -23,26 +23,38 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.hll.Union;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+
 public class DimensionCardinalityReportTest
 {
   private static final ObjectMapper OBJECT_MAPPER = 
ParallelIndexTestingFactory.createObjectMapper();
 
   private DimensionCardinalityReport target;
 
+  @Rule
+  public final LoggerCaptureRule logger = new 
LoggerCaptureRule(ParallelIndexSupervisorTask.class);
+
+
   @Before
   public void setup()
   {
@@ -293,4 +305,72 @@ public class DimensionCardinalityReportTest
         intervalToNumShards
     );
   }
+
+
+  @Test
+  public void testSupervisorDetermineNegativeNumShardsFromCardinalityReport()
+  {
+    logger.clearLogEvents();
+    Union negativeUnion = mock(Union.class);
+    Mockito.when(negativeUnion.getEstimate()).thenReturn(-1.0);
+    Interval interval = Intervals.of("2001-01-01/P1D");
+    Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, 
negativeUnion);
+    Map<Interval, Integer> intervalToNumShards =
+        ParallelIndexSupervisorTask.computeIntervalToNumShards(10, 
intervalToUnion);
+    Assert.assertEquals(new Integer(7), intervalToNumShards.get(interval));
+
+    List<LogEvent> loggingEvents = logger.getLogEvents();
+    String expectedLogMessage =
+        "Estimated cardinality for union of estimates is zero or less: -1.00, 
setting num shards to 7";
+    Assert.assertTrue(
+        "Logging events: " + loggingEvents,
+        loggingEvents.stream()
+                     .anyMatch(l ->
+                                   l.getLevel().equals(Level.WARN)
+                                   && l.getMessage()
+                                       .getFormattedMessage()
+                                       .equals(expectedLogMessage)
+                     )
+    );
+  }
+
+  @Test
+  public void testSupervisorDeterminePositiveNumShardsFromCardinalityReport()
+  {
+    Union union = mock(Union.class);
+    Mockito.when(union.getEstimate()).thenReturn(24.0);
+    Interval interval = Intervals.of("2001-01-01/P1D");
+    Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, union);
+    Map<Interval, Integer> intervalToNumShards =
+        ParallelIndexSupervisorTask.computeIntervalToNumShards(6, 
intervalToUnion);
+    Assert.assertEquals(new Integer(4), intervalToNumShards.get(interval));
+  }
+
+  @Test
+  public void testSupervisorDeterminePositiveOneShardFromCardinalityReport()
+  {
+    logger.clearLogEvents();
+    Union union = mock(Union.class);
+    Mockito.when(union.getEstimate()).thenReturn(24.0);
+    Interval interval = Intervals.of("2001-01-01/P1D");
+    Map<Interval, Union> intervalToUnion = ImmutableMap.of(interval, union);
+    Map<Interval, Integer> intervalToNumShards =
+        ParallelIndexSupervisorTask.computeIntervalToNumShards(24, 
intervalToUnion);
+    Assert.assertEquals(new Integer(1), intervalToNumShards.get(interval));
+
+    List<LogEvent> loggingEvents = logger.getLogEvents();
+    String expectedLogMessage =
+        "estimatedNumShards is ONE (1) given estimated cardinality 24.00 and 
maxRowsPerSegment 24";
+    Assert.assertTrue(
+        "Logging events: " + loggingEvents,
+        loggingEvents.stream()
+                     .anyMatch(l ->
+                                   l.getLevel().equals(Level.INFO)
+                                   && l.getMessage()
+                                       .getFormattedMessage()
+                                       .equals(expectedLogMessage)
+                     )
+    );
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to