This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bd50c0fb788 Support MAP type in derived column creation during segment reload (#17113)
bd50c0fb788 is described below

commit bd50c0fb7888e37dd1665937f4e3e9be800a3c69
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Nov 5 20:02:35 2025 -0800

    Support MAP type in derived column creation during segment reload (#17113)
---
 .../pinot/common/function/FunctionUtils.java       |   3 +
 .../apache/pinot/common/utils/PinotDataType.java   |  13 ++
 .../stats/MapColumnPreIndexStatsCollector.java     |  98 +++++++++++++--
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  43 +++++--
 .../ExpressionTransformerTest.java                 |  34 ++++-
 .../stats/MapColumnPreIndexStatsCollectorTest.java | 139 +++++++++++++++++++++
 6 files changed, 306 insertions(+), 24 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
index c1445e98bde..e4f6760929d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
@@ -149,6 +149,9 @@ public class FunctionUtils {
     if (Collection.class.isAssignableFrom(clazz)) {
       return PinotDataType.COLLECTION;
     }
+    if (Map.class.isAssignableFrom(clazz)) {
+      return PinotDataType.MAP;
+    }
     return ARGUMENT_TYPE_MAP.get(clazz);
   }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 030c5a9df9f..c8cc2a242b8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.MapUtils;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
 
@@ -827,6 +828,8 @@ public enum PinotDataType {
           } catch (Exception e) {
             throw new RuntimeException("Unable to convert String to Map. Input 
value: " + value, e);
           }
+        case BYTES:
+          return MapUtils.deserializeMap((byte[]) value);
         case OBJECT:
         case MAP:
           if (value instanceof Map) {
@@ -840,6 +843,16 @@ public enum PinotDataType {
               sourceType, value.getClass()));
       }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public byte[] toBytes(Object value) {
+      if (!(value instanceof Map)) {
+        throw new UnsupportedOperationException("Cannot convert non-Map value 
to BYTES for MAP type: "
+            + (value == null ? "null" : value.getClass()));
+      }
+      return MapUtils.serializeMap((Map<String, Object>) value);
+    }
   },
 
   BYTE_ARRAY {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 99c65e5835b..1d7701ca827 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Map;
 import org.apache.pinot.common.utils.PinotDataType;
@@ -28,20 +30,22 @@ import 
org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.MapUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Extension of {@link AbstractColumnStatisticsCollector} for Map column type.
  *
- * The Map column type is different than other columns in that it is 
essentially recursive.  It contains keys
+ * The Map column type is different from other columns in that it is 
essentially recursive. It contains keys
  * and those keys are analogous to columns and, as such, have Key level 
statistics. So, this class keeps track of
- * Map column level statistics _and_ Key level statistics.  The Key Level 
statistics can then be used during
+ * Map column level statistics _and_ Key level statistics. The Key Level 
statistics can then be used during
  * the creation of the Immutable Segment to make decisions about how keys will 
be stored or what Map data structure
  * to use.
  *
@@ -51,6 +55,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
  * heterogeneous value types for a key are encountered will construct the Map 
statistics it can be raised as a fault.
  */
 public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class);
   private final Object2ObjectOpenHashMap<String, 
AbstractColumnStatisticsCollector> _keyStats =
       new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
   private final Map<String, Integer> _keyFrequencies = new 
Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
@@ -58,13 +63,11 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private boolean _sealed = false;
-  private ComplexFieldSpec _colFieldSpec;
   private boolean _createNoDictCollectorsForKeys = false;
 
   public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
     super(column, statsCollectorConfig);
     _sorted = false;
-    _colFieldSpec = (ComplexFieldSpec) 
statsCollectorConfig.getFieldSpecForColumn(column);
     Map<String, FieldIndexConfigs> indexConfigsByCol = 
FieldIndexConfigsUtil.createIndexConfigsByColName(
         statsCollectorConfig.getTableConfig(), 
statsCollectorConfig.getSchema());
     boolean isDictionaryEnabled = 
indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled();
@@ -96,6 +99,9 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
       for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
         String key = mapValueEntry.getKey();
         Object value = mapValueEntry.getValue();
+        if (value == null) {
+          continue;
+        }
         _keyFrequencies.merge(key, 1, Integer::sum);
         AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
         if (keyStats == null) {
@@ -105,6 +111,67 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
             updatePartition(key);
           }
         }
