This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e44e35d657b Support AVG aggregation in MergeRollupTask and 
RealtimeToOfflineSegmentsTask (#18822)
e44e35d657b is described below

commit e44e35d657b2c605b83dec8e2401d0bbb153f75c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Jun 22 23:55:04 2026 -0700

    Support AVG aggregation in MergeRollupTask and 
RealtimeToOfflineSegmentsTask (#18822)
    
    * Support AVG aggregation in MergeRollupTask and 
RealtimeToOfflineSegmentsTask
    
    AVG is computed from a serialized AvgPair (sum + count) stored in a BYTES
    column produced by an AVG ingestion aggregation or star-tree. Merging adds
    the sums and counts so the average stays correct across multiple rollup
    levels; averaging already-averaged scalars would be wrong for groups with
    unequal counts. The query-time AvgAggregationFunction already reads the same
    serialized AvgPair format, so no query-path change is needed.
    
    - Add AvgValueAggregator to the segment-processing rollup reducer
    - Register AVG in the pinot-core processing ValueAggregatorFactory
    - Allow AVG in 
MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS
    - Add unit + executor tests for both MergeRollup and RealtimeToOffline,
      covering unequal counts and two-level rollup
    
    * Validate bytes-backed rollup aggregation columns are declared as BYTES
    
    The merge-rollup and realtime-to-offline task generators previously 
validated
    the configured aggregationType only against 
AVAILABLE_CORE_VALUE_AGGREGATORS,
    not the target column's dataType. A bytes-backed aggregation (AVG, the 
sketch
    types, TDigest, KLL) configured on a non-BYTES column passed config 
validation
    but failed with a ClassCastException at task runtime.
    
    Add ValueAggregatorFactory.isBytesBacked() (co-located with the aggregator
    switch as a single source of truth) and 
MergeTaskUtils.validateAggregationColumnType(),
    called from both generators, to reject such configs at config time with a 
clear
    message, uniformly for all bytes-backed types.
    
    * Emit a valid empty AvgPair instead of byte[0] when merging two empty 
values
    
    When both inputs to the merge-rollup AvgValueAggregator are empty (the 
default
    null value for a BYTES column), it previously returned an empty byte[], 
which
    would be written into the merged segment and then fail AvgPair 
deserialization
    (expects 16 bytes) in the query-time AVG function. Return a serialized empty
    AvgPair (sum=0, count=0) instead, which the query path handles as a no-data 
row.
---
 .../apache/pinot/core/common/MinionConstants.java  |   2 +-
 .../processing/aggregator/AvgValueAggregator.java  |  62 ++++
 .../aggregator/ValueAggregatorFactory.java         |  35 ++-
 .../aggregator/AvgValueAggregatorTest.java         | 114 +++++++
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  |  19 ++
 .../mergerollup/MergeRollupTaskGenerator.java      |   1 +
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   1 +
 .../MergeRollupAvgTaskExecutorTest.java            | 330 +++++++++++++++++++++
 .../mergerollup/MergeRollupTaskGeneratorTest.java  |  40 ++-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |  94 ++++++
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java |  46 ++-
 11 files changed, 738 insertions(+), 6 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 4ae4f8baf1b..976216d06a9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -173,7 +173,7 @@ public class MinionConstants {
     /// with RealtimeToOfflineSegmentsTask, which performs the same rollup 
aggregation when configured with rollup
     /// merge type.
     public static final EnumSet<AggregationFunctionType> 
AVAILABLE_CORE_VALUE_AGGREGATORS =
-        EnumSet.of(MIN, MAX, SUM, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTTHETASKETCH,
+        EnumSet.of(MIN, MAX, SUM, AVG, DISTINCTCOUNTHLL, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTTHETASKETCH,
             DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, 
DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
             SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, 
DISTINCTCOUNTHLLPLUS,
             DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, 
DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
new file mode 100644
index 00000000000..762080f6c6f
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+
+
+/// Aggregator for merging serialized [AvgPair] (sum + count) values during 
segment processing
+/// (e.g. MergeRollupTask / RealtimeToOfflineSegmentsTask).
+///
+/// The column must store a serialized `AvgPair`, i.e. a `BYTES` column 
produced by an `AVG` ingestion
+/// aggregation (or star-tree). Merging adds the sums and counts so the 
average stays correct across
+/// multiple rollup levels — averaging already-averaged scalars would be 
wrong. The final `sum / count`
+/// division happens at query time in `AvgAggregationFunction`, which already 
reads the same serialized
+/// `AvgPair` format.
+public class AvgValueAggregator implements ValueAggregator {
+
+  /// Merges two serialized `AvgPair` values into one. An empty `byte[]` (the 
default null value for a `BYTES`
+  /// column) is treated as a missing value, so the other side is returned 
unchanged; if both are empty, a serialized
+  /// empty `AvgPair` (sum=0, count=0) is returned rather than an empty 
`byte[]`, which `AvgPair` deserialization
+  /// (16 bytes) cannot read.
+  @Override
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
+    byte[] bytes1 = (byte[]) value1;
+    byte[] bytes2 = (byte[]) value2;
+
+    // Treat empty byte arrays (default null value for BYTES columns) as 
missing values. When both sides are empty,
+    // emit a valid serialized empty AvgPair rather than propagating an empty 
byte[] that cannot be deserialized.
+    if (bytes1.length == 0 && bytes2.length == 0) {
+      return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair());
+    }
+    if (bytes1.length == 0) {
+      return bytes2;
+    }
+    if (bytes2.length == 0) {
+      return bytes1;
+    }
+
+    AvgPair first = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes1);
+    AvgPair second = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes2);
+    first.apply(second);
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(first);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 36dbc1e7ff2..1d5aa5e3978 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -39,11 +39,42 @@ public class ValueAggregatorFactory {
         || aggregationType == AggregationFunctionType.LASTWITHTIME;
   }
 
+  /// Returns `true` if the rollup ValueAggregator for the given type stores 
its aggregated value as a serialized
+  /// object (sketch, AvgPair, TDigest, ...) and therefore reads the metric 
column value as a `byte[]`, i.e. the
+  /// column must be a `BYTES` column. Numeric (MIN/MAX/SUM) and time-ordered 
(FIRSTWITHTIME/LASTWITHTIME) aggregators
+  /// keep the column's native type. Keep this in sync with 
[#getValueAggregator]: a type is bytes-backed iff its
+  /// aggregator deserializes the column value as `byte[]`.
+  public static boolean isBytesBacked(AggregationFunctionType aggregationType) 
{
+    switch (aggregationType) {
+      case AVG:
+      case DISTINCTCOUNTHLL:
+      case DISTINCTCOUNTRAWHLL:
+      case DISTINCTCOUNTTHETASKETCH:
+      case DISTINCTCOUNTRAWTHETASKETCH:
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case DISTINCTCOUNTCPCSKETCH:
+      case DISTINCTCOUNTRAWCPCSKETCH:
+      case DISTINCTCOUNTULL:
+      case DISTINCTCOUNTRAWULL:
+      case PERCENTILEKLL:
+      case PERCENTILERAWKLL:
+      case PERCENTILETDIGEST:
+      case PERCENTILERAWTDIGEST:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   /// Constructs a ValueAggregator from the given aggregation type.
   ///
   /// When adding entries to this please add them to the Set named 
AVAILABLE_CORE_VALUE_AGGREGATORS in
   /// org.apache.pinot.core.common.MinionConstants.MergeRollupTask so that 
they pass the task config validation of the
-  /// merge tasks (MergeRollupTask, RealtimeToOfflineSegmentsTask)
+  /// merge tasks (MergeRollupTask, RealtimeToOfflineSegmentsTask), and update 
[#isBytesBacked] if the new aggregator
+  /// reads/writes the column value as `byte[]`.
   public static ValueAggregator getValueAggregator(AggregationFunctionType 
aggregationType, DataType dataType) {
     switch (aggregationType) {
       case MIN:
@@ -52,6 +83,8 @@ public class ValueAggregatorFactory {
         return new MaxValueAggregator(dataType);
       case SUM:
         return new SumValueAggregator(dataType);
+      case AVG:
+        return new AvgValueAggregator();
       case DISTINCTCOUNTHLL:
       case DISTINCTCOUNTRAWHLL:
         return new DistinctCountHLLAggregator();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
new file mode 100644
index 00000000000..b72e277493c
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class AvgValueAggregatorTest {
+
+  private final Map<String, String> _functionParameters = new HashMap<>();
+  private AvgValueAggregator _aggregator;
+
+  @BeforeMethod
+  public void setUp() {
+    _aggregator = new AvgValueAggregator();
+  }
+
+  private static byte[] serialize(double sum, long count) {
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(new AvgPair(sum, count));
+  }
+
+  private AvgPair aggregate(byte[] value1, byte[] value2) {
+    byte[] result = (byte[]) _aggregator.aggregate(value1, value2, 
_functionParameters);
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+  }
+
+  @Test
+  public void testAggregateAddsSumAndCount() {
+    // (sum=30, count=2) avg 15 merged with (sum=20, count=4) avg 5 -> total 
sum 50, total count 6, avg 8.33...
+    AvgPair merged = aggregate(serialize(30.0, 2L), serialize(20.0, 4L));
+    assertEquals(merged.getSum(), 50.0);
+    assertEquals(merged.getCount(), 6L);
+  }
+
+  @Test
+  public void testTwoLevelRollupPreservesSumAndCount() {
+    // The average-of-averages trap: a second rollup pass over 
already-rolled-up rows must keep the
+    // running sum/count, not re-average the level-1 averages.
+    // Level-1 group A: 50 rows of value 2 -> (sum=100, count=50), avg 2
+    // Level-1 group B: 50 rows of value 4 -> (sum=200, count=50), avg 4
+    byte[] level1A = serialize(100.0, 50L);
+    byte[] level1B = serialize(200.0, 50L);
+
+    AvgPair level2 = aggregate(level1A, level1B);
+    assertEquals(level2.getSum(), 300.0);
+    assertEquals(level2.getCount(), 100L);
+    // Correct overall average is 3.0 (300/100), NOT 3.0 by accident of 
(2+4)/2 — verify via the pair.
+    assertEquals(level2.getSum() / level2.getCount(), 3.0);
+  }
+
+  @Test
+  public void testAggregateWithBothEmptyBytes() {
+    byte[] result = (byte[]) _aggregator.aggregate(new byte[0], new byte[0], 
_functionParameters);
+    // Both empty (default null value for BYTES columns) -> a valid serialized 
empty AvgPair (sum=0, count=0), not an
+    // empty byte[] that AvgPair deserialization (and the query-time AVG 
function) cannot read.
+    AvgPair merged = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(result);
+    assertEquals(merged.getSum(), 0.0);
+    assertEquals(merged.getCount(), 0L);
+  }
+
+  @Test
+  public void testAggregateWithFirstEmptyBytes() {
+    byte[] value2 = serialize(42.0, 7L);
+    byte[] result = (byte[]) _aggregator.aggregate(new byte[0], value2, 
_functionParameters);
+    // Should return the non-empty side as-is
+    assertEquals(result, value2);
+  }
+
+  @Test
+  public void testAggregateWithSecondEmptyBytes() {
+    byte[] value1 = serialize(42.0, 7L);
+    byte[] result = (byte[]) _aggregator.aggregate(value1, new byte[0], 
_functionParameters);
+    // Should return the non-empty side as-is
+    assertEquals(result, value1);
+  }
+
+  @Test(expectedExceptions = java.nio.BufferUnderflowException.class)
+  public void testAggregateRejectsMalformedBytes() {
+    // A non-empty but malformed (too short) AvgPair buffer must fail loudly 
rather than silently corrupt.
+    _aggregator.aggregate(serialize(1.0, 1L), new byte[]{1, 2, 3}, 
_functionParameters);
+  }
+
+  @Test
+  public void testFactoryReturnsAvgAggregator() {
+    ValueAggregator aggregator = 
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.AVG, 
DataType.BYTES);
+    assertTrue(aggregator instanceof AvgValueAggregator);
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 13b4067761a..220269db7bd 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -166,6 +166,25 @@ public class MergeTaskUtils {
         aggregationType, column, timeColumn);
   }
 
+  /// Validates that a column configured with a bytes-backed rollup 
aggregation (sketches, AvgPair, TDigest, ...) is
+  /// declared as a `BYTES` column in the schema. These aggregators read the 
stored column value as a serialized
+  /// object, so a non-BYTES column would fail with a ClassCastException at 
task runtime. No-op for other (numeric or
+  /// time-ordered) aggregation types. The given aggregation type must be 
parseable (see
+  /// [AggregationFunctionType#getAggregationFunctionType(String)]).
+  public static void validateAggregationColumnType(Schema schema, String 
column, String aggregationType) {
+    AggregationFunctionType aggregationFunctionType = 
AggregationFunctionType.getAggregationFunctionType(
+        aggregationType);
+    if (!ValueAggregatorFactory.isBytesBacked(aggregationFunctionType)) {
+      return;
+    }
+    FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+    Preconditions.checkState(fieldSpec != null,
+        "Aggregation type: %s on column: %s requires the column to exist in 
schema!", aggregationType, column);
+    Preconditions.checkState(fieldSpec.getDataType() == 
FieldSpec.DataType.BYTES,
+        "Aggregation type: %s on column: %s requires the column to be of type 
BYTES in schema, but found: %s",
+        aggregationType, column, fieldSpec.getDataType());
+  }
+
   /**
    * Returns the segment config based on the task config.
    * TODO - Ensure all tasks that build SegmentConfig use this method so that 
all appropriate configs are set.
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 47c92a9f806..839d5823666 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -528,6 +528,7 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
               "Invalid aggregation type: " + entry.getValue() + " for column: 
" + column, e);
         }
         MergeTaskUtils.validateOrderSensitiveAggregation(tableConfig, schema, 
column, entry.getValue());
+        MergeTaskUtils.validateAggregationColumnType(schema, column, 
entry.getValue());
       }
     }
     // check no mis-configured aggregation function parameters
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c6631dccc3c..2240974cec1 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -354,6 +354,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends 
BaseTaskGenerator {
           throw new IllegalStateException(err, e);
         }
         MergeTaskUtils.validateOrderSensitiveAggregation(tableConfig, schema, 
column, entry.getValue());
+        MergeTaskUtils.validateAggregationColumnType(schema, column, 
entry.getValue());
       }
     }
   }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
new file mode 100644
index 00000000000..0443e95eb85
--- /dev/null
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks.mergerollup;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
+import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskTestUtils;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests MergeRollup task executor with AVG aggregation on a {@code BYTES} 
column holding serialized
+ * {@link AvgPair} (sum + count) values.
+ * <p>
+ * AVG is correct across rollups only because the (sum, count) pair is 
preserved and merged additively;
+ * averaging already-averaged scalars would be wrong whenever groups have 
unequal counts. The tests below
+ * deliberately use unequal counts so an average-of-averages implementation 
would fail them.
+ */
+public class MergeRollupAvgTaskExecutorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"MergeRollupAvgTaskExecutorTest");
+  private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, 
"originalSegment");
+  private static final String TABLE_NAME = "avg_table";
+  private static final String DIMENSION_COL = "groupKey";
+  private static final String AVG_COL = "avg_metric";
+
+  private static final String GROUP_1 = "group1";
+  private static final String GROUP_2 = "group2";
+  private static final String GROUP_3 = "group3";
+
+  private TableConfig _tableConfig;
+  private Schema _schema;
+  private int _workingDirCounter = 0;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+    _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    _schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(DIMENSION_COL, FieldSpec.DataType.STRING)
+        .addMetric(AVG_COL, FieldSpec.DataType.BYTES)
+        .build();
+
+    MinionContext minionContext = MinionContext.getInstance();
+    //noinspection unchecked
+    ZkHelixPropertyStore<ZNRecord> helixPropertyStore = 
Mockito.mock(ZkHelixPropertyStore.class);
+    Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/" + TABLE_NAME + 
"_OFFLINE", null, AccessOption.PERSISTENT))
+        .thenReturn(TableConfigSerDeUtils.toZNRecord(_tableConfig));
+    Mockito.when(helixPropertyStore.get("/SCHEMAS/" + TABLE_NAME, null, 
AccessOption.PERSISTENT))
+        .thenReturn(SchemaSerDeUtils.toZNRecord(_schema));
+    minionContext.setHelixPropertyStore(helixPropertyStore);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+
+  /**
+   * Three distinct dimension groups across four segments are merged 
independently, each preserving the
+   * total sum and count (and therefore the correct average).
+   */
+  @Test
+  public void testMultipleDimensionGroupsIndependentMerge()
+      throws Exception {
+    List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+    List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+    Assert.assertEquals(results.size(), 1);
+    File mergedSegment = results.get(0).getFile();
+
+    SegmentMetadataImpl metadata = new SegmentMetadataImpl(mergedSegment);
+    Assert.assertEquals(metadata.getTotalDocs(), 3);
+
+    Map<String, AvgPair> avgPairs = readMergedAvgPairs(mergedSegment);
+    Assert.assertEquals(avgPairs.size(), 3);
+
+    // group1: values [1..300] -> sum 45150, count 300, avg 150.5
+    assertAvg(avgPairs.get(GROUP_1), 45150.0, 300L, 150.5);
+    // group2: values [500..799] -> sum 194850, count 300, avg 649.5
+    assertAvg(avgPairs.get(GROUP_2), 194850.0, 300L, 649.5);
+    // group3: values [1..200] -> sum 20100, count 200, avg 100.5
+    assertAvg(avgPairs.get(GROUP_3), 20100.0, 200L, 100.5);
+  }
+
+  /**
+   * The same dimension key appearing in multiple segments has all its 
AvgPairs merged.
+   */
+  @Test
+  public void testCrossSegmentMerging()
+      throws Exception {
+    List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+    Map<String, AvgPair> avgPairs = 
readMergedAvgPairs(runExecutor(segmentDirs, null).get(0).getFile());
+
+    // group1 came from 3 segments (100 + 100 + 100 = 300 values)
+    Assert.assertEquals(avgPairs.get(GROUP_1).getCount(), 300L);
+    // group2 came from 3 segments (100 + 100 + 100 = 300 values)
+    Assert.assertEquals(avgPairs.get(GROUP_2).getCount(), 300L);
+    // group3 came from 3 segments (50 + 100 + 50 = 200 values)
+    Assert.assertEquals(avgPairs.get(GROUP_3).getCount(), 200L);
+  }
+
+  /**
+   * Groups with unequal counts must merge by adding sums and counts, not by 
averaging the per-segment
+   * averages. With counts 10 and 100, an average-of-averages implementation 
would yield 77.5 instead of
+   * the correct 136.409...
+   */
+  @Test
+  public void testUnequalCountsAvoidsAverageOfAverages()
+      throws Exception {
+    List<List<GenericRow>> segments = new ArrayList<>();
+    // 10 values [1..10] -> sum 55, avg 5.5
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 11))));
+    // 100 values [100..199] -> sum 14950, avg 149.5
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(100, 200))));
+
+    Map<String, AvgPair> avgPairs = 
readMergedAvgPairs(runExecutor(buildSegments(segments), null).get(0).getFile());
+    AvgPair merged = avgPairs.get(GROUP_1);
+    Assert.assertEquals(merged.getSum(), 15005.0);
+    Assert.assertEquals(merged.getCount(), 110L);
+    Assert.assertEquals(merged.getSum() / merged.getCount(), 15005.0 / 110.0, 
1e-9);
+  }
+
+  /**
+   * A second rollup pass over already-rolled-up (pre-aggregated) AvgPair 
segments still yields the correct
+   * overall average. Uses unequal counts so average-of-averages would diverge 
from the true value.
+   * <p>
+   * Level 1 merges [1..10] (count 10) and [11..30] (count 20) -> sum 465, 
count 30.
+   * Level 2 merges that with [31..100] (count 70) -> sum 5050, count 100, avg 
50.5 (the true avg of [1..100]).
+   */
+  @Test
+  public void testTwoLevelRollupPreservesAverage()
+      throws Exception {
+    List<List<GenericRow>> level1Input = new ArrayList<>();
+    level1Input.add(List.of(makeRow(GROUP_1, createAvgPair(1, 11))));    // 
[1..10]
+    level1Input.add(List.of(makeRow(GROUP_1, createAvgPair(11, 31))));   // 
[11..30]
+    File level1Merged = runExecutor(buildSegments(level1Input), 
null).get(0).getFile();
+
+    // Verify the intermediate (level-1) AvgPair is preserved as (sum, count), 
not collapsed to an average.
+    AvgPair level1 = readMergedAvgPairs(level1Merged).get(GROUP_1);
+    Assert.assertEquals(level1.getSum(), 465.0);
+    Assert.assertEquals(level1.getCount(), 30L);
+
+    List<File> level2Input = new ArrayList<>();
+    level2Input.add(level1Merged);
+    level2Input.addAll(buildSegments(List.of(List.of(makeRow(GROUP_1, 
createAvgPair(31, 101)))))); // [31..100]
+
+    AvgPair finalPair = readMergedAvgPairs(runExecutor(level2Input, 
null).get(0).getFile()).get(GROUP_1);
+    assertAvg(finalPair, 5050.0, 100L, 50.5);
+  }
+
+  /**
+   * An empty AvgPair byte array (the default null value for BYTES columns) is 
treated as missing; the merge
+   * reflects only the non-empty input.
+   */
+  @Test
+  public void testEmptyAvgPairHandling()
+      throws Exception {
+    List<List<GenericRow>> segments = new ArrayList<>();
+    GenericRow emptyRow = new GenericRow();
+    emptyRow.putValue(DIMENSION_COL, GROUP_1);
+    emptyRow.putValue(AVG_COL, new byte[0]);
+    segments.add(List.of(emptyRow));
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 101)))); // 
[1..100] -> sum 5050, count 100
+
+    List<File> segmentDirs = buildSegments(segments);
+    List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+    SegmentMetadataImpl metadata = new 
SegmentMetadataImpl(results.get(0).getFile());
+    Assert.assertEquals(metadata.getTotalDocs(), 1);
+
+    assertAvg(readMergedAvgPairs(results.get(0).getFile()).get(GROUP_1), 
5050.0, 100L, 50.5);
+  }
+
+  /**
+   * After rollup, the total doc count must equal the number of distinct 
dimension keys.
+   */
+  @Test
+  public void testSegmentDocCountEqualsDistinctKeys()
+      throws Exception {
+    List<File> segmentDirs = buildSegments(buildStandardSegmentData());
+    List<SegmentConversionResult> results = runExecutor(segmentDirs, null);
+
+    SegmentMetadataImpl metadata = new 
SegmentMetadataImpl(results.get(0).getFile());
+    Assert.assertEquals(metadata.getTotalDocs(), 3,
+        "Rollup doc count should equal the number of distinct dimension keys");
+  }
+
+  private static void assertAvg(AvgPair avgPair, double expectedSum, long 
expectedCount, double expectedAvg) {
+    Assert.assertNotNull(avgPair);
+    Assert.assertEquals(avgPair.getSum(), expectedSum);
+    Assert.assertEquals(avgPair.getCount(), expectedCount);
+    Assert.assertEquals(avgPair.getSum() / avgPair.getCount(), expectedAvg, 
1e-9);
+  }
+
+  /**
+   * Creates an AvgPair accumulating the integer values in {@code [start, 
end)}.
+   */
+  private static AvgPair createAvgPair(int start, int end) {
+    AvgPair avgPair = new AvgPair();
+    for (int v = start; v < end; v++) {
+      avgPair.apply(v);
+    }
+    return avgPair;
+  }
+
+  private static GenericRow makeRow(String dimensionValue, AvgPair avgPair) {
+    GenericRow row = new GenericRow();
+    row.putValue(DIMENSION_COL, dimensionValue);
+    row.putValue(AVG_COL, ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(avgPair));
+    return row;
+  }
+
+  private List<File> buildSegments(List<List<GenericRow>> segmentRows)
+      throws Exception {
+    List<File> segmentDirs = new ArrayList<>();
+    for (int i = 0; i < segmentRows.size(); i++) {
+      String segmentName = TABLE_NAME + "_seg" + i + "_" + System.nanoTime();
+      RecordReader recordReader = new 
GenericRowRecordReader(segmentRows.get(i));
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, 
_schema);
+      config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+      config.setTableName(TABLE_NAME);
+      config.setSegmentName(segmentName);
+      SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+      driver.init(config, recordReader);
+      driver.build();
+      segmentDirs.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+    }
+    return segmentDirs;
+  }
+
+  private List<SegmentConversionResult> runExecutor(List<File> segmentDirs, 
Map<String, String> extraConfigs)
+      throws Exception {
+    File workingDir = new File(TEMP_DIR, "workingDir_" + 
(_workingDirCounter++));
+    MergeRollupTaskExecutor executor = new MergeRollupTaskExecutor(new 
MinionConf());
+    
executor.setMinionEventObserver(MinionTaskTestUtils.getMinionProgressObserver());
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME + "_OFFLINE");
+    configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
+    configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup");
+    configs.put(AVG_COL + 
MinionConstants.MergeTask.AGGREGATION_TYPE_KEY_SUFFIX, "avg");
+    if (extraConfigs != null) {
+      configs.putAll(extraConfigs);
+    }
+
+    PinotTaskConfig pinotTaskConfig = new 
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
+    return executor.convert(pinotTaskConfig, segmentDirs, workingDir);
+  }
+
+  private Map<String, AvgPair> readMergedAvgPairs(File mergedSegmentDir)
+      throws Exception {
+    Map<String, AvgPair> result = new HashMap<>();
+    PinotSegmentRecordReader reader = new PinotSegmentRecordReader();
+    reader.init(mergedSegmentDir, null, null, true);
+    while (reader.hasNext()) {
+      GenericRow row = reader.next();
+      String key = (String) row.getValue(DIMENSION_COL);
+      byte[] bytes = (byte[]) row.getValue(AVG_COL);
+      result.put(key, ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytes));
+    }
+    reader.close();
+    return result;
+  }
+
+  private static List<List<GenericRow>> buildStandardSegmentData() {
+    List<List<GenericRow>> segments = new ArrayList<>();
+
+    // Segment 0: group1=[1..100], group2=[500..599]
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(1, 101)), 
makeRow(GROUP_2, createAvgPair(500, 600))));
+    // Segment 1: group1=[101..200], group2=[600..699], group3=[1..50]
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(101, 201)), 
makeRow(GROUP_2, createAvgPair(600, 700)),
+        makeRow(GROUP_3, createAvgPair(1, 51))));
+    // Segment 2: group1=[201..300], group3=[51..150]
+    segments.add(List.of(makeRow(GROUP_1, createAvgPair(201, 301)), 
makeRow(GROUP_3, createAvgPair(51, 151))));
+    // Segment 3: group2=[700..799], group3=[151..200]
+    segments.add(List.of(makeRow(GROUP_2, createAvgPair(700, 800)), 
makeRow(GROUP_3, createAvgPair(151, 201))));
+
+    return segments;
+  }
+}
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 2be88125942..a07caca15ca 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -182,7 +182,7 @@ public class MergeRollupTaskGeneratorTest {
         () -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn, 
schema, taskConfig));
 
     // Parseable aggregation type without an available value aggregator should 
