This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9576fd3141e HllSketch Merge Aggregator optimizations (#15162)
9576fd3141e is described below

commit 9576fd3141e81b77e4a8da4bfa6bdecdf0be90b9
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri Nov 3 08:31:22 2023 +0530

    HllSketch Merge Aggregator optimizations (#15162)
    
    * Null byte serde for empty sketches
    
    * Cache for HllSketchMerge
    
    * Check for empty sketches
    
    * Address review comments
    
    * Revert changes to HllSketchHolder
    
    * Handle null sketch holders instead of null sketches
    
    * Add unit test for MSQ HllSketch
    
    * Add comments
    
    * Fix style
---
 .../hll/HllSketchAggregatorFactory.java            |  6 +-
 .../hll/HllSketchHolderObjectStrategy.java         | 67 ++++++++++++++-
 .../hll/HllSketchMergeBufferAggregator.java        | 15 ++--
 .../hll/HllSketchMergeBufferAggregatorHelper.java  | 94 ++++++++++++++++++----
 .../hll/HllSketchMergeVectorAggregator.java        | 18 ++---
 .../hll/HllSketchToEstimatePostAggregator.java     |  6 +-
 ...llSketchToEstimateWithBoundsPostAggregator.java |  6 +-
 .../hll/HllSketchToStringPostAggregator.java       |  6 +-
 .../hll/HllSketchUnionPostAggregator.java          |  7 +-
 .../datasketches/hll/sql/HllPostAggExprMacros.java |  2 +-
 .../hll/HllSketchAggregatorFactoryTest.java        |  2 +-
 .../datasketches/hll/HllSketchAggregatorTest.java  | 20 +++++
 .../apache/druid/msq/exec/MSQDataSketchesTest.java | 46 +++++++++++
 13 files changed, 254 insertions(+), 41 deletions(-)

diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
index 043c977f8be..1c908be244a 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java
@@ -205,10 +205,14 @@ public abstract class HllSketchAggregatorFactory extends 
AggregatorFactory
   @Override
   public Object finalizeComputation(@Nullable final Object object)
   {
-    if (!shouldFinalize || object == null) {
+    if (!shouldFinalize) {
       return object;
     }
 
+    if (object == null) {
+      return 0.0D;
+    }
+
     final HllSketchHolder sketch = HllSketchHolder.fromObj(object);
     final double estimate = sketch.getEstimate();
 
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java
index 21a94ecf31d..18b7f03319e 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolderObjectStrategy.java
@@ -48,13 +48,23 @@ public class HllSketchHolderObjectStrategy implements 
ObjectStrategy<HllSketchHo
   @Override
   public HllSketchHolder fromByteBuffer(final ByteBuffer buf, final int size)
   {
+    if (size == 0 || isSafeToConvertToNullSketch(buf, size)) {
+      return null;
+    }
     return HllSketchHolder.of(HllSketch.wrap(Memory.wrap(buf, 
ByteOrder.LITTLE_ENDIAN).region(buf.position(), size)));
   }
 
   @Override
-  public byte[] toBytes(final HllSketchHolder sketch)
+  public byte[] toBytes(final HllSketchHolder holder)
   {
-    return sketch.getSketch().toCompactByteArray();
+    if (holder == null) {
+      return new byte[] {};
+    }
+    HllSketch sketch = holder.getSketch();
+    if (sketch == null || sketch.isEmpty()) {
+      return new byte[] {};
+    }
+    return sketch.toCompactByteArray();
   }
 
   @Nullable
@@ -67,4 +77,57 @@ public class HllSketchHolderObjectStrategy implements 
ObjectStrategy<HllSketchHo
         )
     );
   }
+
+  /**
+   * Checks if a sketch is empty and can be converted to null. Returns true if it is and false if it is not, or if it is
+   * not possible to say for sure.
+   * Checks the initial 8-byte header to find the type of internal sketch implementation, then uses the logic the
+   * corresponding implementation uses to tell if a sketch is empty while deserializing it.
+   */
+  private static boolean isSafeToConvertToNullSketch(ByteBuffer buf, int size)
+  {
+    if (size < 8) {
+      // Sanity check.
+      // HllSketches as bytes should be at least 8 bytes even with an empty sketch. If this is not the case, return
+      // false since we can't be sure.
+      return false;
+    }
+    final int position = buf.position();
+
+    // Get preamble int. These should correspond to the type of internal implementation as a sanity check.
+    final int preInts = buf.get(position) & 0x3F;   // get(PREAMBLE_INTS_BYTE) & PREAMBLE_MASK
+
+    // Get org.apache.datasketches.hll.CurMode. This indicates the type of internal data structure.
+    final int curMode = buf.get(position + 7) & 3;  // get(MODE_BYTE) & CUR_MODE_MASK
+    switch (curMode) {
+      case 0: // LIST
+        if (preInts != 2) {
+          // preInts should be LIST_PREINTS, Sanity check.
+          return false;
+        }
+        // Based on org.apache.datasketches.hll.PreambleUtil.extractListCount
+        int listCount = buf.get(position + 6) & 0xFF; // get(LIST_COUNT_BYTE) & 0xFF
+        return listCount == 0;
+      case 1: // SET
+        if (preInts != 3 || size < 12) {
+          // preInts should be HASH_SET_PREINTS, Sanity check.
+          // We also need the full 4-byte hash set count int at offset 8 for Set implementations.
+          return false;
+        }
+        // Based on PreambleUtil.extractHashSetCount: a 4-byte little-endian int (one byte would read 256 as 0).
+        int setCount = buf.duplicate().order(ByteOrder.LITTLE_ENDIAN).getInt(position + 8);  // getInt(HASH_SET_COUNT_INT)
+        return setCount == 0;
+      case 2: // HLL
+        if (preInts != 10) {
+          // preInts should be HLL_PREINTS, Sanity check.
+          return false;
+        }
+        // Based on org.apache.datasketches.hll.DirectHllArray.isEmpty
+        final int flags = buf.get(position + 5);      // get(FLAGS_BYTE)
+        return (flags & 4) > 0;                       // (flags & EMPTY_FLAG_MASK) > 0
+      default: // Unknown implementation
+        // Can't say for sure, so return false.
+        return false;
+    }
+  }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 27920d1b4d1..8e3bfffa073 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -22,13 +22,11 @@ package org.apache.druid.query.aggregation.datasketches.hll;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.datasketches.hll.Union;
-import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * This aggregator merges existing sketches.
@@ -64,10 +62,7 @@ public class HllSketchMergeBufferAggregator implements 
BufferAggregator
       return;
     }
 