+        if (keyStats instanceof NoDictColumnStatisticsCollector) {
+          keyStats.collect(value);
+          continue;
+        }
+        if (keyStats instanceof StringColumnPreIndexStatsCollector) {
+          if (value instanceof String || value instanceof Number || value 
instanceof Boolean) {
+            keyStats.collect(String.valueOf(value));
+            continue;
+          }
+          try {
+            keyStats.collect(JsonUtils.objectToString(value));
+            continue;
+          } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to serialize value for key '" + 
key + "': " + value, e);
+          }
+        }
+        if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
+          try {
+            keyStats.collect(new BigDecimal(value.toString()));
+          } catch (NumberFormatException e) {
+            LOGGER.error("Failed to parse BigDecimal for key '{}', value '{}': 
{}", key, value, e.getMessage());
+            // Skip collecting this value for statistics
+          }
+          continue;
+        }
+        if (value instanceof Number) {
+          Number valueNumber = (Number) value;
+          if (keyStats instanceof IntColumnPreIndexStatsCollector) {
+            keyStats.collect(valueNumber.intValue());
+            continue;
+          }
+          if (keyStats instanceof LongColumnPreIndexStatsCollector) {
+            keyStats.collect(valueNumber.longValue());
+            continue;
+          }
+          if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
+            keyStats.collect(valueNumber.floatValue());
+            continue;
+          }
+          if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
+            keyStats.collect(valueNumber.doubleValue());
+            continue;
+          }
+        }
+        if (keyStats instanceof IntColumnPreIndexStatsCollector) {
+          keyStats.collect(Integer.parseInt(value.toString()));
+          continue;
+        }
+        if (keyStats instanceof LongColumnPreIndexStatsCollector) {
+          keyStats.collect(Long.parseLong(value.toString()));
+          continue;
+        }
+        if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
+          keyStats.collect(Float.parseFloat(value.toString()));
+          continue;
+        }
+        if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
+          keyStats.collect(Double.parseDouble(value.toString()));
+          continue;
+        }
+        // Catch all
         keyStats.collect(value);
       }
       _totalNumberOfEntries++;
