This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 2c05f8dca9db402de904c044264d54cc76858238 Author: Aishik <[email protected]> AuthorDate: Wed Nov 22 18:54:24 2023 +0530 addressed comments and added test for ensuring order of transformers. --- .../recordtransformer/SpecialValueTransformer.java | 28 ++++++--- .../recordtransformer/RecordTransformerTest.java | 72 +++++++++++++++++++++- 2 files changed, 90 insertions(+), 10 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java index 9e55c84cf1..4cfbbc8ce9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/SpecialValueTransformer.java @@ -23,6 +23,8 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -35,6 +37,8 @@ import org.apache.pinot.spi.data.readers.GenericRow; * {@link FieldSpec}. */ public class SpecialValueTransformer implements RecordTransformer { + + private static final Logger LOGGER = LoggerFactory.getLogger(NullValueTransformer.class); private final HashSet<String> _specialValuesKeySet = new HashSet<>(); public SpecialValueTransformer(Schema schema) { @@ -48,9 +52,11 @@ public class SpecialValueTransformer implements RecordTransformer { private Object transformNegativeZero(Object value) { if ((value instanceof Float) && (Float.floatToRawIntBits((float) value) == Float.floatToRawIntBits(-0.0f))) { + LOGGER.info("-0.0f value detected, converting to 0.0."); value = 0.0f; } else if ((value instanceof Double) && (Double.doubleToLongBits((double) value) == Double.doubleToLongBits( -0.0d))) { + LOGGER.info("-0.0d value detected, converting to 0.0."); value = 0.0d; } return value; @@ -58,8 +64,10 @@ public class SpecialValueTransformer implements RecordTransformer { private Object transformNaN(Object value) { if ((value instanceof Float) && ((Float) value).isNaN()) { + LOGGER.info("Float.NaN detected, converting to default null."); value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT; } else if ((value instanceof Double) && ((Double) value).isNaN()) { + LOGGER.info("Double.NaN detected, converting to default null."); value = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE; } return value; @@ -74,21 +82,23 @@ public class SpecialValueTransformer implements RecordTransformer { public GenericRow transform(GenericRow record) { for (String element : _specialValuesKeySet) { Object value = record.getValue(element); - if (value instanceof Float || value instanceof Double) { + if (value instanceof Object[]) { + // Multi-valued column. + Object[] values = (Object[]) value; + int numValues = values.length; + for (int i = 0; i < numValues; i++) { + if (values[i] != null) { + values[i] = transformNegativeZero(values[i]); + values[i] = transformNaN(values[i]); + } + } + } else { // Single-valued column. Object zeroTransformedValue = transformNegativeZero(value); Object nanTransformedValue = transformNaN(zeroTransformedValue); if (nanTransformedValue != value) { record.putValue(element, nanTransformedValue); } - } else if (value instanceof Object[]) { - // Multi-valued column. - Object[] values = (Object[]) value; - int numValues = values.length; - for (int i = 0; i < numValues; i++) { - values[i] = transformNegativeZero(values[i]); - values[i] = transformNaN(values[i]); - } } } return record; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java index e48b6605f5..6361963785 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java @@ -20,11 +20,14 @@ package org.apache.pinot.segment.local.recordtransformer; import java.sql.Timestamp; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig; +import org.apache.pinot.spi.config.table.ingestion.TransformConfig; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; @@ -69,6 +72,7 @@ public class RecordTransformerTest { // Transform multiple times should return the same result private static final int NUM_ROUNDS = 5; + private static final int NUMBER_OF_TRANSFORMERS = 8; private static GenericRow getRecord() { GenericRow record = new GenericRow(); @@ -291,6 +295,72 @@ public class RecordTransformerTest { } } + @Test + public void testOrderForTransformers() { + // This test checks that the specified order is maintained for different transformers. + + // Build Schema and ingestionConfig in such a way that all the transformers are loaded. + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.INT) + .addSingleValueDimension("svDouble", DataType.DOUBLE) + .addSingleValueDimension("expressionTestColumn", DataType.INT).addSingleValueDimension("svNaN", DataType.FLOAT) + .addSingleValueDimension("emptyDimensionForNullValueTransformer", DataType.FLOAT) + .addSingleValueDimension("svStringWithNullCharacters", DataType.STRING) + .addSingleValueDimension("indexableExtras", DataType.JSON) + .addDateTime("timeCol", DataType.TIMESTAMP, "1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS").build(); + + IngestionConfig ingestionConfig = new IngestionConfig(); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig) + .setTimeColumnName("timeCol").build(); + ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123 AND svDouble <= 200")); + ingestionConfig.setTransformConfigs(List.of(new TransformConfig("expressionTestColumn", "plus(x,10)"))); + ingestionConfig.setSchemaConformingTransformerConfig( + new SchemaConformingTransformerConfig("indexableExtras", null, null, null)); + ingestionConfig.setRowTimeValueCheck(true); + ingestionConfig.setContinueOnError(false); + + // Get the list of transformers. + List<RecordTransformer> currentListOfTransformers = + CompositeTransformer.getDefaultTransformers(tableConfig, schema); + + // Create a list of transformers to compare. + List<RecordTransformer> expectedListOfTransformers = + List.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig), + new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema), + new TimeValidationTransformer(tableConfig, schema), new NullValueTransformer(tableConfig, schema), + new SpecialValueTransformer(schema), new SanitizationTransformer(schema)); + + // Check that the number of current transformers match the expected number of transformers. + assertEquals(currentListOfTransformers.size(), NUMBER_OF_TRANSFORMERS); + + GenericRow record = new GenericRow(); + + // Data for expression Transformer. + record.putValue("expressionTestColumn", 100); + + // Data for filter transformer. + record.putValue("svDouble", 123d); + + // Data for DataType Transformer. + record.putValue("svInt", (byte) 123); + + // Data for TimeValidation transformer. + record.putValue("timeCol", System.currentTimeMillis()); + + // Data for SpecialValue Transformer. + record.putValue("svNaN", Float.NaN); + + // Data for sanitization transformer. + record.putValue("svStringWithNullCharacters", "1\0002\0003"); + + for (int i = 0; i < NUMBER_OF_TRANSFORMERS; i++) { + GenericRow currentRecord = currentListOfTransformers.get(i).transform(record); + GenericRow expectedRecord = expectedListOfTransformers.get(i).transform(record); + assertEquals(currentRecord, expectedRecord); + record = expectedRecord; + } + } + @Test public void testScalarOps() { IngestionConfig ingestionConfig = new IngestionConfig(); @@ -566,7 +636,6 @@ public class RecordTransformerTest { assertEquals(record.getValue("mvString1"), new Object[]{"123", "123", "123", "123.0", "123.0"}); assertEquals(record.getValue("mvString2"), new Object[]{"123", "123", "123.0", "123.0", "123"}); assertNull(record.getValue("$virtual")); - assertTrue(record.getNullValueFields().isEmpty()); assertEquals(Float.floatToRawIntBits((float) record.getValue("svFloatNegativeZero")), Float.floatToRawIntBits(0.0f)); assertEquals(Double.doubleToRawLongBits((double) record.getValue("svDoubleNegativeZero")), @@ -579,6 +648,7 @@ public class RecordTransformerTest { new Float[]{0.0f, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT, 2.0f}); assertEquals(record.getValue("mvDoubleNaN"), new Double[]{0.0d, FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE, 2.0d}); + assertTrue(record.getNullValueFields().isEmpty()); } // Test empty record --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