-    final WritableMemory mem = WritableMemory.writableWrap(buf, 
ByteOrder.LITTLE_ENDIAN)
-                                             .writableRegion(position, 
helper.getSize());
-
-    final Union union = Union.writableWrap(mem);
+    final Union union = helper.getOrCreateUnion(buf, position);
     union.update(sketch.getSketch());
   }
 
@@ -80,7 +75,7 @@ public class HllSketchMergeBufferAggregator implements 
BufferAggregator
   @Override
   public void close()
   {
-    // nothing to close
+    helper.close();
   }
 
   @Override
@@ -104,4 +99,10 @@ public class HllSketchMergeBufferAggregator implements 
BufferAggregator
     // See https://github.com/apache/druid/pull/6893#discussion_r250726028
     inspector.visit("lgK", helper.getLgK());
   }
+
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, 
ByteBuffer newBuffer)
+  {
+    helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+  }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
index 95d1e1b5bca..22653019772 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.query.aggregation.datasketches.hll;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.datasketches.hll.Union;
@@ -26,15 +28,18 @@ import org.apache.datasketches.memory.WritableMemory;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.IdentityHashMap;
 
 public class HllSketchMergeBufferAggregatorHelper
 {
   private final int lgK;
   private final TgtHllType tgtHllType;
   private final int size;
+  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new 
IdentityHashMap<>();
+  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new 
IdentityHashMap<>();
 
   /**
-   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a 
prebuilt empty Union image.
+   * Used by {@link #initializeEmptyUnion(ByteBuffer, int)}. We initialize by 
copying a prebuilt empty Union image.
    * {@link HllSketchBuildBufferAggregator} does something similar, but 
different enough that we don't share code. The
    * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses 
{@link Union} objects.
    */
@@ -57,20 +62,7 @@ public class HllSketchMergeBufferAggregatorHelper
    */
   public void init(final ByteBuffer buf, final int position)
   {
-    // Copy prebuilt empty union object.
-    // Not necessary to cache a Union wrapper around the initialized memory, 
because:
-    //  - It is cheap to reconstruct by re-wrapping the memory in "aggregate" 
and "get".
-    //  - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, 
our Union objects never exceed the
-    //    max size and therefore do not need to be potentially moved in-heap.
-
-    final int oldPosition = buf.position();
-    try {
-      buf.position(position);
-      buf.put(emptyUnion);
-    }
-    finally {
-      buf.position(oldPosition);
-    }
+    createNewUnion(buf, position, false);
   }
 
   /**
@@ -93,4 +85,76 @@ public class HllSketchMergeBufferAggregatorHelper
   {
     return size;
   }
+
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, 
ByteBuffer newBuffer)
+  {
+    createNewUnion(newBuffer, newPosition, true);
+    Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
+    if (unionMap != null) {
+      unionMap.remove(oldPosition);
+      if (unionMap.isEmpty()) {
+        unions.remove(oldBuffer);
+        memCache.remove(oldBuffer);
+      }
+    }
+  }
+
+  public Union getOrCreateUnion(ByteBuffer buf, int position)
+  {
+    Int2ObjectMap<Union> unionMap = unions.get(buf);
+    Union union = unionMap != null ? unionMap.get(position) : null;
+    if (union != null) {
+      return union;
+    }
+    return createNewUnion(buf, position, true);
+  }
+
+  private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
+  {
+    if (!isWrapped) {
+      initializeEmptyUnion(buf, position);
+    }
+
+    final WritableMemory mem = getMemory(buf).writableRegion(position, size);
+    Union union = Union.writableWrap(mem);
+
+    Int2ObjectMap<Union> unionMap = unions.get(buf);
+    if (unionMap == null) {
+      unionMap = new Int2ObjectOpenHashMap<>();
+      unions.put(buf, unionMap);
+    }
+    unionMap.put(position, union);
+    return union;
+  }
+
+  /**
+   * Copy prebuilt empty union object into the specified buffer and position
+   */
+  private void initializeEmptyUnion(ByteBuffer buf, int position)
+  {
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptyUnion);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+  }
+
+  public void close()
+  {
+    unions.clear();
+    memCache.clear();
+  }
+
+  private WritableMemory getMemory(ByteBuffer buffer)
+  {
+    WritableMemory mem = memCache.get(buffer);
+    if (mem == null) {
+      mem = WritableMemory.writableWrap(buffer, ByteOrder.LITTLE_ENDIAN);
+      memCache.put(buffer, mem);
+    }
+    return mem;
+  }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
index 8c7e214aa98..5fec9b94ba2 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation.datasketches.hll;
 
 import org.apache.datasketches.hll.TgtHllType;
 import org.apache.datasketches.hll.Union;