@@ -161,7 +228,6 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   public void seal() {
     if (!_sealed) {
       //All the keys which have appeared less than total docs insert default 
null Value in unique values
-      FieldSpec valueFieldSpec = _colFieldSpec.getChildFieldSpec("value");
       for (Map.Entry<String, Integer> entry : _keyFrequencies.entrySet()) {
         if (entry.getValue() < _totalNumberOfEntries) {
           
_keyStats.get(entry.getKey()).collect(_keyStats.get(entry.getKey())._fieldSpec.getDefaultNullValue());
@@ -196,7 +262,6 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
     if (_createNoDictCollectorsForKeys) {
       return new NoDictColumnStatisticsCollector(key, config);
     }
-
     switch (type) {
       case INTEGER:
         return new IntColumnPreIndexStatsCollector(key, config);
@@ -208,18 +273,23 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
         return new DoubleColumnPreIndexStatsCollector(key, config);
       case BIG_DECIMAL:
         return new BigDecimalColumnPreIndexStatsCollector(key, config);
+      case BOOLEAN:
       case STRING:
+      case MAP:
+      case OBJECT:
         return new StringColumnPreIndexStatsCollector(key, config);
       default:
-        throw new UnsupportedOperationException(String.format("MAP column does 
not yet support '%s'", type));
+        LOGGER.warn("Unknown data type {} for key {} and value type {}", type, 
key, value.getClass().getName());
+        return new StringColumnPreIndexStatsCollector(key, config);
     }
   }
 
+  /**
+   * Convert Map value data type to stored field type.
+   * Note that all unknown types are automatically converted to String type.
+   */
   private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
-    // TODO: I've been told that we already have a function to do this, so 
find that function and replace this
     switch (ty) {
-      case BOOLEAN:
-        return FieldSpec.DataType.BOOLEAN;
       case SHORT:
       case INTEGER:
         return FieldSpec.DataType.INT;
@@ -233,10 +303,12 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
         return FieldSpec.DataType.BIG_DECIMAL;
       case TIMESTAMP:
         return FieldSpec.DataType.TIMESTAMP;
+      case BOOLEAN:
       case STRING:
-        return FieldSpec.DataType.STRING;
+      case OBJECT:
+      case MAP:
       default:
-        throw new UnsupportedOperationException();
+        return FieldSpec.DataType.STRING;
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index d2acb18f4d8..01c62cb18e0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -47,11 +47,10 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPre
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
-import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
-import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -414,8 +413,7 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             if (!_segmentWriter.hasIndexFor(argument, 
StandardIndexes.forward())) {
               throw new UnsupportedOperationException(String.format("Operation 
not supported! Cannot create a derived "
                       + "column %s because argument: %s does not have a 
forward index. Enable forward index and "
-                      + "refresh/backfill the segments to create a derived 
column from source column %s", column,
-                  argument,
+                      + "refresh/backfill the segments to create a derived 
column from source column", column,
                   argument));
             }
             argumentsMetadata.add(columnMetadata);
@@ -809,6 +807,30 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
                   new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
           break;
         }
+        case MAP: {
+          // Ensure each value is non-null; default for MAP is an empty map
+          for (int i = 0; i < numDocs; i++) {
+            if (outputValues[i] == null) {
+              outputValues[i] = fieldSpec.getDefaultNullValue();
+            }
+          }
+
+          // Use MapColumnPreIndexStatsCollector for collecting MAP stats
+          AbstractColumnStatisticsCollector statsCollector =
+              new MapColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          for (Object value : outputValues) {
+            statsCollector.collect(value);
+          }
+          statsCollector.seal();
+
+          // MAP does not use dictionary encoding
+          createDictionary = false;
+          indexCreationInfo =
+              new ColumnIndexCreationInfo(statsCollector, /* createDictionary 
*/ false, false, true,
+                  fieldSpec.getDefaultNullValue());
+          break;
+        }
+
         default:
           throw new IllegalStateException();
       }
@@ -1166,8 +1188,11 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
             case BYTES:
               forwardIndexCreator.putBytes((byte[]) outputValue);
               break;
+            case MAP:
+              forwardIndexCreator.add(outputValue, -1);
+              break;
             default:
-              throw new IllegalStateException();
+              throw new IllegalStateException("Unsupported data type: " + 
fieldSpec.getDataType());
           }
         }
       } else {
@@ -1193,10 +1218,11 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
               forwardIndexCreator.putBytesMV((byte[][]) outputValue);
               break;
             default:
-              throw new IllegalStateException();
+              throw new IllegalStateException("Unsupported data type: " + 
fieldSpec.getDataType());
           }
         }
       }
+      forwardIndexCreator.seal();
     }
 
     // Add the column metadata
@@ -1222,13 +1248,12 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
     ForwardIndexConfig forwardIndexConfig = null;
     FieldIndexConfigs fieldIndexConfig = 
_indexLoadingConfig.getFieldIndexConfig(column);
     if (fieldIndexConfig != null) {
-      forwardIndexConfig = fieldIndexConfig.getConfig(new 
ForwardIndexPlugin().getIndexType());
+      forwardIndexConfig = 
fieldIndexConfig.getConfig(StandardIndexes.forward());
     }
     if (forwardIndexConfig == null) {
       forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, 
null, null, null);
     }
