This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 09bc151104e Add single column transformer (#17304)
09bc151104e is described below
commit 09bc151104e034b42d1c30be4185e2f6ef0ebecb
Author: Krishan Goyal <[email protected]>
AuthorDate: Mon Dec 29 14:46:14 2025 +0530
Add single column transformer (#17304)
* Add column transformer
* Handle nulls and transformers for primitive types
* Handle empty arrays and null transformations more appropriately
* Add tests for column reader transformer
* checkstyle fixes
* Remove column reader transformer
* Add logs in data type column transformer
* Fix checkstyle violations
---
.../DataTypeColumnTransformer.java | 101 ++++
.../NullValueColumnTransformer.java | 59 +++
.../recordtransformer/DataTypeTransformer.java | 132 +----
.../recordtransformer/NullValueTransformer.java | 54 +-
.../local/utils/DataTypeTransformerUtils.java | 180 +++++++
.../local/utils/NullValueTransformerUtils.java | 123 +++++
.../DataTypeColumnTransformerTest.java | 543 +++++++++++++++++++++
.../NullValueColumnTransformerTest.java | 444 +++++++++++++++++
.../recordtransformer/DataTypeTransformerTest.java | 69 +--
.../spi/columntransformer/ColumnTransformer.java | 33 ++
10 files changed, 1527 insertions(+), 211 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
new file mode 100644
index 00000000000..38f8270db9f
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+ private final PinotDataType _destDataType;
+ private final ColumnReader _columnReader;
+ private final boolean _continueOnError;
+ private final ThrottledLogger _throttledLogger;
+
+ /**
+ * @param fieldSpec - The field spec for the column being created in Pinot.
+ * @param columnReader - The column reader to read the source data.
+ */
+ public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec
fieldSpec, ColumnReader columnReader) {
+ _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+ _columnReader = columnReader;
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ _continueOnError = ingestionConfig != null &&
ingestionConfig.isContinueOnError();
+ _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+ }
+
+ @Override
+ public boolean isNoOp() {
+ // If source and destination data types are primitive types and the same,
no transformation is needed.
+ if (_columnReader.isSingleValue()) {
+ if (_columnReader.isInt()) {
+ return _destDataType.equals(PinotDataType.INTEGER);
+ } else if (_columnReader.isLong()) {
+ return _destDataType.equals(PinotDataType.LONG);
+ } else if (_columnReader.isFloat()) {
+ return _destDataType.equals(PinotDataType.FLOAT);
+ } else if (_columnReader.isDouble()) {
+ return _destDataType.equals(PinotDataType.DOUBLE);
+ } else if (_columnReader.isString()) {
+ return _destDataType.equals(PinotDataType.STRING);
+ }
+ } else {
+ if (_columnReader.isInt()) {
+ return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+ } else if (_columnReader.isLong()) {
+ return _destDataType.equals(PinotDataType.LONG_ARRAY);
+ } else if (_columnReader.isFloat()) {
+ return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+ } else if (_columnReader.isDouble()) {
+ return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+ } else if (_columnReader.isString()) {
+ return _destDataType.equals(PinotDataType.STRING_ARRAY);
+ }
+ }
+ // For other types, because there is no overhead to cast to Object, always
call transform() which handles all cases
+ return false;
+ }
+
+ @Override
+ public Object transform(Object value) {
+ String columnName = _columnReader.getColumnName();
+ try {
+ return DataTypeTransformerUtils.transformValue(columnName, value,
_destDataType);
+ } catch (Exception e) {
+ if (!_continueOnError) {
+ throw new RuntimeException("Caught exception while transforming data
type for column: " + columnName
+ + " to data type: " + _destDataType, e);
+ }
+ _throttledLogger.warn("Caught exception while transforming data type for
column: " + columnName
+ + " to data type: " + _destDataType + ". Returning null. Exception:
{}", e);
+ return null;
+ }
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
new file mode 100644
index 00000000000..6cde69908b3
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.segment.local.utils.NullValueTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class NullValueColumnTransformer implements ColumnTransformer {
+
+ private final Object _defaultNullValue;
+
+ /**
+ * @param tableConfig The table configuration
+ * @param fieldSpec The field specification for the column
+ * @param schema The schema (required for proper handling of time columns)
+ */
+ public NullValueColumnTransformer(TableConfig tableConfig, FieldSpec
fieldSpec, Schema schema) {
+ _defaultNullValue =
NullValueTransformerUtils.getDefaultNullValue(fieldSpec, tableConfig, schema);
+ }
+
+ @Override
+ public boolean isNoOp() {
+ return false;
+ }
+
+ @Override
+ public Object transform(Object value) {
+ if (value instanceof Object[] && ((Object[]) value).length == 0) {
+ // Special case: empty array should be treated as null
+ // TODO - Currently this is done in DataTypeTransformerUtils
+ // Should this be moved from DataTypeTransformerUtils to
NullValueTransformerUtils ?
+ // In Row major build, DataTypeTransformer is called always
+ // But in Column major build, DataTypeTransformer is not called if
source and destination data types are same
+ // If we move this logic to NullValueTransformerUtils, the logic stays
at one place
+ value = null;
+ }
+ return NullValueTransformerUtils.transformValue(value, _defaultNullValue);
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 1c91136602e..9d8b6f7bbcc 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -18,18 +18,12 @@
*/
package org.apache.pinot.segment.local.recordtransformer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -75,50 +69,7 @@ public class DataTypeTransformer implements
RecordTransformer {
String column = entry.getKey();
try {
Object value = record.getValue(column);
- if (value == null) {
- continue;
- }
-
- PinotDataType dest = entry.getValue();
- if (dest != PinotDataType.JSON && dest != PinotDataType.MAP) {
- value = standardize(column, value, dest.isSingleValue());
- }
-
- // NOTE: The standardized value could be null for empty
Collection/Map/Object[].
- if (value == null) {
- record.putValue(column, null);
- continue;
- }
-
- // Convert data type if necessary
- PinotDataType source;
- if (value instanceof Object[]) {
- // Multi-value column
- Object[] values = (Object[]) value;
- // JSON is not standardised for empty json array
- if (dest == PinotDataType.JSON && values.length == 0) {
- source = PinotDataType.JSON;
- } else {
- source = PinotDataType.getMultiValueType(values[0].getClass());
- }
- } else {
- // Single-value column
- source = PinotDataType.getSingleValueType(value.getClass());
- }
-
- // Skipping conversion when srcType!=destType is speculative, and can
be unsafe when
- // the array for MV column contains values of mixing types. Mixing
types can lead
- // to ClassCastException during conversion, often aborting data
ingestion jobs.
- //
- // So now, calling convert() unconditionally for safety. Perf impact
is negligible:
- // 1. for SV column, when srcType=destType, the conversion is simply
pass through.
- // 2. for MV column, when srcType=destType, the conversion is simply
pass through
- // if the source type is not Object[] (but sth like Integer[],
Double[]). For Object[],
- // the conversion loops through values in the array like before, but
can catch the
- // ClassCastException if it happens and continue the conversion now.
- value = dest.convert(value, source);
- value = dest.toInternal(value);
-
+ value = DataTypeTransformerUtils.transformValue(column, value,
entry.getValue());
record.putValue(column, value);
} catch (Exception e) {
if (!_continueOnError) {
@@ -130,83 +81,4 @@ public class DataTypeTransformer implements
RecordTransformer {
}
}
}
-
- /**
- * Standardize the value into supported types.
- * <ul>
- * <li>Empty Collection/Map/Object[] will be standardized to null</li>
- * <li>Single-entry Collection/Map/Object[] will be standardized to single
value (map key is ignored)</li>
- * <li>Multi-entries Collection/Map/Object[] will be standardized to
Object[] (map key is ignored)</li>
- * </ul>
- */
- @VisibleForTesting
- @Nullable
- static Object standardize(String column, @Nullable Object value, boolean
isSingleValue) {
- if (value == null) {
- return null;
- }
- if (value instanceof Collection) {
- return standardizeCollection(column, (Collection) value, isSingleValue);
- }
- if (value instanceof Map) {
- return standardizeCollection(column, ((Map) value).values(),
isSingleValue);
- }
- if (value instanceof Object[]) {
- Object[] values = (Object[]) value;
- int numValues = values.length;
- if (numValues == 0) {
- return null;
- }
- if (numValues == 1) {
- return standardize(column, values[0], isSingleValue);
- }
- List<Object> standardizedValues = new ArrayList<>(numValues);
- for (Object singleValue : values) {
- Object standardizedValue = standardize(column, singleValue, true);
- if (standardizedValue != null) {
- standardizedValues.add(standardizedValue);
- }
- }
- int numStandardizedValues = standardizedValues.size();
- if (numStandardizedValues == 0) {
- return null;
- }
- if (numStandardizedValues == 1) {
- return standardizedValues.get(0);
- }
- if (isSingleValue) {
- throw new IllegalArgumentException(
- "Cannot read single-value from Object[]: " +
Arrays.toString(values) + " for column: " + column);
- }
- return standardizedValues.toArray();
- }
- return value;
- }
-
- private static Object standardizeCollection(String column, Collection
collection, boolean isSingleValue) {
- int numValues = collection.size();
- if (numValues == 0) {
- return null;
- }
- if (numValues == 1) {
- return standardize(column, collection.iterator().next(), isSingleValue);
- }
- List<Object> standardizedValues = new ArrayList<>(numValues);
- for (Object singleValue : collection) {
- Object standardizedValue = standardize(column, singleValue, true);
- if (standardizedValue != null) {
- standardizedValues.add(standardizedValue);
- }
- }
- int numStandardizedValues = standardizedValues.size();
- if (numStandardizedValues == 0) {
- return null;
- }
- if (numStandardizedValues == 1) {
- return standardizedValues.get(0);
- }
- Preconditions.checkState(!isSingleValue, "Cannot read single-value from
Collection: %s for column: %s", collection,
- column);
- return standardizedValues.toArray();
- }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
index 43b78ca6525..6427fb25b26 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/NullValueTransformer.java
@@ -18,67 +18,26 @@
*/
package org.apache.pinot.segment.local.recordtransformer;
-import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.utils.NullValueTransformerUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class NullValueTransformer implements RecordTransformer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(NullValueTransformer.class);
private final Map<String, Object> _defaultNullValues = new HashMap<>();
public NullValueTransformer(TableConfig tableConfig, Schema schema) {
- String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
-
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn() &&
!fieldSpec.getName().equals(timeColumnName)) {
+ if (!fieldSpec.isVirtualColumn()) {
String fieldName = fieldSpec.getName();
- Object defaultNullValue = fieldSpec.getDefaultNullValue();
- if (fieldSpec.isSingleValueField()) {
- _defaultNullValues.put(fieldName, defaultNullValue);
- } else {
- _defaultNullValues.put(fieldName, new Object[]{defaultNullValue});
- }
- }
- }
-
- // NOTE: Time column is used to manage the segments, so its values have to
be within the valid range. If the default
- // time value in the field spec is within the valid range, we use it
as the default time value; if not, we use
- // current time as the default time value.
- if (StringUtils.isNotEmpty(timeColumnName)) {
- DateTimeFieldSpec timeColumnSpec =
schema.getSpecForTimeColumn(timeColumnName);
- Preconditions.checkState(timeColumnSpec != null, "Failed to find time
field: %s from schema: %s", timeColumnName,
- schema.getSchemaName());
-
- String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
- DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
- try {
- long defaultTimeMs =
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
- if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
- _defaultNullValues.put(timeColumnName,
timeColumnSpec.getDefaultNullValue());
- return;
- }
- } catch (Exception e) {
- // Ignore
+ Object defaultNullValue =
NullValueTransformerUtils.getDefaultNullValue(fieldSpec, tableConfig, schema);
+ _defaultNullValues.put(fieldName, defaultNullValue);
}
- String currentTimeString =
dateTimeFormatSpec.fromMillisToFormat(System.currentTimeMillis());
- Object currentTime =
timeColumnSpec.getDataType().convert(currentTimeString);
- _defaultNullValues.put(timeColumnName, currentTime);
- LOGGER.info(
- "Default time: {} does not comply with format: {}, using current
time: {} as the default time for table: {}",
- defaultTimeString, timeColumnSpec.getFormat(), currentTime,
tableConfig.getTableName());
}
}
@@ -87,8 +46,9 @@ public class NullValueTransformer implements
RecordTransformer {
for (Map.Entry<String, Object> entry : _defaultNullValues.entrySet()) {
String fieldName = entry.getKey();
Object value = record.getValue(fieldName);
- if (value == null) {
- record.putDefaultNullValue(fieldName, entry.getValue());
+ Object transformedValue =
NullValueTransformerUtils.transformValue(value, entry.getValue());
+ if (transformedValue != value) {
+ record.putDefaultNullValue(fieldName, transformedValue);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
new file mode 100644
index 00000000000..017f2d274e7
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/DataTypeTransformerUtils.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.PinotDataType;
+
+
+/**
+ * Utility class for data type transformation operations shared between
+ * DataTypeTransformer (record-level) and DataTypeColumnTransformer
(column-level).
+ */
+@SuppressWarnings("rawtypes")
+public class DataTypeTransformerUtils {
+
+ private DataTypeTransformerUtils() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Transforms a value to the destination data type.
+ * This method standardizes the value, determines the source type, converts
to the destination type,
+ * and converts to internal representation.
+ *
+ * @param column The column name (used for error messages)
+ * @param value The value to transform
+ * @param destDataType The destination PinotDataType
+ * @return The transformed value, or null if the standardized value is null
+ */
+ @Nullable
+ public static Object transformValue(String column, @Nullable Object value,
PinotDataType destDataType) {
+ if (value == null) {
+ return null;
+ }
+
+ // Standardize the value (except for JSON and MAP types)
+ if (destDataType != PinotDataType.JSON && destDataType !=
PinotDataType.MAP) {
+ value = standardize(column, value, destDataType.isSingleValue());
+ }
+
+ // NOTE: The standardized value could be null for empty
Collection/Map/Object[].
+ if (value == null) {
+ return null;
+ }
+
+ // Determine the source data type
+ PinotDataType sourceDataType;
+ if (value instanceof Object[]) {
+ // Multi-value column
+ Object[] values = (Object[]) value;
+ // JSON is not standardised for empty json array
+ if (destDataType == PinotDataType.JSON && values.length == 0) {
+ sourceDataType = PinotDataType.JSON;
+ } else {
+ sourceDataType = PinotDataType.getMultiValueType(values[0].getClass());
+ }
+ } else {
+ // Single-value column
+ sourceDataType = PinotDataType.getSingleValueType(value.getClass());
+ }
+
+ // Convert from source to destination type
+ // Skipping conversion when srcType!=destType is speculative, and can be
unsafe when
+ // the array for MV column contains values of mixing types. Mixing types
can lead
+ // to ClassCastException during conversion, often aborting data ingestion
jobs.
+ //
+ // So now, calling convert() unconditionally for safety. Perf impact is
negligible:
+ // 1. for SV column, when srcType=destType, the conversion is simply pass
through.
+ // 2. for MV column, when srcType=destType, the conversion is simply pass
through
+ // if the source type is not Object[] (but sth like Integer[], Double[]).
For Object[],
+ // the conversion loops through values in the array like before, but can
catch the
+ // ClassCastException if it happens and continue the conversion now.
+ value = destDataType.convert(value, sourceDataType);
+ value = destDataType.toInternal(value);
+
+ return value;
+ }
+
+ /**
+ * Standardize the value into supported types.
+ * <ul>
+ * <li>Empty Collection/Map/Object[] will be standardized to null</li>
+ * <li>Single-entry Collection/Map/Object[] will be standardized to single
value (map key is ignored)</li>
+ * <li>Multi-entries Collection/Map/Object[] will be standardized to
Object[] (map key is ignored)</li>
+ * </ul>
+ */
+ @VisibleForTesting
+ @Nullable
+ public static Object standardize(String column, @Nullable Object value,
boolean isSingleValue) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof Collection) {
+ return standardizeCollection(column, (Collection) value, isSingleValue);
+ }
+ if (value instanceof Map) {
+ return standardizeCollection(column, ((Map) value).values(),
isSingleValue);
+ }
+ if (value instanceof Object[]) {
+ Object[] values = (Object[]) value;
+ int numValues = values.length;
+ if (numValues == 0) {
+ return null;
+ }
+ if (numValues == 1) {
+ return standardize(column, values[0], isSingleValue);
+ }
+ List<Object> standardizedValues = new ArrayList<>(numValues);
+ for (Object singleValue : values) {
+ Object standardizedValue = standardize(column, singleValue, true);
+ if (standardizedValue != null) {
+ standardizedValues.add(standardizedValue);
+ }
+ }
+ int numStandardizedValues = standardizedValues.size();
+ if (numStandardizedValues == 0) {
+ return null;
+ }
+ if (numStandardizedValues == 1) {
+ return standardizedValues.get(0);
+ }
+ if (isSingleValue) {
+ throw new IllegalArgumentException(
+ "Cannot read single-value from Object[]: " +
Arrays.toString(values) + " for column: " + column);
+ }
+ return standardizedValues.toArray();
+ }
+ return value;
+ }
+
+ private static Object standardizeCollection(String column, Collection
collection, boolean isSingleValue) {
+ int numValues = collection.size();
+ if (numValues == 0) {
+ return null;
+ }
+ if (numValues == 1) {
+ return standardize(column, collection.iterator().next(), isSingleValue);
+ }
+ List<Object> standardizedValues = new ArrayList<>(numValues);
+ for (Object singleValue : collection) {
+ Object standardizedValue = standardize(column, singleValue, true);
+ if (standardizedValue != null) {
+ standardizedValues.add(standardizedValue);
+ }
+ }
+ int numStandardizedValues = standardizedValues.size();
+ if (numStandardizedValues == 0) {
+ return null;
+ }
+ if (numStandardizedValues == 1) {
+ return standardizedValues.get(0);
+ }
+ Preconditions.checkState(!isSingleValue, "Cannot read single-value from
Collection: %s for column: %s", collection,
+ column);
+ return standardizedValues.toArray();
+ }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
new file mode 100644
index 00000000000..85d68f45a38
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/NullValueTransformerUtils.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for null value transformation operations shared between
+ * NullValueTransformer (record-level) and NullValueColumnTransformer
(column-level).
+ */
+public class NullValueTransformerUtils {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NullValueTransformerUtils.class);
+
+ private NullValueTransformerUtils() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Computes the default null value for a given field spec.
+ * For time columns, validates that the default time is within valid range,
+ * otherwise uses current time.
+ *
+ * @param fieldSpec The field specification
+ * @param tableConfig The table configuration
+ * @param schema The schema
+ * @return The default null value to use for this field
+ */
+ @Nullable
+ public static Object getDefaultNullValue(FieldSpec fieldSpec, TableConfig
tableConfig, Schema schema) {
+ String fieldName = fieldSpec.getName();
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+
+ // Handle time column specially
+ if (StringUtils.isNotEmpty(timeColumnName) &&
fieldName.equals(timeColumnName)) {
+ return getDefaultTimeValue(timeColumnName, tableConfig, schema);
+ }
+
+ // For non-time columns, use the field spec's default null value
+ Object defaultNullValue = fieldSpec.getDefaultNullValue();
+ if (fieldSpec.isSingleValueField()) {
+ return defaultNullValue;
+ } else {
+ return new Object[]{defaultNullValue};
+ }
+ }
+
+ /**
+ * Computes the default time value for the time column.
+ * Time column is used to manage the segments, so its values have to be
within the valid range. If the default
+ * time value in the field spec is within the valid range, we use it as the
default time value; if not, we use
+ * current time as the default time value.
+ *
+ * @param timeColumnName The name of the time column
+ * @param tableConfig The table configuration
+ * @param schema The schema
+ * @return The default time value to use
+ */
+ private static Object getDefaultTimeValue(String timeColumnName, TableConfig
tableConfig, Schema schema) {
+ DateTimeFieldSpec timeColumnSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkState(timeColumnSpec != null, "Failed to find time
field: %s from schema: %s", timeColumnName,
+ schema.getSchemaName());
+
+ String defaultTimeString = timeColumnSpec.getDefaultNullValueString();
+ DateTimeFormatSpec dateTimeFormatSpec = timeColumnSpec.getFormatSpec();
+
+ // Try to use the default time from the field spec if it's valid
+ try {
+ long defaultTimeMs =
dateTimeFormatSpec.fromFormatToMillis(defaultTimeString);
+ if (TimeUtils.timeValueInValidRange(defaultTimeMs)) {
+ return timeColumnSpec.getDefaultNullValue();
+ }
+ } catch (Exception e) {
+ // Ignore and fall through to use current time
+ }
+
+ // Use current time if default time is not valid
+ String currentTimeString =
dateTimeFormatSpec.fromMillisToFormat(System.currentTimeMillis());
+ Object currentTime =
timeColumnSpec.getDataType().convert(currentTimeString);
+ LOGGER.info(
+ "Default time: {} does not comply with format: {}, using current time:
{} as the default time for table: {}",
+ defaultTimeString, timeColumnSpec.getFormat(), currentTime,
tableConfig.getTableName());
+ return currentTime;
+ }
+
+ /**
+ * Transforms a value by replacing null with the default null value.
+ *
+ * @param value The value to transform
+ * @param defaultNullValue The default null value to use if value is null
+ * @return The original value if not null, otherwise the default null value
+ */
+ @Nullable
+ public static Object transformValue(@Nullable Object value, Object
defaultNullValue) {
+ return value != null ? value : defaultNullValue;
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
new file mode 100644
index 00000000000..c52bd254b5a
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformerTest.java
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/**
+ * Comprehensive tests for DataTypeColumnTransformer.
+ * Tests data type conversions, isNoOp optimization, and error handling.
+ */
+public class DataTypeColumnTransformerTest {
+
+ private static final String COLUMN_NAME = "testColumn";
+
+ private ColumnReader createMockColumnReader(String columnName, boolean
isSingleValue,
+ boolean isInt, boolean isLong, boolean isFloat, boolean isDouble,
boolean isString, boolean isBytes) {
+ ColumnReader reader = Mockito.mock(ColumnReader.class);
+ when(reader.getColumnName()).thenReturn(columnName);
+ when(reader.isSingleValue()).thenReturn(isSingleValue);
+ when(reader.isInt()).thenReturn(isInt);
+ when(reader.isLong()).thenReturn(isLong);
+ when(reader.isFloat()).thenReturn(isFloat);
+ when(reader.isDouble()).thenReturn(isDouble);
+ when(reader.isString()).thenReturn(isString);
+ when(reader.isBytes()).thenReturn(isBytes);
+ return reader;
+ }
+
+ @Test
+ public void testIsNoOpForMatchingIntTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both INT");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingLongTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
true, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both LONG");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingFloatTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, true, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both FLOAT");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingDoubleTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, true, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both DOUBLE");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingStringTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both STRING");
+ }
+
+ @Test
+ public void testIsNotNoOpForDifferentTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertFalse(transformer.isNoOp(), "Should not be no-op when converting INT
to LONG");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingIntMVTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both INT[]");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingLongMVTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false,
true, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both LONG[]");
+ }
+
+ @Test
+ public void testIsNoOpForMatchingStringMVTypes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertTrue(transformer.isNoOp(), "Should be no-op when source and dest are
both STRING[]");
+ }
+
+ @Test
+ public void testTransformNullValue() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(null);
+ assertNull(result, "Null values should pass through as null");
+ }
+
+ @Test
+ public void testTransformStringToInt() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform("42");
+ assertEquals(result, 42);
+ }
+
+ @Test
+ public void testTransformStringToLong() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform("1234567890");
+ assertEquals(result, 1234567890L);
+ }
+
+ @Test
+ public void testTransformStringToFloat() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform("3.14");
+ assertEquals(result, 3.14f);
+ }
+
+ @Test
+ public void testTransformStringToDouble() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform("3.14159");
+ assertEquals(result, 3.14159);
+ }
+
+ @Test
+ public void testTransformIntToLong() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(42);
+ assertEquals(result, 42L);
+ }
+
+ @Test
+ public void testTransformIntToString() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(42);
+ assertEquals(result, "42");
+ }
+
+ @Test
+ public void testTransformLongToTimestamp() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.TIMESTAMP)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
true, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ long timestampValue = 1609459200000L; // 2021-01-01 00:00:00 UTC
+ Object result = transformer.transform(timestampValue);
+ assertEquals(result, timestampValue);
+ }
+
+ @Test
+ public void testInvalidConversionWithContinueOnErrorFalse() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+ // Default table config has continueOnError = false
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Try to convert invalid string to int
+ expectThrows(RuntimeException.class, () ->
transformer.transform("not_a_number"));
+ }
+
+ @Test
+ public void testInvalidConversionWithContinueOnErrorTrue() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+ // Set continueOnError = true
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setContinueOnError(true);
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable")
+ .setIngestionConfig(ingestionConfig)
+ .build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Try to convert invalid string to int - should return null
+ Object result = transformer.transform("not_a_number");
+ assertNull(result, "Invalid conversion should return null when
continueOnError=true");
+ }
+
+ @Test
+ public void testTransformStringArrayToIntArray() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, false, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(new Object[]{"1", "2", "3"});
+ Object[] intArray = (Object[]) result;
+ assertEquals(intArray.length, 3);
+ assertEquals(intArray[0], 1);
+ assertEquals(intArray[1], 2);
+ assertEquals(intArray[2], 3);
+ }
+
+ @Test
+ public void testTransformEmptyArray() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Empty arrays should be standardized to null
+ Object result = transformer.transform(new Object[0]);
+ assertNull(result);
+ }
+
+ @Test
+ public void testTransformSingleElementArray() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Single element arrays should be unwrapped
+ Object result = transformer.transform(new Object[]{"42"});
+ assertEquals(result, 42);
+ }
+
+ @Test
+ public void testMVToSVConversionError() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Try to convert multi-value array to single-value
+ expectThrows(RuntimeException.class, () -> transformer.transform(new
Object[]{"1", "2", "3"}));
+ }
+
+ @Test
+ public void testTransformBooleanToString() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(true);
+ assertEquals(result, "true");
+ }
+
+ @Test
+ public void testTransformStringToBoolean() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.BOOLEAN)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, true, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform("true");
+ assertEquals(result, 1); // Boolean true is stored as 1
+
+ result = transformer.transform("false");
+ assertEquals(result, 0); // Boolean false is stored as 0
+ }
+
+ @Test
+ public void testTransformFloatToDouble() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.DOUBLE)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, true, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(3.14f);
+ assertTrue(result instanceof Double);
+ assertEquals((Double) result, 3.14, 0.001);
+ }
+
+ @Test
+ public void testTransformDoubleToFloat() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.FLOAT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, true, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(3.14159);
+ assertTrue(result instanceof Float);
+ assertEquals((Float) result, 3.14159f, 0.001f);
+ }
+
+ @Test
+ public void testTransformBytesToString() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, false, true);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ byte[] bytes = "test".getBytes();
+ Object result = transformer.transform(bytes);
+ assertTrue(result instanceof String);
+ }
+
+ @Test
+ public void testTransformPreservesValue() {
+ // When source and dest types match (no-op case), value should still be
transformed
+ // to ensure any internal representation conversion happens
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, true,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ Object result = transformer.transform(42);
+ assertEquals(result, 42);
+ }
+
+ @Test
+ public void testIsNotNoOpForBytes() {
+ // Bytes type should not be no-op since there's overhead and special
handling
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.BYTES)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, false, true);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ // Bytes type uses generic transformation path
+ assertFalse(transformer.isNoOp());
+ }
+
+ @Test
+ public void testIsNotNoOpForJson() {
+ // JSON type should not be no-op
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.JSON)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(COLUMN_NAME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ ColumnReader reader = createMockColumnReader(COLUMN_NAME, true, false,
false, false, false, false, false);
+ DataTypeColumnTransformer transformer = new
DataTypeColumnTransformer(tableConfig, fieldSpec, reader);
+
+ assertFalse(transformer.isNoOp());
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
new file mode 100644
index 00000000000..eb0b0bed223
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/columntransformer/NullValueColumnTransformerTest.java
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Comprehensive tests for NullValueColumnTransformer.
+ * Tests null value handling and default value substitution for all data types.
+ */
+public class NullValueColumnTransformerTest {
+
+ private static final TableConfig TABLE_CONFIG =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+
+ @Test
+ public void testIsNoOp() {
+ // NullValueColumnTransformer should never be a no-op since it always
needs to handle null values
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ assertFalse(transformer.isNoOp(), "NullValueColumnTransformer should never
be a no-op");
+ }
+
+ @Test
+ public void testTransformNullToDefaultInt() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ assertEquals(result, Integer.MIN_VALUE);
+ }
+
+ @Test
+ public void testTransformNullToDefaultLong() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("longCol", FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("longCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ assertEquals(result, Long.MIN_VALUE);
+ }
+
+ @Test
+ public void testTransformNullToDefaultFloat() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("floatCol", FieldSpec.DataType.FLOAT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("floatCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ assertEquals(result, Float.NEGATIVE_INFINITY);
+ }
+
+ @Test
+ public void testTransformNullToDefaultDouble() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("doubleCol", FieldSpec.DataType.DOUBLE)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("doubleCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ assertEquals(result, Double.NEGATIVE_INFINITY);
+ }
+
+ @Test
+ public void testTransformNullToDefaultString() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ assertEquals(result, "null");
+ }
+
+ @Test
+ public void testTransformNullToDefaultBytes() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("bytesCol", FieldSpec.DataType.BYTES)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("bytesCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof byte[]);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ }
+
+ @Test
+ public void testTransformNullToDefaultBoolean() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("boolCol", FieldSpec.DataType.BOOLEAN)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("boolCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ }
+
+ @Test
+ public void testTransformNullToDefaultTimestamp() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("timestampCol", FieldSpec.DataType.TIMESTAMP)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("timestampCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, fieldSpec.getDefaultNullValue());
+ }
+
+ @Test
+ public void testTransformNullToDefaultIntMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("intMVCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], Integer.MIN_VALUE);
+ }
+
+ @Test
+ public void testTransformNullToDefaultLongMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("longMVCol", FieldSpec.DataType.LONG)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("longMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], Long.MIN_VALUE);
+ }
+
+ @Test
+ public void testTransformNullToDefaultFloatMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("floatMVCol", FieldSpec.DataType.FLOAT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("floatMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], Float.NEGATIVE_INFINITY);
+ }
+
+ @Test
+ public void testTransformNullToDefaultDoubleMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("doubleMVCol", FieldSpec.DataType.DOUBLE)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("doubleMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], Double.NEGATIVE_INFINITY);
+ }
+
+ @Test
+ public void testTransformNullToDefaultStringMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("stringMVCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], "null");
+ }
+
+ @Test
+ public void testTransformEmptyObjectArrayToNull() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+ // Empty Object[] should be treated as null
+ Object result = transformer.transform(new Object[0]);
+
+ assertNotNull(result);
+ assertEquals(result, "null");
+ }
+
+ @Test
+ public void testTransformEmptyObjectArrayToNullForMV() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("stringMVCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+ // Empty Object[] should be treated as null
+ Object result = transformer.transform(new Object[0]);
+
+ assertNotNull(result);
+ assertTrue(result instanceof Object[]);
+ Object[] resultArray = (Object[]) result;
+ assertEquals(resultArray.length, 1);
+ assertEquals(resultArray[0], "null");
+ }
+
+ @Test
+ public void testNonNullValuePassThrough() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+ Integer testValue = 42;
+ Object result = transformer.transform(testValue);
+
+ assertEquals(result, testValue);
+ }
+
+ @Test
+ public void testNonNullStringPassThrough() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+ String testValue = "test string";
+ Object result = transformer.transform(testValue);
+
+ assertEquals(result, testValue);
+ }
+
+ @Test
+ public void testNonNullMVPassThrough() {
+ Schema schema = new Schema.SchemaBuilder()
+ .addMultiValueDimension("intMVCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intMVCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+
+ int[] testValue = new int[]{1, 2, 3};
+ Object result = transformer.transform(testValue);
+
+ assertEquals(result, testValue);
+ }
+
+ @Test
+ public void testTimeColumnWithValidDefault() {
+    // Time column whose implicit default null value is within the valid time range
+ Schema schema = new Schema.SchemaBuilder()
+ .addDateTime("timeCol", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable")
+ .setTimeColumnName("timeCol")
+ .build();
+
+ FieldSpec fieldSpec = schema.getFieldSpecFor("timeCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(tableConfig, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ // Should use the field spec's default value since it's valid
+ assertTrue(result instanceof Long);
+ }
+
+ @Test
+ public void testTimeColumnWithInvalidDefault() {
+    // Time column whose implicit default null value falls outside the valid time range
+ Schema schema = new Schema.SchemaBuilder()
+ .addDateTimeField("timeCol", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("testTable")
+ .setTimeColumnName("timeCol")
+ .build();
+
+ FieldSpec fieldSpec = schema.getFieldSpecFor("timeCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(tableConfig, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ // Should use current time instead of the invalid default
+ assertTrue(result instanceof Long);
+ long resultTime = (Long) result;
+ long currentTime = System.currentTimeMillis();
+ // Result should be close to current time (within 1 second)
+ assertTrue(Math.abs(resultTime - currentTime) < 1000,
+ "Time should be close to current time, got: " + resultTime + ",
current: " + currentTime);
+ }
+
+ @Test
+ public void testCustomDefaultNullValue() {
+ // Test with a field that has a custom default null value
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("customIntCol", FieldSpec.DataType.INT, 999)
// Custom default
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("customIntCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, 999);
+ }
+
+ @Test
+ public void testCustomDefaultNullValueString() {
+ // Test with a string field that has a custom default null value
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("customStringCol", FieldSpec.DataType.STRING,
"MISSING") // Custom default
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("customStringCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(null);
+
+ assertNotNull(result);
+ assertEquals(result, "MISSING");
+ }
+
+ @Test
+ public void testZeroValue() {
+ // Zero should not be treated as null
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("intCol", FieldSpec.DataType.INT)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("intCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform(0);
+
+ assertEquals(result, 0);
+ }
+
+ @Test
+ public void testEmptyString() {
+ // Empty string should not be treated as null
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
+ .build();
+ FieldSpec fieldSpec = schema.getFieldSpecFor("stringCol");
+
+ NullValueColumnTransformer transformer = new
NullValueColumnTransformer(TABLE_CONFIG, fieldSpec, schema);
+ Object result = transformer.transform("");
+
+ assertEquals(result, "");
+ }
+}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
index af41e22b3bb..ed9fae8a1fa 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -42,14 +43,14 @@ public class DataTypeTransformerTest {
// Empty Map
Map<String, Object> map = Collections.emptyMap();
- assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, true));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, false));
// Map with single entry
String expectedValue = "testValue";
map = Collections.singletonMap("testKey", expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true),
expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false),
expectedValue);
// Map with multiple entries
Object[] expectedValues = new Object[]{"testValue1", "testValue2"};
@@ -58,12 +59,12 @@ public class DataTypeTransformerTest {
map.put("testKey2", "testValue2");
try {
// Should fail because Map with multiple entries cannot be standardized
as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformerUtils.standardize(COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN,
map, false), expectedValues);
+ assertEqualsNoOrder((Object[])
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
/**
* Tests for List
@@ -71,24 +72,24 @@ public class DataTypeTransformerTest {
// Empty List
List<Object> list = Collections.emptyList();
- assertNull(DataTypeTransformer.standardize(COLUMN, list, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, list, false));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, list, true));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, list, false));
// List with single entry
list = Collections.singletonList(expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, list, true),
expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, list, false),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, list, true),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, list, false),
expectedValue);
// List with multiple entries
list = Arrays.asList(expectedValues);
try {
// Should fail because List with multiple entries cannot be standardized
as single value
- DataTypeTransformer.standardize(COLUMN, list, true);
+ DataTypeTransformerUtils.standardize(COLUMN, list, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list,
false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN, list,
false), expectedValues);
/**
* Tests for Object[]
@@ -96,24 +97,24 @@ public class DataTypeTransformerTest {
// Empty Object[]
Object[] values = new Object[0];
- assertNull(DataTypeTransformer.standardize(COLUMN, values, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, values, false));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, values, true));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, values, false));
// Object[] with single entry
values = new Object[]{expectedValue};
- assertEquals(DataTypeTransformer.standardize(COLUMN, values, true),
expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, values, false),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, values, true),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, values, false),
expectedValue);
// Object[] with multiple entries
values = new Object[]{"testValue1", "testValue2"};
try {
// Should fail because Object[] with multiple entries cannot be
standardized as single value
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformerUtils.standardize(COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values,
false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN,
values, false), expectedValues);
/**
* Tests for nested Map/List/Object[]
@@ -121,32 +122,32 @@ public class DataTypeTransformerTest {
// Map with empty List
map = Collections.singletonMap("testKey", Collections.emptyList());
- assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, true));
+ assertNull(DataTypeTransformerUtils.standardize(COLUMN, map, false));
// Map with single-entry List
map = Collections.singletonMap("testKey",
Collections.singletonList(expectedValue));
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true),
expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false),
expectedValue);
// Map with one empty Map and one single-entry Map
map = new HashMap<>();
map.put("testKey1", Collections.emptyMap());
map.put("testKey2", Collections.singletonMap("testKey", expectedValue));
// Can be standardized into single value because empty Map should be
ignored
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true),
expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, true),
expectedValue);
+ assertEquals(DataTypeTransformerUtils.standardize(COLUMN, map, false),
expectedValue);
// Map with multi-entries List
map = Collections.singletonMap("testKey", Arrays.asList(expectedValues));
try {
// Should fail because Map with multiple entries cannot be standardized
as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformerUtils.standardize(COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN,
map, false), expectedValues);
+ assertEqualsNoOrder((Object[])
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
// Map with one empty Map, one single-entry List and one single-entry
Object[]
map = new HashMap<>();
@@ -155,12 +156,12 @@ public class DataTypeTransformerTest {
map.put("testKey3", new Object[]{"testValue2"});
try {
// Should fail because Map with multiple entries cannot be standardized
as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformerUtils.standardize(COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN,
map, false), expectedValues);
+ assertEqualsNoOrder((Object[])
DataTypeTransformerUtils.standardize(COLUMN, map, false), expectedValues);
// List with two single-entry Maps and one empty Map
list = Arrays
@@ -168,12 +169,12 @@ public class DataTypeTransformerTest {
Collections.emptyMap());
try {
// Should fail because List with multiple entries cannot be standardized
as single value
- DataTypeTransformer.standardize(COLUMN, list, true);
+ DataTypeTransformerUtils.standardize(COLUMN, list, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list,
false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformerUtils.standardize(COLUMN, list,
false), expectedValues);
// Object[] with two single-entry Maps
values = new Object[]{
@@ -181,12 +182,12 @@ public class DataTypeTransformerTest {
};
try {
// Should fail because Object[] with multiple entries cannot be
standardized as single value
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformerUtils.standardize(COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN,
values, false), expectedValues);
+ assertEqualsNoOrder((Object[])
DataTypeTransformerUtils.standardize(COLUMN, values, false), expectedValues);
// Object[] with one empty Object[], one multi-entries List of nested
Map/List/Object[]
values = new Object[]{
@@ -194,11 +195,11 @@ public class DataTypeTransformerTest {
Collections.singletonMap("testKey", Arrays.asList(new
Object[]{"testValue2"}, Collections.emptyMap()))
};
try {
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformerUtils.standardize(COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN,
values, false), expectedValues);
+ assertEqualsNoOrder((Object[])
DataTypeTransformerUtils.standardize(COLUMN, values, false), expectedValues);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
new file mode 100644
index 00000000000..6ebc9864788
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/columntransformer/ColumnTransformer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.columntransformer;
+
+import javax.annotation.Nullable;
+
+
+public interface ColumnTransformer {
+
+  /// Returns `true` if the transformer is a no-op (can be skipped), `false`
otherwise.
+ boolean isNoOp();
+
+  /// Transforms a value according to this transformer's rules.
+  /// @param value The original value (may be null).
+  /// @return The transformed value.
+ Object transform(@Nullable Object value);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]