fail the validation
-    taskConfig.put("c.aggregationType", "avg");
+    taskConfig.put("c.aggregationType", "distinctCount");
     assertThrows(IllegalStateException.class,
         () -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn, 
schema, taskConfig));
     taskConfig.put("c.aggregationType", "lastWithTime");
@@ -216,6 +216,44 @@ public class MergeRollupTaskGeneratorTest {
         () -> taskGenerator.validateTaskConfigs(tableConfigWithTimeColumn, 
schema, missingColumnConfig));
   }
 
+  @Test
+  public void testBytesBackedAggregationColumnTypeValidation() {
+    MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
+    Schema schema = new Schema();
+    schema.addField(new MetricFieldSpec("bytesCol", FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec("longCol", FieldSpec.DataType.LONG));
+    schema.addField(new DimensionFieldSpec("d", FieldSpec.DataType.STRING, 
true));
+    schema.addField(new DateTimeFieldSpec(TIME_COLUMN_NAME, 
FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
+        "1:MILLISECONDS"));
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME).build();
+
+    // Bytes-backed aggregations on a BYTES column are valid
+    for (String aggregationType : new String[]{"avg", "percentileTDigest", 
"distinctCountHLL"}) {
+      Map<String, String> validConfig = new HashMap<>();
+      validConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, 
"daily");
+      validConfig.put("bytesCol.aggregationType", aggregationType);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, validConfig);
+    }
+
+    // The same bytes-backed aggregations on a non-BYTES (LONG) column must 
fail at config time
+    for (String aggregationType : new String[]{"avg", "percentileTDigest", 
"distinctCountHLL"}) {
+      Map<String, String> invalidConfig = new HashMap<>();
+      invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, 
"daily");
+      invalidConfig.put("longCol.aggregationType", aggregationType);
+      assertThrows(IllegalStateException.class,
+          () -> taskGenerator.validateTaskConfigs(tableConfig, schema, 
invalidConfig));
+    }
+
+    // Non-bytes-backed aggregations on a numeric column remain valid
+    for (String aggregationType : new String[]{"sum", "max"}) {
+      Map<String, String> validConfig = new HashMap<>();
+      validConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, 
"daily");
+      validConfig.put("longCol.aggregationType", aggregationType);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, validConfig);
+    }
+  }
+
   @Test
   public void testInvalidAggregationFunctionFieldName() {
     MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator();
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 2f40da4782a..06357e07345 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -32,12 +32,15 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.utils.config.SchemaSerDeUtils;
 import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.plugin.minion.tasks.MinionTaskTestUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.customobject.AvgPair;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -78,14 +81,17 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
   private static final String TABLE_NAME_WITH_SORTED_COL = 
"testTableWithSortedCol_OFFLINE";
   private static final String TABLE_NAME_EPOCH_HOURS = 
"testTableEpochHours_OFFLINE";
   private static final String TABLE_NAME_SDF = "testTableSDF_OFFLINE";
+  private static final String TABLE_NAME_AVG = "testTableAvg_OFFLINE";
   private static final String D1 = "d1";
   private static final String M1 = "m1";
+  private static final String M_AVG = "mavg";
   private static final String T = "t";
   private static final String T_TRX = "t_trx";
 
   private List<File> _segmentIndexDirList;
   private List<File> _segmentIndexDirListEpochHours;
   private List<File> _segmentIndexDirListSDF;
+  private List<File> _segmentIndexDirListAvg;
 
   @BeforeClass
   public void setUp()
@@ -125,6 +131,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
             .addMetric(M1, FieldSpec.DataType.INT)
             .addDateTime(T_TRX, FieldSpec.DataType.INT, 
"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS").build();
+    TableConfig tableConfigAvg =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_AVG).setTimeColumnName(T).build();
+    Schema schemaAvg =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME_AVG).addSingleValueDimension(D1,
 FieldSpec.DataType.STRING)