-
-    return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, 
forwardIndexConfig);
+    return StandardIndexes.forward().createIndexCreator(indexCreationContext, 
forwardIndexConfig);
   }
 
   @SuppressWarnings("rawtypes")
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 8906d6f88eb..70935130b92 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -393,11 +394,40 @@ public class ExpressionTransformerTest {
     genericRow = new GenericRow();
     genericRow.putValue("x", "abcd");
     expressionTransformer.transform(genericRow);
-    Assert.assertEquals(genericRow.getValue("y"), null);
+    Assert.assertNull(genericRow.getValue("y"));
     // Invalid case: x is null, y is int
     genericRow = new GenericRow();
     genericRow.putValue("x", null);
     expressionTransformer.transform(genericRow);
-    Assert.assertEquals(genericRow.getValue("y"), null);
+    Assert.assertNull(genericRow.getValue("y"));
+  }
+
+  @Test
+  public void testJsonToMapIngestionTransform() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("columnJson", FieldSpec.DataType.STRING)
+        .addComplex("columnMap", FieldSpec.DataType.MAP, Map.of(
+            "a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true),
+            "b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true)
+        ))
+        .build();
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(
+        new TransformConfig("columnMap", "jsonStringToMap(columnJson)")));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testJsonToMapIngestionTransform")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, schema);
+
+    GenericRow row = new GenericRow();
+    row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}");
+
+    expressionTransformer.transform(row);
+    Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap");
+    Assert.assertEquals(map.get("a"), 1);
+    Assert.assertEquals(map.get("b"), "x");
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
index d632111ee04..0e7ce979afb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -249,4 +250,142 @@ public class MapColumnPreIndexStatsCollectorTest {
       assertEquals(keyNoDict.isSorted(), keyDict.isSorted(), "sorted mismatch 
for key " + key);
     }
   }