-import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.query.aggregation.VectorAggregator;
 import 
org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
 import org.apache.druid.segment.ColumnProcessors;
@@ -29,7 +28,6 @@ import 
org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.function.Supplier;
 
 public class HllSketchMergeVectorAggregator implements VectorAggregator
@@ -65,10 +63,7 @@ public class HllSketchMergeVectorAggregator implements 
VectorAggregator
   {
     final Object[] vector = objectSupplier.get();
 
-    final WritableMemory mem = WritableMemory.writableWrap(buf, 
ByteOrder.LITTLE_ENDIAN)
-                                             .writableRegion(position, 
helper.getSize());
-
-    final Union union = Union.writableWrap(mem);
+    final Union union = helper.getOrCreateUnion(buf, position);
     for (int i = startRow; i < endRow; i++) {
       if (vector[i] != null) {
         union.update(((HllSketchHolder) vector[i]).getSketch());
@@ -85,7 +80,6 @@ public class HllSketchMergeVectorAggregator implements 
VectorAggregator
       final int positionOffset
   )
   {
-    final WritableMemory mem = WritableMemory.writableWrap(buf, 
ByteOrder.LITTLE_ENDIAN);
     final Object[] vector = objectSupplier.get();
 
     for (int i = 0; i < numRows; i++) {
@@ -93,7 +87,7 @@ public class HllSketchMergeVectorAggregator implements 
VectorAggregator
 
       if (o != null) {
         final int position = positions[i] + positionOffset;
-        final Union union = Union.writableWrap(mem.writableRegion(position, 
helper.getSize()));
+        final Union union = helper.getOrCreateUnion(buf, position);
         union.update(o.getSketch());
       }
     }
@@ -108,6 +102,12 @@ public class HllSketchMergeVectorAggregator implements 
VectorAggregator
   @Override
   public void close()
   {
-    // Nothing to close.
+    helper.close();
+  }
+
+  @Override
+  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, 
ByteBuffer newBuffer)
+  {
+    helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
   }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java
index cbbd389c6f8..0ce05ba7e21 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregator.java
@@ -97,7 +97,11 @@ public class HllSketchToEstimatePostAggregator implements 
PostAggregator
   @Override
   public Object compute(final Map<String, Object> combinedAggregators)
   {
-    final HllSketchHolder holder = 
HllSketchHolder.fromObj(field.compute(combinedAggregators));
+    Object hllSketchHolderObject = field.compute(combinedAggregators);
+    if (hllSketchHolderObject == null) {
+      return 0.0D;
+    }
+    final HllSketchHolder holder = 
HllSketchHolder.fromObj(hllSketchHolderObject);
     // The union object always uses an HLL_8 sketch, so we always get that.  
The target type doesn't actually impact
     // the estimate anyway, so whatever gives us the "cheapest" operation 
should be good.
     double estimate = holder.getEstimate();
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java
index 8e85e18ab5d..0d7c2e3b90b 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimateWithBoundsPostAggregator.java
@@ -103,7 +103,11 @@ public class HllSketchToEstimateWithBoundsPostAggregator 
implements PostAggregat
   @Override
   public double[] compute(final Map<String, Object> combinedAggregators)
   {
-    final HllSketchHolder sketch = 
HllSketchHolder.fromObj(field.compute(combinedAggregators));
+    Object hllSketchHolderObject = field.compute(combinedAggregators);
+    if (hllSketchHolderObject == null) {
+      return new double[] {0.0D, 0.0D, 0.0D};
+    }
+    final HllSketchHolder sketch = 
HllSketchHolder.fromObj(hllSketchHolderObject);
     return new double[] {sketch.getEstimate(), 
sketch.getLowerBound(numStdDevs), sketch.getUpperBound(numStdDevs)};
   }
 
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java
index 2f6d9a991d9..e8e43bf4dd4 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToStringPostAggregator.java
@@ -83,7 +83,11 @@ public class HllSketchToStringPostAggregator implements 
PostAggregator
   @Override
   public String compute(final Map<String, Object> combinedAggregators)
   {
-    final HllSketch sketch = 
HllSketchHolder.fromObj(field.compute(combinedAggregators)).getSketch();
+    Object hllSketchHolderObject = field.compute(combinedAggregators);
+    if (hllSketchHolderObject == null) {
+      return "Null Sketch";
+    }
+    final HllSketch sketch = 
HllSketchHolder.fromObj(hllSketchHolderObject).getSketch();
     return sketch.toString();
   }
 
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java
index 9d96e31cdfe..2ce213520d3 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchUnionPostAggregator.java
@@ -121,8 +121,11 @@ public class HllSketchUnionPostAggregator implements 
PostAggregator
   {
     final Union union = new Union(lgK);
     for (final PostAggregator field : fields) {
-      final HllSketchHolder sketch = 
HllSketchHolder.fromObj(field.compute(combinedAggregators));
-      union.update(sketch.getSketch());
+      Object hllSketchHolderObject = field.compute(combinedAggregators);
+      if (hllSketchHolderObject != null) {
+        final HllSketchHolder holder = 
HllSketchHolder.fromObj(hllSketchHolderObject);
+        union.update(holder.getSketch());
+      }
     }
     return HllSketchHolder.of(union.getResult(tgtHllType));
   }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java
index 8e310d19d13..7786f8be98d 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllPostAggExprMacros.java
@@ -82,7 +82,7 @@ public class HllPostAggExprMacros
 
       final Object valObj = eval.value();
       if (valObj == null) {
-        return ExprEval.of(null);
+        return ExprEval.of(0.0D);
       }
       HllSketchHolder h = HllSketchHolder.fromObj(valObj);
       double estimate = h.getEstimate();
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java
index 53b0ddbdf89..09a20ab8213 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java
@@ -82,7 +82,7 @@ public class HllSketchAggregatorFactoryTest
   @Test
   public void testFinalizeComputationNull()
   {
-    Assert.assertNull(target.finalizeComputation(null));
+    Assert.assertEquals(0.0D, target.finalizeComputation(null));
   }
 
   @Test
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
index 0cd4d8cf39b..7c218934034 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringEncoding;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -36,6 +38,8 @@ import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.ResultRow;
+import 
org.apache.druid.query.groupby.epinephelinae.GroupByTestColumnSelectorFactory;
+import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
@@ -417,6 +421,22 @@ public class HllSketchAggregatorTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(expectedSummary, ((HllSketchHolder) 
row.get(4)).getSketch().toString());
   }
 
+  @Test
+  public void testRelocation()
+  {
+    final GroupByTestColumnSelectorFactory columnSelectorFactory = 
GrouperTestUtil.newColumnSelectorFactory();
+    HllSketchHolder sketchHolder = new HllSketchHolder(null, new HllSketch());
+    sketchHolder.getSketch().update(1);
+
+    columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", 
sketchHolder)));
+    HllSketchHolder[] holders = groupByHelper.runRelocateVerificationTest(
+        new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, 
null, true, true),
+        columnSelectorFactory,
+        HllSketchHolder.class
+    );
+    Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0);
+  }
+
   private static String buildParserJson(List<String> dimensions, List<String> 
columns)
   {
     Map<String, Object> timestampSpec = ImmutableMap.of(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java
index 716239ac056..1f856027de3 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQDataSketchesTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
 import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.groupby.GroupByQuery;
@@ -95,4 +96,49 @@ public class MSQDataSketchesTest extends MSQTestBase
         )
         .verifyResults();
   }