+            .addMetric(M_AVG, FieldSpec.DataType.BYTES)
+            .addDateTime(T, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS").build();
 
     List<String> d1 = Lists.newArrayList("foo", "bar", "foo", "foo", "bar");
     List<List<GenericRow>> rows = new ArrayList<>(NUM_SEGMENTS);
@@ -191,6 +203,29 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
       _segmentIndexDirListSDF.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
     }
 
+    // create test segments with a BYTES column holding serialized AvgPair 
(sum + count) for AVG rollup.
+    // Two segments share dimension key "a" within the same day bucket, with 
unequal counts (10 and 100) so a
+    // correct merge must add sums and counts rather than average the 
per-segment averages.
+    _segmentIndexDirListAvg = new ArrayList<>();
+    int[][] avgRanges = {{1, 11}, {100, 200}};
+    for (int i = 0; i < avgRanges.length; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(D1, "a");
+      row.putValue(M_AVG, createAvgPairBytes(avgRanges[i][0], 
avgRanges[i][1]));
+      row.putValue(T, 1600473600000L);
+      String segmentName = "segmentAvg_" + i;
+      RecordReader recordReader = new 
GenericRowRecordReader(Collections.singletonList(row));
+      SegmentGeneratorConfig config = new 
SegmentGeneratorConfig(tableConfigAvg, schemaAvg);
+      config.setInstanceType(InstanceType.MINION);
+      config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+      config.setTableName(TABLE_NAME_AVG);
+      config.setSegmentName(segmentName);
+      SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+      driver.init(config, recordReader);
+      driver.build();
+      _segmentIndexDirListAvg.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+    }
+
     MinionContext minionContext = MinionContext.getInstance();
     @SuppressWarnings("unchecked")
     ZkHelixPropertyStore<ZNRecord> helixPropertyStore = 