+
+  @Test
+  public void testFrequenciesUniqueKeysAndLengths() {
+    Map<String, Object> r1 = new HashMap<>();
+    r1.put("kStr", "alpha");
+    r1.put("kInt", 3);
+    r1.put("kLong", 7L);
+    r1.put("kFloat", 1.5f);
+    r1.put("kDouble", 2.25d);
+    r1.put("kBigDec", new java.math.BigDecimal("10.01"));
+    r1.put("kNull", null); // ignored
+
+    Map<String, Object> r2 = new HashMap<>();
+    r2.put("kStr", "beta");
+    r2.put("kInt", 3);
+    r2.put("kLong", 2L);
+    r2.put("kFloat", 1.5f);
+    r2.put("kDouble", 0.75d);
+    r2.put("kBigDec", new java.math.BigDecimal("10.01"));
+
+    Map<String, Object> r3 = new HashMap<>();
+    r3.put("kStr", "alpha");
+    r3.put("kInt", 3);
+    r3.put("kFloat", 3.5f);
+    r3.put("kBigDec", new java.math.BigDecimal("5.25"));
+
+    StatsCollectorConfig cfg = newConfig(false);
+    MapColumnPreIndexStatsCollector col = new 
MapColumnPreIndexStatsCollector("col", cfg);
+    col.collect(r1);
+    col.collect(r2);
+    col.collect(r3);
+    col.seal();
+
+    // Frequencies per key
+    Map<String, Integer> freqs = col.getAllKeyFrequencies();
+    assertEquals(freqs.get("kStr").intValue(), 3);
+    assertEquals(freqs.get("kInt").intValue(), 3);
+    assertEquals(freqs.get("kLong").intValue(), 2);
+    assertEquals(freqs.get("kFloat").intValue(), 3);
+    assertEquals(freqs.get("kDouble").intValue(), 2);
+    assertEquals(freqs.get("kBigDec").intValue(), 3);
+    assertFalse(freqs.containsKey("kNull"));
+
+    // Unique key set is sorted
+    String[] keys = col.getUniqueValuesSet();
+    assertEquals(keys, new String[]{"kBigDec", "kDouble", "kFloat", "kInt", 
"kLong", "kStr"});
+
+    // Row length metrics
+    int l1 = org.apache.pinot.spi.utils.MapUtils.serializeMap(r1).length;
+    int l2 = org.apache.pinot.spi.utils.MapUtils.serializeMap(r2).length;
+    int l3 = org.apache.pinot.spi.utils.MapUtils.serializeMap(r3).length;
+    int expectedMin = Math.min(l1, Math.min(l2, l3));
+    int expectedMax = Math.max(l1, Math.max(l2, l3));
+    assertEquals(col.getLengthOfShortestElement(), expectedMin);
+    assertEquals(col.getLengthOfLargestElement(), expectedMax);
+    assertEquals(col.getMaxRowLengthInBytes(), expectedMax);
+  }
+
+  @Test
+  public void testTypeConversionAndJsonSerialization() {
+    // Create collectors based on first-seen types, then feed convertible 
values
+    Map<String, Object> r1 = new HashMap<>();
+    r1.put("kInt2", 5); // int collector
+    r1.put("kObj",
+        new TreeMap<>(Map.of("k1", "v1", "k2", "v2"))); // will serialize to a 
JSON object: {"k1":"v1","k2":"v2"}
+    r1.put("kBigDec2", new BigDecimal("1.23")); // big decimal collector
+
+    Map<String, Object> r2 = new HashMap<>();
+    r2.put("kInt2", "6"); // string convertible to int
+    r2.put("kObj", List.of(2, 3)); // will serialize to "[2,3]"
+    r2.put("kBigDec2", "4.56"); // string convertible to big decimal
+
+    StatsCollectorConfig cfg = newConfig(false);
+    MapColumnPreIndexStatsCollector col = new 
MapColumnPreIndexStatsCollector("col", cfg);
+    col.collect(r1);
+    col.collect(r2);
+    col.seal();
+
+    // Int conversion branch
+    AbstractColumnStatisticsCollector sInt = col.getKeyStatistics("kInt2");
+    assertNotNull(sInt);
+    assertTrue(sInt instanceof IntColumnPreIndexStatsCollector);
+    assertEquals(sInt.getMaxValue(), 6);
+    assertEquals(sInt.getMinValue(), 5);
+    assertEquals(sInt.getCardinality(), 2); // {5,6}
+
+    // BigDecimal conversion branch
+    AbstractColumnStatisticsCollector sDec = col.getKeyStatistics("kBigDec2");
+    assertNotNull(sDec);
+    assertTrue(sDec instanceof BigDecimalColumnPreIndexStatsCollector);
+    assertEquals(sDec.getMaxValue(), new java.math.BigDecimal("4.56"));
+    assertEquals(sDec.getMinValue(), new java.math.BigDecimal("1.23"));
+
+    // JSON serialization branch for String collector
+    AbstractColumnStatisticsCollector sObj = col.getKeyStatistics("kObj");
+    assertNotNull(sObj);
+    assertTrue(sObj instanceof StringColumnPreIndexStatsCollector);
+    assertEquals(sObj.getMinValue(), "[2,3]");
+    assertEquals(sObj.getMaxValue(), "{\"k1\":\"v1\",\"k2\":\"v2\"}");
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void testUnsupportedEntryTypeThrows() {
+    StatsCollectorConfig cfg = newConfig(false);
+    MapColumnPreIndexStatsCollector col = new 
MapColumnPreIndexStatsCollector("col", cfg);
+    col.collect("not a map");
+  }
+
+  @Test
+  public void testSealIsIdempotent() {
+    StatsCollectorConfig cfg = newConfig(false);
+    MapColumnPreIndexStatsCollector col = new 
MapColumnPreIndexStatsCollector("col", cfg);
+
+    Map<String, Object> r1 = new HashMap<>();
+    r1.put("a", 1);
+    col.collect(r1);
+
+    col.seal();
+    String[] keys1 = col.getUniqueValuesSet();
+    col.seal(); // no-op
+    String[] keys2 = col.getUniqueValuesSet();
+    assertEquals(keys1, keys2);
+  }
+
+  @Test
+  public void testNoDictCreatesNoDictChildCollectors() {
+    StatsCollectorConfig cfgNoDict = newConfig(true);
+    MapColumnPreIndexStatsCollector col = new 
MapColumnPreIndexStatsCollector("col", cfgNoDict);
+
+    Map<String, Object> r1 = new HashMap<>();
+    r1.put("kInt", 1);
+    r1.put("kStr", "x");
+    col.collect(r1);
+    col.seal();
+
+    assertTrue(col.getKeyStatistics("kInt") instanceof 
NoDictColumnStatisticsCollector);
+    assertTrue(col.getKeyStatistics("kStr") instanceof 
NoDictColumnStatisticsCollector);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to