This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6837a7be19 Add logging for downsampling sketches in MSQ (#14580)
6837a7be19 is described below

commit 6837a7be190ab55211b60d38b3a2945aeba1f7b5
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Wed Aug 2 20:07:54 2023 +0530

    Add logging for downsampling sketches in MSQ (#14580)
    
    * Add more logs for downsampling sketches
    
    * Fix builds
    
    * Lower log level
    
    * Add new log message
---
 .../ClusterByStatisticsCollectorImpl.java          | 55 ++++++++++++++++++----
 .../msq/statistics/DelegateOrMinKeyCollector.java  | 19 +++++++-
 .../druid/msq/statistics/DistinctKeyCollector.java | 17 +++++++
 .../apache/druid/msq/statistics/KeyCollector.java  | 10 +++-
 .../statistics/QuantilesSketchKeyCollector.java    | 19 +++++++-
 .../src/components/rule-editor/rule-editor.tsx     |  2 +-
 6 files changed, 108 insertions(+), 14 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
index a4ce744d57..50c0a3ab39 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java
@@ -30,6 +30,8 @@ import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.RowSignature;
 
 import java.util.ArrayList;
@@ -48,6 +50,7 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
   // for an objectively faster and more accurate solution instead of finding 
the best match with the following parameters
   private static final int MAX_COUNT_MAX_ITERATIONS = 500;
   private static final double MAX_COUNT_ITERATION_GROWTH_FACTOR = 1.05;
+  private final Logger log = new 
Logger(ClusterByStatisticsCollectorImpl.class);
 
   private final ClusterBy clusterBy;
   private final RowKeyReader keyReader;
@@ -127,6 +130,7 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
 
     totalRetainedBytes += bucketHolder.updateRetainedBytes();
     if (totalRetainedBytes > maxRetainedBytes) {
+      log.debug("Downsampling ClusterByStatisticsCollector as 
totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", 
totalRetainedBytes, maxRetainedBytes);
       downSample();
     }
 
@@ -148,6 +152,7 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
 
         totalRetainedBytes += bucketHolder.updateRetainedBytes();
         if (totalRetainedBytes > maxRetainedBytes) {
+          log.debug("Downsampling ClusterByStatisticsCollector as 
totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", 
totalRetainedBytes, maxRetainedBytes);
           downSample();
         }
       }
@@ -179,6 +184,7 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
 
       totalRetainedBytes += bucketHolder.updateRetainedBytes();
       if (totalRetainedBytes > maxRetainedBytes) {
+        log.debug("Downsampling ClusterByStatisticsCollector as 
totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", 
totalRetainedBytes, maxRetainedBytes);
         downSample();
       }
     }