Mockito.mock(ZkHelixPropertyStore.class);
@@ -215,6 +250,10 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
         .thenReturn(SchemaSerDeUtils.toZNRecord(schemaEpochHours));
     Mockito.when(helixPropertyStore.get("/SCHEMAS/testTableSDF", null, 
AccessOption.PERSISTENT))
         .thenReturn(SchemaSerDeUtils.toZNRecord(schemaSDF));
+    Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/" + TABLE_NAME_AVG, 
null, AccessOption.PERSISTENT))
+        .thenReturn(TableConfigSerDeUtils.toZNRecord(tableConfigAvg));
+    Mockito.when(helixPropertyStore.get("/SCHEMAS/testTableAvg", null, 
AccessOption.PERSISTENT))
+        .thenReturn(SchemaSerDeUtils.toZNRecord(schemaAvg));
     minionContext.setHelixPropertyStore(helixPropertyStore);
   }
 
@@ -339,6 +378,61 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     assertEquals(columnMetadataForM1.getMaxValue(), 3);
   }
 
+  @Test
+  public void testRollupWithAvgAggregation()
+      throws Exception {
+    FileUtils.deleteQuietly(WORKING_DIR);
+
+    RealtimeToOfflineSegmentsTaskExecutor 
realtimeToOfflineSegmentsTaskExecutor =
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
+    
realtimeToOfflineSegmentsTaskExecutor.setMinionEventObserver(MinionTaskTestUtils.getMinionProgressObserver());
+    Map<String, String> configs = new HashMap<>();
+    configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_AVG);
+    
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, 
"1600473600000");
+    
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, 
"1600560000000");
+    
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
 "1d");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY, 
