This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch support-json-to-map-transform-during-ingestion in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9bbd0e3578f2c282e1251df33643e7508c6d5e9c Author: Xiang Fu <[email protected]> AuthorDate: Thu Oct 30 06:10:08 2025 -0700 Support MAP type in derived column creation during segment reload --- .../pinot/common/function/FunctionUtils.java | 3 + .../apache/pinot/common/utils/PinotDataType.java | 13 +++ .../stats/MapColumnPreIndexStatsCollector.java | 107 +++++++++++++++++++-- .../defaultcolumn/BaseDefaultColumnHandler.java | 42 ++++++-- .../ExpressionTransformerTest.java | 34 ++++++- 5 files changed, 182 insertions(+), 17 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java index c1445e98bde..e4f6760929d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java @@ -149,6 +149,9 @@ public class FunctionUtils { if (Collection.class.isAssignableFrom(clazz)) { return PinotDataType.COLLECTION; } + if (Map.class.isAssignableFrom(clazz)) { + return PinotDataType.MAP; + } return ARGUMENT_TYPE_MAP.get(clazz); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java index 030c5a9df9f..c8cc2a242b8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java @@ -32,6 +32,7 @@ import org.apache.pinot.spi.utils.BigDecimalUtils; import org.apache.pinot.spi.utils.BooleanUtils; import org.apache.pinot.spi.utils.BytesUtils; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.MapUtils; import org.apache.pinot.spi.utils.TimestampUtils; @@ -827,6 +828,8 @@ public enum PinotDataType { } catch (Exception e) { throw new RuntimeException("Unable to convert String to Map. Input value: " + value, e); } + case BYTES: + return MapUtils.deserializeMap((byte[]) value); case OBJECT: case MAP: if (value instanceof Map) { @@ -840,6 +843,16 @@ public enum PinotDataType { sourceType, value.getClass())); } } + + @SuppressWarnings("unchecked") + @Override + public byte[] toBytes(Object value) { + if (!(value instanceof Map)) { + throw new UnsupportedOperationException("Cannot convert non-Map value to BYTES for MAP type: " + + (value == null ? "null" : value.getClass())); + } + return MapUtils.serializeMap((Map<String, Object>) value); + } }, BYTE_ARRAY { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java index 99c65e5835b..ca27d0c221c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java @@ -18,9 +18,14 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.stats; +import com.fasterxml.jackson.core.JsonProcessingException; import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; +import java.math.BigDecimal; +import java.text.NumberFormat; import java.util.Arrays; +import java.util.Locale; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; @@ -32,8 +37,11 @@ import org.apache.pinot.spi.data.ComplexFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.MapUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -51,6 +59,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; * heterogeneous value types for a key are encountered will construct the Map statistics it can be raised as a fault. */ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector { + private static final Logger LOGGER = LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class); private final Object2ObjectOpenHashMap<String, AbstractColumnStatisticsCollector> _keyStats = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE); private final Map<String, Integer> _keyFrequencies = new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE); @@ -58,7 +67,7 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol private int _minLength = Integer.MAX_VALUE; private int _maxLength = 0; private boolean _sealed = false; - private ComplexFieldSpec _colFieldSpec; + private final ComplexFieldSpec _colFieldSpec; private boolean _createNoDictCollectorsForKeys = false; public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) { @@ -96,6 +105,9 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) { String key = mapValueEntry.getKey(); Object value = mapValueEntry.getValue(); + if (value == null) { + continue; + } _keyFrequencies.merge(key, 1, Integer::sum); AbstractColumnStatisticsCollector keyStats = _keyStats.get(key); if (keyStats == null) { @@ -105,6 +117,55 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol updatePartition(key); } } + if (keyStats instanceof NoDictColumnStatisticsCollector) { + keyStats.collect(value); + continue; + } + if (keyStats instanceof StringColumnPreIndexStatsCollector) { + if (value instanceof String || value instanceof Number || value instanceof Boolean) { + keyStats.collect(String.valueOf(value)); + continue; + } + try { + keyStats.collect(JsonUtils.objectToString(value)); + continue; + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize value for key '" + key + "': " + value, e); + } + } + // Parse the value once for all numeric collector types + Number valueNumber = null; + if (keyStats instanceof IntColumnPreIndexStatsCollector + || keyStats instanceof LongColumnPreIndexStatsCollector + || keyStats instanceof FloatColumnPreIndexStatsCollector + || keyStats instanceof DoubleColumnPreIndexStatsCollector + || keyStats instanceof BigDecimalColumnPreIndexStatsCollector) { + valueNumber = parseFlexibleNumber(value); + if (valueNumber == null) { + continue; + } + } + if (keyStats instanceof IntColumnPreIndexStatsCollector) { + keyStats.collect(valueNumber.intValue()); + continue; + } + if (keyStats instanceof LongColumnPreIndexStatsCollector) { + keyStats.collect(valueNumber.longValue()); + continue; + } + if (keyStats instanceof FloatColumnPreIndexStatsCollector) { + keyStats.collect(valueNumber.floatValue()); + continue; + } + if (keyStats instanceof DoubleColumnPreIndexStatsCollector) { + keyStats.collect(valueNumber.doubleValue()); + continue; + } + if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) { + keyStats.collect(new BigDecimal(valueNumber.toString())); + continue; + } + // Catch all keyStats.collect(value); } _totalNumberOfEntries++; @@ -113,6 +174,30 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol } } + @Nullable + private Number parseFlexibleNumber(Object input) { + if (input instanceof Number) { + return (Number) input; + } + + String s = input.toString().trim(); + if (s.isEmpty()) { + return null; + } + try { + // Try BigDecimal first — it supports everything cleanly + return new BigDecimal(s); + } catch (NumberFormatException e) { + try { + // Try locale parsing fallback + NumberFormat nf = NumberFormat.getInstance(Locale.US); + return nf.parse(s); + } catch (Exception ignored) { + return null; + } + } + } + @Override public String getMinValue() { if (_sealed) { @@ -196,7 +281,6 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol if (_createNoDictCollectorsForKeys) { return new NoDictColumnStatisticsCollector(key, config); } - switch (type) { case INTEGER: return new IntColumnPreIndexStatsCollector(key, config); @@ -208,18 +292,23 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol return new DoubleColumnPreIndexStatsCollector(key, config); case BIG_DECIMAL: return new BigDecimalColumnPreIndexStatsCollector(key, config); + case BOOLEAN: case STRING: + case MAP: + case OBJECT: return new StringColumnPreIndexStatsCollector(key, config); default: - throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type)); + LOGGER.warn("Unknown data type {} for key {} and value {}", type, key, value); + return new StringColumnPreIndexStatsCollector(key, config); } } + /** + * Convert Map value data type to stored field type. + * Note that all unknown types are automatically converted to String type. + */ private static FieldSpec.DataType convertToDataType(PinotDataType ty) { - // TODO: I've been told that we already have a function to do this, so find that function and replace this switch (ty) { - case BOOLEAN: - return FieldSpec.DataType.BOOLEAN; case SHORT: case INTEGER: return FieldSpec.DataType.INT; @@ -233,10 +322,12 @@ public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol return FieldSpec.DataType.BIG_DECIMAL; case TIMESTAMP: return FieldSpec.DataType.TIMESTAMP; + case BOOLEAN: case STRING: - return FieldSpec.DataType.STRING; + case OBJECT: + case MAP: default: - throw new UnsupportedOperationException(); + return FieldSpec.DataType.STRING; } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java index d2acb18f4d8..33f9112ecde 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java @@ -47,11 +47,10 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPre import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; -import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory; -import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; @@ -809,6 +808,30 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { new ByteArray((byte[]) fieldSpec.getDefaultNullValue())); break; } + case MAP: { + // Ensure each value is non-null; default for MAP is an empty map + for (int i = 0; i < numDocs; i++) { + if (outputValues[i] == null) { + outputValues[i] = fieldSpec.getDefaultNullValue(); + } + } + + // Use MapColumnPreIndexStatsCollector for collecting MAP stats + AbstractColumnStatisticsCollector statsCollector = + new MapColumnPreIndexStatsCollector(column, statsCollectorConfig); + for (Object value : outputValues) { + statsCollector.collect(value); + } + statsCollector.seal(); + + // MAP does not use dictionary encoding + createDictionary = false; + indexCreationInfo = + new ColumnIndexCreationInfo(statsCollector, /* createDictionary */ false, false, true, + fieldSpec.getDefaultNullValue()); + break; + } + default: throw new IllegalStateException(); } @@ -1166,8 +1189,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { case BYTES: forwardIndexCreator.putBytes((byte[]) outputValue); break; + case MAP: + forwardIndexCreator.add(outputValue, -1); + break; default: - throw new IllegalStateException(); + throw new IllegalStateException( + "Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue); } } } else { @@ -1193,10 +1220,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { forwardIndexCreator.putBytesMV((byte[][]) outputValue); break; default: - throw new IllegalStateException(); + throw new IllegalStateException( + "Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue); } } } + forwardIndexCreator.seal(); } // Add the column metadata @@ -1222,13 +1251,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { ForwardIndexConfig forwardIndexConfig = null; FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column); if (fieldIndexConfig != null) { - forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType()); + forwardIndexConfig = fieldIndexConfig.getConfig(StandardIndexes.forward()); } if (forwardIndexConfig == null) { forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, null, null, null); } - - return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, forwardIndexConfig); + return StandardIndexes.forward().createIndexCreator(indexCreationContext, forwardIndexConfig); } @SuppressWarnings("rawtypes") diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java index 8906d6f88eb..70935130b92 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -393,11 +394,40 @@ public class ExpressionTransformerTest { genericRow = new GenericRow(); genericRow.putValue("x", "abcd"); expressionTransformer.transform(genericRow); - Assert.assertEquals(genericRow.getValue("y"), null); + Assert.assertNull(genericRow.getValue("y")); // Invalid case: x is null, y is int genericRow = new GenericRow(); genericRow.putValue("x", null); expressionTransformer.transform(genericRow); - Assert.assertEquals(genericRow.getValue("y"), null); + Assert.assertNull(genericRow.getValue("y")); + } + + @Test + public void testJsonToMapIngestionTransform() { + Schema schema = new Schema.SchemaBuilder() + .addSingleValueDimension("columnJson", FieldSpec.DataType.STRING) + .addComplex("columnMap", FieldSpec.DataType.MAP, Map.of( + "a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true), + "b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true) + )) + .build(); + + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setTransformConfigs(Collections.singletonList( + new TransformConfig("columnMap", "jsonStringToMap(columnJson)"))); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE) + .setTableName("testJsonToMapIngestionTransform") + .setIngestionConfig(ingestionConfig) + .build(); + + ExpressionTransformer expressionTransformer = new ExpressionTransformer(tableConfig, schema); + + GenericRow row = new GenericRow(); + row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}"); + + expressionTransformer.transform(row); + Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap"); + Assert.assertEquals(map.get("a"), 1); + Assert.assertEquals(map.get("b"), "x"); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
