This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 95d4950dab166373ed9ec58555435852e5f2ffeb Author: Aishik <[email protected]> AuthorDate: Mon Nov 20 19:18:40 2023 +0530 Added new SpecialValueTransformer and tests. --- .../recordtransformer/CompositeTransformer.java | 11 ++- .../recordtransformer/SpecialValueTransformer.java | 96 ++++++++++++++++++++++ .../recordtransformer/RecordTransformerTest.java | 49 +++++++++++ 3 files changed, 153 insertions(+), 3 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index b6fb694306..50ca2a97c1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -68,13 +68,18 @@ public class CompositeTransformer implements RecordTransformer { * Optional {@link SanitizationTransformer} after {@link NullValueTransformer} so that before sanitation, all * values are non-null and follow the data types defined in the schema * </li> + * <li> + * {@link SpecialValueTransformer} after {@link DataTypeTransformer} so that we already have the values complying + * with the schema before handling special values + * </li> * </ul> */ public static List<RecordTransformer> getDefaultTransformers(TableConfig tableConfig, Schema schema) { return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), - new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), - new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema), - new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()).collect(Collectors.toList()); + new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), + 
new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema), + new SpecialValueTransformer(schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp()) + .collect(Collectors.toList()); } public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java new file mode 100644 index 0000000000..2bd5b9920d --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.recordtransformer; + +import java.util.HashSet; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** + * The {@code SpecialValueTransformer} class will transform special values to follow certain rules including: + * <ul> + * <li>Negative zero (-0.0) should be converted to 0.0</li> + * <li>NaN should be converted to default null</li> + * </ul> + * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all values follow the data types in + * {@link FieldSpec}. + */ +public class SpecialValueTransformer implements RecordTransformer { + private final HashSet<String> _specialValuesKeySet = new HashSet<>(); + + public SpecialValueTransformer(Schema schema) { + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + if (!fieldSpec.isVirtualColumn() && (fieldSpec.getDataType() == DataType.FLOAT + || fieldSpec.getDataType() == DataType.DOUBLE)) { + _specialValuesKeySet.add(fieldSpec.getName()); + } + } + } + + private Object transformNegativeZero(Object value) { + if ((value instanceof Float) && (Float.floatToRawIntBits((float) value) == Float.floatToRawIntBits(-0.0f))) { + value = 0.0f; + } else if ((value instanceof Double) && (Double.doubleToLongBits((double) value) == Double.doubleToLongBits( + -0.0d))) { + value = 0.0d; + } + return value; + } + + private Object transformNaN(Object value) { + if ((value instanceof Float) && ((Float) value).isNaN()) { + value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT; + } else if ((value instanceof Double) && ((Double) value).isNaN()) { + value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE; + } + return value; + } + + @Override + public boolean isNoOp() { + return _specialValuesKeySet.isEmpty(); + } + + @Override + public GenericRow transform(GenericRow record) { + for (String element : 
_specialValuesKeySet) { + Object value = record.getValue(element); + if (value instanceof Float || value instanceof Double) { + // Single-valued column. + Object zeroTransformedValue = transformNegativeZero(value); + Object nanTransformedValue = transformNaN(zeroTransformedValue); + if (nanTransformedValue != value) { + record.putValue(element, nanTransformedValue); + } + } else if (value instanceof Object[]) { + // Multi-valued column. + Object[] values = (Object[]) value; + int numValues = values.length; + for (int i = 0; i < numValues; i++) { + values[i] = transformNegativeZero(values[i]); + values[i] = transformNaN(values[i]); + } + } + } + return record; + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index baded1b97a..e48b6605f5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -51,6 +51,13 @@ public class RecordTransformerTest { .addSingleValueDimension("svStringWithNullCharacters", DataType.STRING) .addSingleValueDimension("svStringWithLengthLimit", DataType.STRING) .addMultiValueDimension("mvString1", DataType.STRING).addMultiValueDimension("mvString2", DataType.STRING) + // For negative zero and NaN conversions + .addSingleValueDimension("svFloatNegativeZero", DataType.FLOAT) + .addMultiValueDimension("mvFloatNegativeZero", DataType.FLOAT) + .addSingleValueDimension("svDoubleNegativeZero", DataType.DOUBLE) + .addMultiValueDimension("mvDoubleNegativeZero", DataType.DOUBLE) + .addSingleValueDimension("svFloatNaN", DataType.FLOAT).addMultiValueDimension("mvFloatNaN", DataType.FLOAT) + .addSingleValueDimension("svDoubleNaN", DataType.DOUBLE).addMultiValueDimension("mvDoubleNaN", 
DataType.DOUBLE) .build(); private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); @@ -82,6 +89,14 @@ public class RecordTransformerTest { record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0}); record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"}); record.putValue("svNullString", null); + record.putValue("svFloatNegativeZero", -0.00f); + record.putValue("svDoubleNegativeZero", -0.00d); + record.putValue("mvFloatNegativeZero", new Float[]{-0.0f, 1.0f, 0.0f, 3.0f}); + record.putValue("mvDoubleNegativeZero", new Double[]{-0.0d, 1.0d, 0.0d, 3.0d}); + record.putValue("svFloatNaN", Float.NaN); + record.putValue("svDoubleNaN", Double.NaN); + record.putValue("mvFloatNaN", new Float[]{-0.0f, Float.NaN, 2.0f}); + record.putValue("mvDoubleNaN", new Double[]{-0.0d, Double.NaN, 2.0d}); return record; } @@ -254,6 +269,28 @@ public class RecordTransformerTest { } } + @Test + public void testSpecialValueTransformer() { + RecordTransformer transformer = new SpecialValueTransformer(SCHEMA); + GenericRow record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")), + Float.floatToRawIntBits(0.0f)); + assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")), + Double.doubleToRawLongBits(0.0d)); + assertEquals(record.getValue("mvFloatNegativeZero"), new Float[]{0.0f, 1.0f, 0.0f, 3.0f}); + assertEquals(record.getValue("mvDoubleNegativeZero"), new Double[]{0.0d, 1.0d, 0.0d, 3.0d}); + assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT); + assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE); + assertEquals(record.getValue("mvFloatNaN"), + new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f}); + 
assertEquals(record.getValue("mvDoubleNaN"), + new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d}); + } + } + @Test public void testScalarOps() { IngestionConfig ingestionConfig = new IngestionConfig(); @@ -530,6 +567,18 @@ public class RecordTransformerTest { assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", "123.0", "123.0", "123"}); assertNull(record.getValue("$virtual")); assertTrue(record.getNullValueFields().isEmpty()); + assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")), + Float.floatToRawIntBits(0.0f)); + assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")), + Double.doubleToRawLongBits(0.0d)); + assertEquals(record.getValue("mvFloatNegativeZero"), new Float[]{0.0f, 1.0f, 0.0f, 3.0f}); + assertEquals(record.getValue("mvDoubleNegativeZero"), new Double[]{0.0d, 1.0d, 0.0d, 3.0d}); + assertEquals(record.getValue("svFloatNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT); + assertEquals(record.getValue("svDoubleNaN"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE); + assertEquals(record.getValue("mvFloatNaN"), + new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f}); + assertEquals(record.getValue("mvDoubleNaN"), + new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d}); } // Test empty record --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