"rollup");
+    configs.put(M_AVG + 
MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, 
"avg");
+    PinotTaskConfig pinotTaskConfig =
+        new 
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
configs);
+
+    List<SegmentConversionResult> conversionResults =
+        realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, 
_segmentIndexDirListAvg, WORKING_DIR);
+
+    assertEquals(conversionResults.size(), 1);
+    File resultingSegment = conversionResults.get(0).getFile();
+    SegmentMetadataImpl segmentMetadata = new 
SegmentMetadataImpl(resultingSegment);
+    // Single (d1=a, day-bucket) group -> the two AvgPairs (counts 10 and 100) 
merge into one row
+    assertEquals(segmentMetadata.getTotalDocs(), 1);
+
+    AvgPair merged = readAvgPair(resultingSegment);
+    // Sum and count are added (not averaged): 55 + 14950 = 15005 over 10 + 
100 = 110.
+    // Average-of-averages would wrongly give (5.5 + 149.5) / 2 = 77.5 instead 
of ~136.41.
+    assertEquals(merged.getSum(), 15005.0);
+    assertEquals(merged.getCount(), 110L);
+  }
+
+  private static byte[] createAvgPairBytes(int start, int end) {
+    AvgPair avgPair = new AvgPair();
+    for (int v = start; v < end; v++) {
+      avgPair.apply(v);
+    }
+    return ObjectSerDeUtils.AVG_PAIR_SER_DE.serialize(avgPair);
+  }
+
+  private static AvgPair readAvgPair(File segmentDir)
+      throws Exception {
+    PinotSegmentRecordReader reader = new PinotSegmentRecordReader();
+    reader.init(segmentDir, null, null, true);
+    try {
+      Assert.assertTrue(reader.hasNext());
+      GenericRow row = reader.next();
+      return ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize((byte[]) 
row.getValue(M_AVG));
+    } finally {
+      reader.close();
+    }
+  }
+
   @Test
   public void testTablePartitioning()
       throws Exception {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 1943fdbb9c9..fee0114cf9d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -524,6 +524,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
 
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addMetric("bytesCol", FieldSpec.DataType.BYTES)
         .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
 
@@ -626,16 +627,17 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
       Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
     }
 