@@ -227,6 +233,8 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
   @Override
   public ClusterByPartitions generatePartitionsWithTargetWeight(final long 
targetWeight)
   {
+    logSketches();
+
     if (targetWeight < 1) {
       throw new IAE("Target weight must be positive");
     }
@@ -270,6 +278,8 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
   @Override
   public ClusterByPartitions generatePartitionsWithMaxCount(final int 
maxNumPartitions)
   {
+    logSketches();
+
     if (maxNumPartitions < 1) {
       throw new IAE("Must have at least one partition");
     } else if (buckets.isEmpty()) {
@@ -311,6 +321,28 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
     return ranges;
   }
 
+  private void logSketches()
+  {
+    if (log.isDebugEnabled()) {
+      // Log all sketches
+      List<KeyCollector<?>> keyCollectors = buckets.values()
+                                                   .stream()
+                                                   .map(bucketHolder -> 
bucketHolder.keyCollector)
+                                                   
.sorted(Comparator.comparingInt(KeyCollector::sketchAccuracyFactor))
+                                                   
.collect(Collectors.toList());
+      log.debug("KeyCollectors at partition generation: [%s]", keyCollectors);
+    } else {
+      // Log the 5 least accurate sketches
+      List<KeyCollector<?>> limitedKeyCollectors = buckets.values()
+                                                          .stream()
+                                                          .map(bucketHolder -> 
bucketHolder.keyCollector)
+                                                          
.sorted(Comparator.comparingInt(KeyCollector::sketchAccuracyFactor))
+                                                          .limit(5)
+                                                          
.collect(Collectors.toList());
+      log.info("Most downsampled keyCollectors: [%s]", limitedKeyCollectors);
+    }
+  }
+
   @Override
   public ClusterByStatisticsSnapshot snapshot()
   {
@@ -380,32 +412,37 @@ public class ClusterByStatisticsCollectorImpl implements 
ClusterByStatisticsColl
     long newTotalRetainedBytes = totalRetainedBytes;
     final long targetTotalRetainedBytes = totalRetainedBytes / 2;
 
-    final List<BucketHolder> sortedHolders = new ArrayList<>(buckets.size());
+    final List<Pair<Long, BucketHolder>> sortedHolders = new 
ArrayList<>(buckets.size());
+    final RowKeyReader trimmedRowReader = 
keyReader.trimmedKeyReader(clusterBy.getBucketByCount());
 
     // Only consider holders with more than one retained key. Holders with a 
single retained key cannot be downsampled.
-    for (final BucketHolder holder : buckets.values()) {
-      if (holder.keyCollector.estimatedRetainedKeys() > 1) {
-        sortedHolders.add(holder);
+    for (final Map.Entry<RowKey, BucketHolder> entry : buckets.entrySet()) {
+      BucketHolder bucketHolder = entry.getValue();
+      if (bucketHolder != null && 
bucketHolder.keyCollector.estimatedRetainedKeys() > 1) {
+        Long timeChunk = clusterBy.getBucketByCount() == 0 ? null : (Long) 
trimmedRowReader.read(entry.getKey(), 0);
+        sortedHolders.add(Pair.of(timeChunk, bucketHolder));
       }
     }
 
     // Downsample least-dense buckets first. (They're less likely to need high 
resolution.)
     sortedHolders.sort(
-        Comparator.comparing((BucketHolder holder) ->
-                                 (double) 
holder.keyCollector.estimatedTotalWeight()
-                                 / holder.keyCollector.estimatedRetainedKeys())
+        Comparator.comparing((Pair<Long, BucketHolder> pair) ->
+                                 (double) 
pair.rhs.keyCollector.estimatedTotalWeight()
+                                 / 
pair.rhs.keyCollector.estimatedRetainedKeys())
     );
 
     int i = 0;
     while (i < sortedHolders.size() && newTotalRetainedBytes > 
targetTotalRetainedBytes) {
-      final BucketHolder bucketHolder = sortedHolders.get(i);
+      final Long timeChunk = sortedHolders.get(i).lhs;
+      final BucketHolder bucketHolder = sortedHolders.get(i).rhs;
 
       // Ignore false return, because we wrap all collectors in 
DelegateOrMinKeyCollector and can be assured that
       // it will downsample all the way to one if needed. Can't do better than 
that.
+      log.debug("Downsampling sketch for timeChunk [%s]: [%s]", timeChunk, 
bucketHolder.keyCollector);
       bucketHolder.keyCollector.downSample();
       newTotalRetainedBytes += bucketHolder.updateRetainedBytes();
 
-      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 
1).retainedBytes > bucketHolder.retainedBytes || 
bucketHolder.keyCollector.estimatedRetainedKeys() <= 1) {
+      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 
1).rhs.retainedBytes > bucketHolder.retainedBytes || 
bucketHolder.keyCollector.estimatedRetainedKeys() <= 1) {
         i++;
       }
     }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollector.java
index e076194f2a..d595ff40ad 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollector.java
@@ -34,11 +34,11 @@ import java.util.Optional;
 /**
  * Delegates to some other kind of {@link KeyCollector} at first, until its 
{@link #downSample()} fails to downsample.
  * At that point, the delegate collector is nulled out and this collector 
starts tracking the min key instead.
- *
+ * <br>
  * This is useful because it allows us to wrap any {@link KeyCollector} and 
enable downsampling to a single key, even
  * if the original collector does not support that. For example, {@link 
QuantilesSketchKeyCollector} cannot downsample
  * below K = 2, which retains more than one key.
- *
+ * <br>
  * Created by {@link DelegateOrMinKeyCollectorFactory}.
  */
 public class DelegateOrMinKeyCollector<TDelegate extends 
KeyCollector<TDelegate>>
@@ -177,4 +177,19 @@ public class DelegateOrMinKeyCollector<TDelegate extends 
KeyCollector<TDelegate>
       return ClusterByPartitions.oneUniversalPartition();
     }
   }
