This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09bc151104e Add single column transformer (#17304)
09bc151104e is described below

commit 09bc151104e034b42d1c30be4185e2f6ef0ebecb
Author: Krishan Goyal <[email protected]>
AuthorDate: Mon Dec 29 14:46:14 2025 +0530

    Add single column transformer (#17304)
    
    * Add column transformer
    
    * Handle nulls and transformers for primitive types
    
    * Handle empty arrays and null transformations more appropriately
    
    * Add tests for column reader transformer
    
    * checkstyle fixes
    
    * Remove column reader transformer
    
    * Add logs in data type column transformer
    
    * Fix checkstyle violations
---
 .../DataTypeColumnTransformer.java                 | 101 ++++
 .../NullValueColumnTransformer.java                |  59 +++
 .../recordtransformer/DataTypeTransformer.java     | 132 +----
 .../recordtransformer/NullValueTransformer.java    |  54 +-
 .../local/utils/DataTypeTransformerUtils.java      | 180 +++++++
 .../local/utils/NullValueTransformerUtils.java     | 123 +++++
 .../DataTypeColumnTransformerTest.java             | 543 +++++++++++++++++++++
 .../NullValueColumnTransformerTest.java            | 444 +++++++++++++++++
 .../recordtransformer/DataTypeTransformerTest.java |  69 +--
 .../spi/columntransformer/ColumnTransformer.java   |  33 ++
 10 files changed, 1527 insertions(+), 211 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
new file mode 100644
index 00000000000..38f8270db9f
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, 
no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always 
call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, 
_destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data 
type for column: " + columnName
+            + " to data type: " + _destDataType, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for 
column: " + columnName
+          + " to data type: " + _destDataType + ". Returning null. Exception: 
{}", e);
+      return null;
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
new file mode 100644
index 00000000000..6cde69908b3
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.segment.local.utils.NullValueTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class NullValueColumnTransformer implements ColumnTransformer {
+
+  private final Object _defaultNullValue;
+
+  /**
+   * @param tableConfig The table configuration
+   * @param fieldSpec The field specification for the column
+   * @param schema The schema (required for proper handling of time columns)
+   */
+  public NullValueColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, Schema schema) {
+    _defaultNullValue = 
NullValueTransformerUtils.getDefaultNullValue(fieldSpec, tableConfig, schema);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    if (value instanceof Object[] && ((Object[]) value).length == 0) {
+      // Special case: empty array should be treated as null
+      // TODO - Currently this is done in DataTypeTransformerUtils
+      //  Should this be moved from DataTypeTransformerUtils to 
NullValueTransformerUtils ?
+      //  In Row major build, DataTypeTransformer is called always
+      //  But in Column major build, DataTypeTransformer is not called if 
source and destination data types are same
+      //  If we move this logic to NullValueTransformerUtils, the logic stays 
at one place
+      value = null;
+    }
+    return NullValueTransformerUtils.transformValue(value, _defaultNullValue);
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 1c91136602e..9d8b6f7bbcc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -18,18 +18,12 @@
  */
 package org.apache.pinot.segment.local.recordtransformer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -75,50 +69,7 @@ public class DataTypeTransformer implements 
RecordTransformer {
       String column = entry.getKey();
       try {
         Object value = record.getValue(column);
-        if (value == null) {
-          continue;
-        }
-
-        PinotDataType dest = entry.getValue();
-        if (dest != PinotDataType.JSON && dest != PinotDataType.MAP) {
-          value = standardize(column, value, dest.isSingleValue());
-        }
-
-        // NOTE: The standardized value could be null for empty 
Collection/Map/Object[].
-        if (value == null) {
-          record.putValue(column, null);
-          continue;
-        }
-
-        // Convert data type if necessary
-        PinotDataType source;
-        if (value instanceof Object[]) {
-          // Multi-value column
-          Object[] values = (Object[]) value;
-          // JSON is not standardised for empty json array
-          if (dest == PinotDataType.JSON && values.length == 0) {
-            source = PinotDataType.JSON;
-          } else {
-            source = PinotDataType.getMultiValueType(values[0].getClass());
-          }
-        } else {
-          // Single-value column
-          source = PinotDataType.getSingleValueType(value.getClass());
-        }
-
-        // Skipping conversion when srcType!=destType is speculative, and can 
be unsafe when
-        // the array for MV column contains values of mixing types. Mixing 
types can lead
-        // to ClassCastException during conversion, often aborting data 
ingestion jobs.
-        //
-        // So now, calling convert() unconditionally for safety. Perf impact 
is negligible:
-        // 1. for SV column, when srcType=destType, the conversion is simply 
pass through.
-        // 2. for MV column, when srcType=destType, the conversion is simply 
pass through
-        // if the source type is not Object[] (but sth like Integer[], 
Double[]). For Object[],
-        // the conversion loops through values in the array like before, but 
can catch the
-        // ClassCastException if it happens and continue the conversion now.
-        value = dest.convert(value, source);
-        value = dest.toInternal(value);
-
+        value = DataTypeTransformerUtils.transformValue(column, value, 
entry.getValue());
         record.putValue(column, value);
       } catch (Exception e) {
         if (!_continueOnError) {
@@ -130,83 +81,4 @@ public class DataTypeTransformer implements 
RecordTransformer {
       }
     }
   }
-
-  /**
-   * Standardize the value into supported types.
-   * <ul>
-   *   <li>Empty Collection/Map/Object[] will be standardized to null</li>
-   *   <li>Single-entry Collection/Map/Object[] will be standardized to single 
value (map key is ignored)</li>
-   *   <li>Multi-entries Collection/Map/Object[] will be standardized to 
Object[] (map key is ignored)</li>
-   * </ul>
-   */
-  @VisibleForTesting
-  @Nullable
-  static Object standardize(String column, @Nullable Object value, boolean 
isSingleValue) {
-    if (value == null) {
-      return null;
-    }
-    if (value instanceof Collection) {
-      return standardizeCollection(column, (Collection) value, isSingleValue);
-    }
-    if (value instanceof Map) {
-      return standardizeCollection(column, ((Map) value).values(), 
isSingleValue);
-    }
-    if (value instanceof Object[]) {
-      Object[] values = (Object[]) value;
-      int numValues = values.length;
-      if (numValues == 0) {
-        return null;
-      }
-      if (numValues == 1) {
-        return standardize(column, values[0], isSingleValue);
-      }
-      List<Object> standardizedValues = new ArrayList<>(numValues);
-      for (Object singleValue : values) {
-        Object standardizedValue = standardize(column, singleValue, true);
-        if (standardizedValue != null) {
-          standardizedValues.add(standardizedValue);
-        }
-      }
-      int numStandardizedValues = standardizedValues.size();
-      if (numStandardizedValues == 0) {
-        return null;
-      }
-      if (numStandardizedValues == 1) {
-        return standardizedValues.get(0);
-      }
-      if (isSingleValue) {
-        throw new IllegalArgumentException(
-            "Cannot read single-value from Object[]: " + 
Arrays.toString(values) + " for column: " + column);
-      }
-      return standardizedValues.toArray();
-    }
-    return value;
-  }
-
-  private static Object standardizeCollection(String column, Collection 
collection, boolean isSingleValue) {
-    int numValues = collection.size();
-    if (numValues == 0) {
-      return null;
-    }
-    if (numValues == 1) {
-      return standardize(column, collection.iterator().next(), isSingleValue);
-    }
-    List<Object> standardizedValues = new ArrayList<>(numValues);
-    for (Object singleValue : collection) {
-      Object standardizedValue = standardize(column, singleValue, true);
-      if (standardizedValue != null) {
-        standardizedValues.add(standardizedValue);
-      }
-    }
-    int numStandardizedValues = standardizedValues.size();
-    if (numStandardizedValues == 0) {
-      return null;
-    }
-    if (numStandardizedValues == 1) {
-      return standardizedValues.get(0);
-    }
-    Preconditions.checkState(!isSingleValue, "Cannot read single-value from 
Collection: %s for column: %s", collection,
-        column);
-    return standardizedValues.toArray();
-  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
index 43b78ca6525..6427fb25b26 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
@@ -18,67 +18,26 @@
  */
 package org.apache.pinot.segment.local.recordtransformer;
 
-import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.utils.NullValueTransformerUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 public class NullValueTransformer implements RecordTransformer {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(NullValueTransformer.class);
 
   private final Map<String, Object> _defaultNullValues = new HashMap<>();
 
   public NullValueTransformer(TableConfig tableConfig, Schema schema) {
-    String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
-
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-      if (!fieldSpec.isVirtualColumn() && 
!fieldSpec.getName().equals(timeColumnName)) {
+      if (!fieldSpec.isVirtualColumn()) {
         String fieldName = fieldSpec.getName();
-        Object defaultNullValue = fieldSpec.getDefaultNullValue();
-        if (fieldSpec.isSingleValueField()) {
-          _defaultNullValues.put(fieldName, defaultNullValue);
-        } else {
-          _defaultNullValues.put(fieldName, new Object[]{defaultNullValue});
-        }
-      }
-    }
-
-    // NOTE: Time column is used to manage the segments, so its values have to 
be within the valid range. If the default
-    //       time value in the field spec is within the valid range, we use it 
as the default time value; if not, we use
-    //       current time as the default time value.
-    if (StringUtils.isNotEmpty(timeColumnName)) {
-      DateTimeFieldSpec timeColumnSpec = 
schema.getSpecForTimeColumn(timeColumnName);
-      Preconditions.checkState(timeColumnSpec != null, "Failed to find time 
field: %s from schema: %s", timeColumnName,
-          schema.getSchemaName());
-
-      String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
-      DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
-      try {
-        long defaultTimeMs = 
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
-        if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
-          _defaultNullValues.put(timeColumnName, 
timeColumnSpec.getDefaultNullValue());
-          return;
-        }
-      } catch (Exception e) {
-        // Ignore
+        Object defaultNullValue = 
NullValueTransformerUtils.getDefaultNullValue(fieldSpec, tableConfig, schema);
+        _defaultNullValues.put(fieldName, defaultNullValue);
       }
-      String currentTimeString = 
dateTimeFormatSpec.fromMillisToFormat(System.currentTimeMillis());
-      Object currentTime = 
timeColumnSpec.getDataType().convert(currentTimeString);
-      _defaultNullValues.put(timeColumnName, currentTime);
-      LOGGER.info(
-          "Default time: {} does not comply with format: {}, using current 
time: {} as the default time for table: {}",
-          defaultTimeString, timeColumnSpec.getFormat(), currentTime, 
tableConfig.getTableName());
     }
   }
 
@@ -87,8 +46,9 @@ public class NullValueTransformer implements 
RecordTransformer {
     for (Map.Entry<String, Object> entry : _defaultNullValues.entrySet()) {
       String fieldName = entry.getKey();
       Object value = record.getValue(fieldName);
-      if (value == null) {
-        record.putDefaultNullValue(fieldName, entry.getValue());
+      Object transformedValue = 
NullValueTransformerUtils.transformValue(value, entry.getValue());
+      if (transformedValue != value) {
+        record.putDefaultNullValue(fieldName, transformedValue);
       }
     }
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
new file mode 100644
index 00000000000..017f2d274e7
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.PinotDataType;
+
+
+/**
+ * Utility class for data type transformation operations shared between
+ * DataTypeTransformer (record-level) and DataTypeColumnTransformer 
(column-level).
+ */
+@SuppressWarnings("rawtypes")
+public class DataTypeTransformerUtils {
+
+  private DataTypeTransformerUtils() {
+    // Utility class - prevent instantiation
+  }
+
+  /**
+   * Transforms a value to the destination data type.
+   * This method standardizes the value, determines the source type, converts 
to the destination type,
+   * and converts to internal representation.
+   *
+   * @param column The column name (used for error messages)
+   * @param value The value to transform
+   * @param destDataType The destination PinotDataType
+   * @return The transformed value, or null if the standardized value is null
+   */
+  @Nullable
+  public static Object transformValue(String column, @Nullable Object value, 
PinotDataType destDataType) {
+    if (value == null) {
+      return null;
+    }
+
+    // Standardize the value (except for JSON and MAP types)
+    if (destDataType != PinotDataType.JSON && destDataType != 
PinotDataType.MAP) {
+      value = standardize(column, value, destDataType.isSingleValue());
+    }
+
+    // NOTE: The standardized value could be null for empty 
Collection/Map/Object[].
+    if (value == null) {
+      return null;
+    }
+
+    // Determine the source data type
+    PinotDataType sourceDataType;
+    if (value instanceof Object[]) {
+      // Multi-value column
+      Object[] values = (Object[]) value;
+      // JSON is not standardised for empty json array
+      if (destDataType == PinotDataType.JSON && values.length == 0) {
+        sourceDataType = PinotDataType.JSON;
+      } else {
+        sourceDataType = PinotDataType.getMultiValueType(values[0].getClass());
+      }
+    } else {
+      // Single-value column
+      sourceDataType = PinotDataType.getSingleValueType(value.getClass());
+    }
+
+    // Convert from source to destination type
+    // Skipping conversion when srcType!=destType is speculative, and can be 
unsafe when
+    // the array for MV column contains values of mixing types. Mixing types 
can lead
+    // to ClassCastException during conversion, often aborting data ingestion 
jobs.
+    //
+    // So now, calling convert() unconditionally for safety. Perf impact is 
negligible:
+    // 1. for SV column, when srcType=destType, the conversion is simply pass 
through.
+    // 2. for MV column, when srcType=destType, the conversion is simply pass 
through
+    // if the source type is not Object[] (but sth like Integer[], Double[]). 
For Object[],
+    // the conversion loops through values in the array like before, but can 
catch the
+    // ClassCastException if it happens and continue the conversion now.
+    value = destDataType.convert(value, sourceDataType);
+    value = destDataType.toInternal(value);
+
+    return value;
+  }
+
+  /**
+   * Standardize the value into supported types.
+   * <ul>
+   *   <li>Empty Collection/Map/Object[] will be standardized to null</li>
+   *   <li>Single-entry Collection/Map/Object[] will be standardized to single 
value (map key is ignored)</li>
+   *   <li>Multi-entries Collection/Map/Object[] will be standardized to 
Object[] (map key is ignored)</li>
+   * </ul>
+   */
+  @VisibleForTesting
+  @Nullable
+  public static Object standardize(String column, @Nullable Object value, 
boolean isSingleValue) {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof Collection) {
+      return standardizeCollection(column, (Collection) value, isSingleValue);
+    }
+    if (value instanceof Map) {
+      return standardizeCollection(column, ((Map) value).values(), 
isSingleValue);
+    }
+    if (value instanceof Object[]) {
+      Object[] values = (Object[]) value;
+      int numValues = values.length;
+      if (numValues == 0) {
+        return null;
+      }
+      if (numValues == 1) {
+        return standardize(column, values[0], isSingleValue);
+      }
+      List<Object> standardizedValues = new ArrayList<>(numValues);
+      for (Object singleValue : values) {
+        Object standardizedValue = standardize(column, singleValue, true);
+        if (standardizedValue != null) {
+          standardizedValues.add(standardizedValue);
+        }
+      }
+      int numStandardizedValues = standardizedValues.size();
+      if (numStandardizedValues == 0) {
+        return null;
+      }
+      if (numStandardizedValues == 1) {
+        return standardizedValues.get(0);
+      }
+      if (isSingleValue) {
+        throw new IllegalArgumentException(
+            "Cannot read single-value from Object[]: " + 
Arrays.toString(values) + " for column: " + column);
+      }
+      return standardizedValues.toArray();
+    }
+    return value;
+  }
+
+  private static Object standardizeCollection(String column, Collection 
collection, boolean isSingleValue) {
+    int numValues = collection.size();
+    if (numValues == 0) {
+      return null;
+    }
+    if (numValues == 1) {
+      return standardize(column, collection.iterator().next(), isSingleValue);
+    }
+    List<Object> standardizedValues = new ArrayList<>(numValues);
+    for (Object singleValue : collection) {
+      Object standardizedValue = standardize(column, singleValue, true);
+      if (standardizedValue != null) {
+        standardizedValues.add(standardizedValue);
+      }
+    }
+    int numStandardizedValues = standardizedValues.size();
+    if (numStandardizedValues == 0) {
+      return null;
+    }
+    if (numStandardizedValues == 1) {
+      return standardizedValues.get(0);
+    }
+    Preconditions.checkState(!isSingleValue, "Cannot read single-value from 
Collection: %s for column: %s", collection,
+        column);
+    return standardizedValues.toArray();
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
new file mode 100644
index 00000000000..85d68f45a38
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for null value transformation operations shared between
+ * NullValueTransformer (record-level) and NullValueColumnTransformer 
(column-level).
+ */
+public class NullValueTransformerUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NullValueTransformerUtils.class);
+
+  private NullValueTransformerUtils() {
+    // Utility class - prevent instantiation
+  }
+
+  /**
+   * Computes the default null value for a given field spec.
+   * For time columns, validates that the default time is within valid range,
+   * otherwise uses current time.
+   *
+   * @param fieldSpec The field specification
+   * @param tableConfig The table configuration
+   * @param schema The schema
+   * @return The default null value to use for this field
+   */
+  @Nullable
+  public static Object getDefaultNullValue(FieldSpec fieldSpec, TableConfig 
tableConfig, Schema schema) {
+    String fieldName = fieldSpec.getName();
+    String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
+
+    // Handle time column specially
+    if (StringUtils.isNotEmpty(timeColumnName) && 
fieldName.equals(timeColumnName)) {
+      return getDefaultTimeValue(timeColumnName, tableConfig, schema);
+    }
+
+    // For non-time columns, use the field spec's default null value
+    Object defaultNullValue = fieldSpec.getDefaultNullValue();
+    if (fieldSpec.isSingleValueField()) {
+      return defaultNullValue;
+    } else {
+      return new Object[]{defaultNullValue};
+    }
+  }
+
+  /**
+   * Computes the default time value for the time column.
+   * Time column is used to manage the segments, so its values have to be 
within the valid range. If the default
+   * time value in the field spec is within the valid range, we use it as the 
default time value; if not, we use
+   * current time as the default time value.
+   *
+   * @param timeColumnName The name of the time column
+   * @param tableConfig The table configuration
+   * @param schema The schema
+   * @return The default time value to use
+   */
+  private static Object getDefaultTimeValue(String timeColumnName, TableConfig 
tableConfig, Schema schema) {
+    DateTimeFieldSpec timeColumnSpec = 
schema.getSpecForTimeColumn(timeColumnName);
+    Preconditions.checkState(timeColumnSpec != null, "Failed to find time 
field: %s from schema: %s", timeColumnName,
+        schema.getSchemaName());
+
+    String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
+    DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
+
+    // Try to use the default time from the field spec if it's valid
+    try {
+      long defaultTimeMs = 
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
+      if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
+        return timeColumnSpec.getDefaultNullValue();
+      }
+    } catch (Exception e) {
+      // Ignore and fall through to use current time
+    }
+
+    // Use current time if default time is not valid
+    String currentTimeString = 
dateTimeFormatSpec.fromMillisToFormat(System.currentTimeMillis());
+    Object currentTime = 
timeColumnSpec.getDataType().convert(currentTimeString);
+    LOGGER.info(
+        "Default time: {} does not comply with format: {}, using current time: 
{} as the default time for table: {}",
+        defaultTimeString, timeColumnSpec.getFormat(), currentTime, 
tableConfig.getTableName());
+    return currentTime;
+  }
+
+  /**
+   * Transforms a value by replacing null with the default null value.
+   *
+   * @param value The value to transform
+   * @param defaultNullValue The default null value to use if value is null
+   * @return The original value if not null, otherwise the default null value
+   */
+  @Nullable
+  public static Object transformValue(@Nullable Object value, Object 
defaultNullValue) {
+    return value != null ? value : defaultNullValue;
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
new file mode 100644
index 00000000000..c52bd254b5a
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Comprehensive tests for DataTypeColumnTransformer.
+ * Tests data type conversions, isNoOp optimization, and error handling.
+ */
+public class DataTypeColumnTransformerTest {
+
+  private static final String COLUMN_NAME = "testColumn";
+
+  private ColumnReader createMockColumnReader(String columnName, boolean 
isSingleValue,
+      boolean isInt, boolean isLong, boolean isFloat, boolean isDouble, 
boolean isString, boolean isBytes) {
+    ColumnReader reader = Mockito.mock(ColumnReader.class);
+    when(reader.getColumnName()).thenReturn(columnName);
+    when(reader.isSingleValue()).thenReturn(isSingleValue);
+    when(reader.isInt()).thenReturn(isInt);
+    when(reader.isLong()).thenReturn(isLong);
+    when(reader.isFloat()).thenReturn(isFloat);
+    when(reader.isDouble()).thenReturn(isDouble);
+    when(reader.isString()).thenReturn(isString);
+    when(reader.isBytes()).thenReturn(isBytes);
+    return reader;
+  }
+
+  @Test
+  public void testIsNoOpForMatchingIntTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both INT");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingLongTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
true, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both LONG");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingFloatTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, true, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both FLOAT");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingDoubleTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, true, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both DOUBLE");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingStringTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both STRING");
+  }
+
+  @Test
+  public void testIsNotNoOpForDifferentTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertFalse(transformer.isNoOp(), "Should not be no-op when converting INT 
to LONG");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingIntMVTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both INT[]");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingLongMVTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false, 
true, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both LONG[]");
+  }
+
+  @Test
+  public void testIsNoOpForMatchingStringMVTypes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are 
both STRING[]");
+  }
+
+  @Test
+  public void testTransformNullValue() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(null);
+    assertNull(result, "Null values should pass through as null");
+  }
+
+  @Test
+  public void testTransformStringToInt() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform("42");
+    assertEquals(result, 42);
+  }
+
+  @Test
+  public void testTransformStringToLong() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform("1234567890");
+    assertEquals(result, 1234567890L);
+  }
+
+  @Test
+  public void testTransformStringToFloat() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform("3.14");
+    assertEquals(result, 3.14f);
+  }
+
+  @Test
+  public void testTransformStringToDouble() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform("3.14159");
+    assertEquals(result, 3.14159);
+  }
+
+  @Test
+  public void testTransformIntToLong() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(42);
+    assertEquals(result, 42L);
+  }
+
+  @Test
+  public void testTransformIntToString() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(42);
+    assertEquals(result, "42");
+  }
+
+  @Test
+  public void testTransformLongToTimestamp() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.TIMESTAMP)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
true, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    long timestampValue = 1609459200000L; // 2021-01-01 00:00:00 UTC
+    Object result = transformer.transform(timestampValue);
+    assertEquals(result, timestampValue);
+  }
+
+  @Test
+  public void testInvalidConversionWithContinueOnErrorFalse() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+    // Default table config has continueOnError = false
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Try to convert invalid string to int
+    expectThrows(RuntimeException.class, () -> 
transformer.transform("not_a_number"));
+  }
+
+  @Test
+  public void testInvalidConversionWithContinueOnErrorTrue() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+    // Set continueOnError = true
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setContinueOnError(true);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testTable")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Try to convert invalid string to int - should return null
+    Object result = transformer.transform("not_a_number");
+    assertNull(result, "Invalid conversion should return null when 
continueOnError=true");
+  }
+
+  @Test
+  public void testTransformStringArrayToIntArray() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(new Object[]{"1", "2", "3"});
+    Object[] intArray = (Object[]) result;
+    assertEquals(intArray.length, 3);
+    assertEquals(intArray[0], 1);
+    assertEquals(intArray[1], 2);
+    assertEquals(intArray[2], 3);
+  }
+
+  @Test
+  public void testTransformEmptyArray() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Empty arrays should be standardized to null
+    Object result = transformer.transform(new Object[0]);
+    assertNull(result);
+  }
+
+  @Test
+  public void testTransformSingleElementArray() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Single element arrays should be unwrapped
+    Object result = transformer.transform(new Object[]{"42"});
+    assertEquals(result, 42);
+  }
+
+  @Test
+  public void testMVToSVConversionError() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Try to convert multi-value array to single-value
+    expectThrows(RuntimeException.class, () -> transformer.transform(new 
Object[]{"1", "2", "3"}));
+  }
+
+  @Test
+  public void testTransformBooleanToString() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(true);
+    assertEquals(result, "true");
+  }
+
+  @Test
+  public void testTransformStringToBoolean() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.BOOLEAN)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, true, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform("true");
+    assertEquals(result, 1); // Boolean true is stored as 1
+
+    result = transformer.transform("false");
+    assertEquals(result, 0); // Boolean false is stored as 0
+  }
+
+  @Test
+  public void testTransformFloatToDouble() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, true, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(3.14f);
+    assertTrue(result instanceof Double);
+    assertEquals((Double) result, 3.14, 0.001);
+  }
+
+  @Test
+  public void testTransformDoubleToFloat() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, true, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(3.14159);
+    assertTrue(result instanceof Float);
+    assertEquals((Float) result, 3.14159f, 0.001f);
+  }
+
+  @Test
+  public void testTransformBytesToString() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, false, true);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    byte[] bytes = "test".getBytes();
+    Object result = transformer.transform(bytes);
+    assertTrue(result instanceof String);
+  }
+
+  @Test
+  public void testTransformPreservesValue() {
+    // When source and dest types match (no-op case), value should still be 
transformed
+    // to ensure any internal representation conversion happens
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    Object result = transformer.transform(42);
+    assertEquals(result, 42);
+  }
+
+  @Test
+  public void testIsNotNoOpForBytes() {
+    // Bytes type should not be no-op since there's overhead and special 
handling
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.BYTES)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, false, true);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    // Bytes type uses generic transformation path
+    assertFalse(transformer.isNoOp());
+  }
+
+  @Test
+  public void testIsNotNoOpForJson() {
+    // JSON type should not be no-op
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.JSON)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+    ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false, 
false, false, false, false, false);
+    DataTypeColumnTransformer transformer = new 
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+    assertFalse(transformer.isNoOp());
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
new file mode 100644
index 00000000000..eb0b0bed223
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Comprehensive tests for NullValueColumnTransformer.
+ * Tests null value handling and default value substitution for all data types.
+ */
+public class NullValueColumnTransformerTest {
+
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+  @Test
+  public void testIsNoOp() {
+    // NullValueColumnTransformer should never be a no-op since it always 
needs to handle null values
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    assertFalse(transformer.isNoOp(), "NullValueColumnTransformer should never 
be a no-op");
+  }
+
+  @Test
+  public void testTransformNullToDefaultInt() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+    assertEquals(result, Integer.MIN_VALUE);
+  }
+
+  @Test
+  public void testTransformNullToDefaultLong() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("longCol", FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("longCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+    assertEquals(result, Long.MIN_VALUE);
+  }
+
+  @Test
+  public void testTransformNullToDefaultFloat() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("floatCol", FieldSpec.DataType.FLOAT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("floatCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+    assertEquals(result, Float.NEGATIVE_INFINITY);
+  }
+
+  @Test
+  public void testTransformNullToDefaultDouble() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("doubleCol", FieldSpec.DataType.DOUBLE)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("doubleCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+    assertEquals(result, Double.NEGATIVE_INFINITY);
+  }
+
+  @Test
+  public void testTransformNullToDefaultString() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+    assertEquals(result, "null");
+  }
+
+  @Test
+  public void testTransformNullToDefaultBytes() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("bytesCol", FieldSpec.DataType.BYTES)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("bytesCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof byte[]);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+  }
+
+  @Test
+  public void testTransformNullToDefaultBoolean() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("boolCol", FieldSpec.DataType.BOOLEAN)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("boolCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+  }
+
+  @Test
+  public void testTransformNullToDefaultTimestamp() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("timestampCol", FieldSpec.DataType.TIMESTAMP)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("timestampCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, fieldSpec.getDefaultNullValue());
+  }
+
+  @Test
+  public void testTransformNullToDefaultIntMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("intMVCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], Integer.MIN_VALUE);
+  }
+
+  @Test
+  public void testTransformNullToDefaultLongMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("longMVCol", FieldSpec.DataType.LONG)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("longMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], Long.MIN_VALUE);
+  }
+
+  @Test
+  public void testTransformNullToDefaultFloatMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("floatMVCol", FieldSpec.DataType.FLOAT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("floatMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], Float.NEGATIVE_INFINITY);
+  }
+
+  @Test
+  public void testTransformNullToDefaultDoubleMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("doubleMVCol", FieldSpec.DataType.DOUBLE)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("doubleMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], Double.NEGATIVE_INFINITY);
+  }
+
+  @Test
+  public void testTransformNullToDefaultStringMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("stringMVCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], "null");
+  }
+
+  @Test
+  public void testTransformEmptyObjectArrayToNull() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+    // Empty Object[] should be treated as null
+    Object result = transformer.transform(new Object[0]);
+
+    assertNotNull(result);
+    assertEquals(result, "null");
+  }
+
+  @Test
+  public void testTransformEmptyObjectArrayToNullForMV() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("stringMVCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+    // Empty Object[] should be treated as null
+    Object result = transformer.transform(new Object[0]);
+
+    assertNotNull(result);
+    assertTrue(result instanceof Object[]);
+    Object[] resultArray = (Object[]) result;
+    assertEquals(resultArray.length, 1);
+    assertEquals(resultArray[0], "null");
+  }
+
+  @Test
+  public void testNonNullValuePassThrough() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+    Integer testValue = 42;
+    Object result = transformer.transform(testValue);
+
+    assertEquals(result, testValue);
+  }
+
+  @Test
+  public void testNonNullStringPassThrough() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+    String testValue = "test string";
+    Object result = transformer.transform(testValue);
+
+    assertEquals(result, testValue);
+  }
+
+  @Test
+  public void testNonNullMVPassThrough() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addMultiValueDimension("intMVCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intMVCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+    int[] testValue = new int[]{1, 2, 3};
+    Object result = transformer.transform(testValue);
+
+    assertEquals(result, testValue);
+  }
+
+  @Test
+  public void testTimeColumnWithValidDefault() {
+    // Create a time column with a valid default value
+    Schema schema = new Schema.SchemaBuilder()
+        .addDateTime("timeCol", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testTable")
+        .setTimeColumnName("timeCol")
+        .build();
+
+    FieldSpec fieldSpec = schema.getFieldSpecFor("timeCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(tableConfig, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    // Should use the field spec's default value since it's valid
+    assertTrue(result instanceof Long);
+  }
+
+  @Test
+  public void testTimeColumnWithInvalidDefault() {
+    // Create a time column with an invalid default value (far future)
+    Schema schema = new Schema.SchemaBuilder()
+        .addDateTimeField("timeCol", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testTable")
+        .setTimeColumnName("timeCol")
+        .build();
+
+    FieldSpec fieldSpec = schema.getFieldSpecFor("timeCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(tableConfig, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    // Should use current time instead of the invalid default
+    assertTrue(result instanceof Long);
+    long resultTime = (Long) result;
+    long currentTime = System.currentTimeMillis();
+    // Result should be close to current time (within 1 second)
+    assertTrue(Math.abs(resultTime - currentTime) < 1000,
+        "Time should be close to current time, got: " + resultTime + ", 
current: " + currentTime);
+  }
+
+  @Test
+  public void testCustomDefaultNullValue() {
+    // Test with a field that has a custom default null value
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("customIntCol", FieldSpec.DataType.INT, 999) 
// Custom default
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("customIntCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, 999);
+  }
+
+  @Test
+  public void testCustomDefaultNullValueString() {
+    // Test with a string field that has a custom default null value
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("customStringCol", FieldSpec.DataType.STRING, 
"MISSING") // Custom default
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("customStringCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(null);
+
+    assertNotNull(result);
+    assertEquals(result, "MISSING");
+  }
+
+  @Test
+  public void testZeroValue() {
+    // Zero should not be treated as null
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform(0);
+
+    assertEquals(result, 0);
+  }
+
+  @Test
+  public void testEmptyString() {
+    // Empty string should not be treated as null
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+        .build();
+    FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+    NullValueColumnTransformer transformer = new 
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+    Object result = transformer.transform("");
+
+    assertEquals(result, "");
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
index af41e22b3bb..ed9fae8a1fa 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -42,14 +43,14 @@ public class DataTypeTransformerTest {
 
     // Empty Map
     Map<String, Object> map = Collections.emptyMap();
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, true));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, false));
 
     // Map with single entry
     String expectedValue = "testValue";
     map = Collections.singletonMap("testKey", expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), 
expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false), 
expectedValue);
 
     // Map with multiple entries
     Object[] expectedValues = new Object[]{"testValue1", "testValue2"};
@@ -58,12 +59,12 @@ public class DataTypeTransformerTest {
     map.put("testKey2", "testValue2");
     try {
       // Should fail because Map with multiple entries cannot be standardized 
as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformerUtils.standardize(COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, 
map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) 
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
 
     /**
      * Tests for List
@@ -71,24 +72,24 @@ public class DataTypeTransformerTest {
 
     // Empty List
     List<Object> list = Collections.emptyList();
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, false));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, list, true));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, list, false));
 
     // List with single entry
     list = Collections.singletonList(expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), 
expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, list, true), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, list, false), 
expectedValue);
 
     // List with multiple entries
     list = Arrays.asList(expectedValues);
     try {
       // Should fail because List with multiple entries cannot be standardized 
as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformerUtils.standardize(COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, 
false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN, list, 
false), expectedValues);
 
     /**
      * Tests for Object[]
@@ -96,24 +97,24 @@ public class DataTypeTransformerTest {
 
     // Empty Object[]
     Object[] values = new Object[0];
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, false));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, values, true));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, values, false));
 
     // Object[] with single entry
     values = new Object[]{expectedValue};
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), 
expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, values, true), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, values, false), 
expectedValue);
 
     // Object[] with multiple entries
     values = new Object[]{"testValue1", "testValue2"};
     try {
       // Should fail because Object[] with multiple entries cannot be 
standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformerUtils.standardize(COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, 
false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN, 
values, false), expectedValues);
 
     /**
      * Tests for nested Map/List/Object[]
@@ -121,32 +122,32 @@ public class DataTypeTransformerTest {
 
     // Map with empty List
     map = Collections.singletonMap("testKey", Collections.emptyList());
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, true));
+    assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, false));
 
     // Map with single-entry List
     map = Collections.singletonMap("testKey", 
Collections.singletonList(expectedValue));
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), 
expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false), 
expectedValue);
 
     // Map with one empty Map and one single-entry Map
     map = new HashMap<>();
     map.put("testKey1", Collections.emptyMap());
     map.put("testKey2", Collections.singletonMap("testKey", expectedValue));
     // Can be standardized into single value because empty Map should be 
ignored
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), 
expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true), 
expectedValue);
+    assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false), 
expectedValue);
 
     // Map with multi-entries List
     map = Collections.singletonMap("testKey", Arrays.asList(expectedValues));
     try {
       // Should fail because Map with multiple entries cannot be standardized 
as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformerUtils.standardize(COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, 
map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) 
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
 
     // Map with one empty Map, one single-entry List and one single-entry 
Object[]
     map = new HashMap<>();
@@ -155,12 +156,12 @@ public class DataTypeTransformerTest {
     map.put("testKey3", new Object[]{"testValue2"});
     try {
       // Should fail because Map with multiple entries cannot be standardized 
as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformerUtils.standardize(COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, 
map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) 
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
 
     // List with two single-entry Maps and one empty Map
     list = Arrays
@@ -168,12 +169,12 @@ public class DataTypeTransformerTest {
             Collections.emptyMap());
     try {
       // Should fail because List with multiple entries cannot be standardized 
as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformerUtils.standardize(COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, 
false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN, list, 
false), expectedValues);
 
     // Object[] with two single-entry Maps
     values = new Object[]{
@@ -181,12 +182,12 @@ public class DataTypeTransformerTest {
     };
     try {
       // Should fail because Object[] with multiple entries cannot be 
standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformerUtils.standardize(COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, 
values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) 
DataTypeTransformerUtils.standardize(COLUMN, values, false), expectedValues);
 
     // Object[] with one empty Object[], one multi-entries List of nested 
Map/List/Object[]
     values = new Object[]{
@@ -194,11 +195,11 @@ public class DataTypeTransformerTest {
         Collections.singletonMap("testKey", Arrays.asList(new 
Object[]{"testValue2"}, Collections.emptyMap()))
     };
     try {
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformerUtils.standardize(COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, 
values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) 
DataTypeTransformerUtils.standardize(COLUMN, values, false), expectedValues);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
new file mode 100644
index 00000000000..6ebc9864788
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.columntransformer;
+
+import javax.annotation.Nullable;
+
+
/// Transforms the values of a single column. Implementations receive one value at a time via
/// [#transform(Object)]; callers may skip the transformer entirely when [#isNoOp()] returns
/// `true`, so `isNoOp()` must be cheap and stable for the lifetime of the instance.
public interface ColumnTransformer {

  /// Returns `true` if the transformer is no-op (can be skipped), `false` otherwise.
  boolean isNoOp();

  /// Transforms a value based on some custom rules.
  /// The input may be `null` (e.g. a missing field in the source record); implementations
  /// decide how nulls are handled.
  /// @param value The original value.
  /// @return The transformed value.
  Object transform(@Nullable Object value);
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to