+
+  @Test
+  public void testEmptyHllSketch()
+  {
+    RowSignature resultSignature =
+        RowSignature.builder()
+                    .add("c", ColumnType.LONG)
+                    .build();
+
+    GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource(CalciteTests.DATASOURCE1)
+                    .setInterval(querySegmentSpec(Filtration.eternity()))
+                    .setGranularity(Granularities.ALL)
+                    .setAggregatorSpecs(
+                        aggregators(
+                            new FilteredAggregatorFactory(
+                                new HllSketchBuildAggregatorFactory("a0", 
"dim2", 12, "HLL_4", null, true, true),
+                                equality("dim1", "nonexistent", 
ColumnType.STRING),
+                                "a0"
+                            )
+                        )
+                    )
+                    .setContext(DEFAULT_MSQ_CONTEXT)
+                    .build();
+
+    testSelectQuery()
+        .setSql("SELECT APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim1 = 
'nonexistent') AS c FROM druid.foo")
+        .setExpectedMSQSpec(MSQSpec.builder()
+                                   .query(query)
+                                   .columnMappings(new 
ColumnMappings(ImmutableList.of(
+                                       new ColumnMapping("a0", "c"))
+                                   ))
+                                   
.tuningConfig(MSQTuningConfig.defaultConfig())
+                                   
.destination(TaskReportMSQDestination.INSTANCE)
+                                   .build())
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
+        .setExpectedRowSignature(resultSignature)
+        .setExpectedResultRows(
+            ImmutableList.of(
+                new Object[]{0L}
+            )
+        )
+        .verifyResults();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to