-    // valid agg
+    // valid agg: distinctCountHLL is bytes-backed, so it requires a BYTES 
column
     HashMap<String, String> validAggConfig = new 
HashMap<>(realtimeToOfflineTaskConfig);
-    validAggConfig.put("myCol.aggregationType", "distinctCountHLL");
+    validAggConfig.put("bytesCol.aggregationType", "distinctCountHLL");
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
         new TableTaskConfig(
             Map.of("RealtimeToOfflineSegmentsTask", validAggConfig, 
"SegmentGenerationAndPushTask",
                 segmentGenerationAndPushTaskConfig))).build();
     taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig);
 
-    // valid agg
+    // valid agg: distinctCountHLLPlus is allow-listed but has no merge value 
aggregator, so it is not bytes-backed
+    // and is not subject to the BYTES column requirement
     HashMap<String, String> validAgg2Config = new 
HashMap<>(realtimeToOfflineTaskConfig);
     validAgg2Config.put("myCol.aggregationType", "distinctCountHLLPlus");
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
@@ -694,6 +696,44 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     }
   }
 
+  @Test
+  public void testBytesBackedAggregationColumnTypeValidation() {
+    RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new 
RealtimeToOfflineSegmentsTaskGenerator();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension("d", FieldSpec.DataType.STRING)
+        .addMetric("bytesCol", FieldSpec.DataType.BYTES)
+        .addMetric("longCol", FieldSpec.DataType.LONG)
+        .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+            .build();
+
+    // Bytes-backed aggregations on a BYTES column are valid
+    for (String aggregationType : new String[]{"avg", "percentileTDigest", 
"distinctCountHLL"}) {
+      Map<String, String> validConfig = new HashMap<>();
+      validConfig.put("mergeType", "rollup");
+      validConfig.put("bytesCol.aggregationType", aggregationType);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, validConfig);
+    }
+
+    // The same bytes-backed aggregations on a non-BYTES (LONG) column must 
fail at config time
+    for (String aggregationType : new String[]{"avg", "percentileTDigest", 
"distinctCountHLL"}) {
+      Map<String, String> invalidConfig = new HashMap<>();
+      invalidConfig.put("mergeType", "rollup");
+      invalidConfig.put("longCol.aggregationType", aggregationType);
+      Assert.assertThrows(IllegalStateException.class,
+          () -> taskGenerator.validateTaskConfigs(tableConfig, schema, 
invalidConfig));
+    }
+
+    // Non-bytes-backed aggregations on a numeric column remain valid
+    for (String aggregationType : new String[]{"sum", "max"}) {
+      Map<String, String> validConfig = new HashMap<>();
+      validConfig.put("mergeType", "rollup");
+      validConfig.put("longCol.aggregationType", aggregationType);
+      taskGenerator.validateTaskConfigs(tableConfig, schema, validConfig);
+    }
+  }
+
   private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status 
status, long startTime, long endTime,
       TimeUnit timeUnit, String downloadURL) {
     SegmentZKMetadata realtimeSegmentZKMetadata = new 
SegmentZKMetadata(segmentName);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to