+
+  @Override
+  public int sketchAccuracyFactor()
+  {
+    return delegate == null ? Integer.MIN_VALUE : 
delegate.sketchAccuracyFactor();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DelegateOrMinKeyCollector{" +
+           "delegate=" + delegate +
+           ", minKey=" + minKey +
+           '}';
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollector.java
index f88174b333..b22a099033 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollector.java
@@ -245,6 +245,12 @@ public class DistinctKeyCollector implements 
KeyCollector<DistinctKeyCollector>
     return new ClusterByPartitions(partitions);
   }
 
+  @Override
+  public int sketchAccuracyFactor()
+  {
+    return -spaceReductionFactor;
+  }
+
   @JsonProperty("keys")
   Map<RowKey, Long> getRetainedKeys()
   {
@@ -312,4 +318,15 @@ public class DistinctKeyCollector implements 
KeyCollector<DistinctKeyCollector>
 
     return true;
   }
+
+  @Override
+  public String toString()
+  {
+    return "DistinctKeyCollector{" +
+           "maxBytes=" + maxBytes +
+           ", retainedBytes=" + retainedBytes +
+           ", spaceReductionFactor=" + spaceReductionFactor +
+           ", totalWeightUnadjusted=" + totalWeightUnadjusted +
+           '}';
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollector.java
index bf059f1996..2f28e148d3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/KeyCollector.java
@@ -26,7 +26,7 @@ public interface KeyCollector<CollectorType extends 
KeyCollector<CollectorType>>
 {
   /**
    * Add a key with a certain weight to this collector.
-   *
+   * <br>
    * See {@link ClusterByStatisticsCollector#add} for the meaning of "weight".
    */
   void add(RowKey key, long weight);
@@ -80,4 +80,12 @@ public interface KeyCollector<CollectorType extends 
KeyCollector<CollectorType>>
    *                     or lower than the provided target.
    */
   ClusterByPartitions generatePartitionsWithTargetWeight(long targetWeight);
+
+  /**
+   * Returns an integer which indicates the accuracy of the sketch used. The 
higher the value, the more accurate it is.
+   * This can be compared to check which sketches have been downsampled the 
most and are thus the least accurate. The
+   * exact value returned is decided by the implementation, and it is only 
meaningful to compare sketches of the same
+   * implementation in this way.
+   */
+  int sketchAccuracyFactor();
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
index ee08c88d7f..607265367c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java
@@ -37,7 +37,7 @@ import java.util.NoSuchElementException;
 
 /**
  * A key collector that is used when not aggregating. It uses a quantiles 
sketch to track keys.
- *
+ * <br>
  * The collector maintains the averageKeyLength for all keys added through 
{@link #add(RowKey, long)} or
  * {@link #addAll(QuantilesSketchKeyCollector)}. The average is calculated as 
a running average and accounts for
  * weight of the key added. The averageKeyLength is assumed to be unaffected 
by {@link #downSample()}.
@@ -171,6 +171,12 @@ public class QuantilesSketchKeyCollector implements 
KeyCollector<QuantilesSketch
     return new ClusterByPartitions(partitions);
   }
 
+  @Override
+  public int sketchAccuracyFactor()
+  {
+    return sketch.getK();
+  }
+
   /**
    * Retrieves the backing sketch. Exists for usage by {@link 
QuantilesSketchKeyCollectorFactory}.
    */
@@ -186,4 +192,15 @@ public class QuantilesSketchKeyCollector implements 
KeyCollector<QuantilesSketch
   {
     return averageKeyLength;
   }
+
+  @Override
+  public String toString()
+  {
+    return "QuantilesSketchKeyCollector{" +
+           "sketch=ItemsSketch{N=" + sketch.getN() +
+           ", K=" + sketch.getK() +
+           ", retainedKeys=" + sketch.getNumRetained() +
+           "}, averageKeyLength=" + averageKeyLength +
+           '}';
+  }
 }
diff --git a/web-console/src/components/rule-editor/rule-editor.tsx b/web-console/src/components/rule-editor/rule-editor.tsx
index 2382e8ba43..52eb5b805c 100644
--- a/web-console/src/components/rule-editor/rule-editor.tsx
+++ b/web-console/src/components/rule-editor/rule-editor.tsx
@@ -81,7 +81,7 @@ export const RuleEditor = React.memo(function 
RuleEditor(props: RuleEditorProps)
     if (!tieredReplicantsList.length) {
       return (
         <FormGroup>
-          There is no historical replication configured, data will not be 
loaded on hisotricals.
+          There is no historical replication configured, data will not be 
loaded on historicals.
         </FormGroup>
       );
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to