This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9232bb5  Moving handling of NULL values from RecordReaders to NullValueTransfo… (#4399)
9232bb5 is described below

commit 9232bb5ad6962069724510277141e11ca7724e26
Author: Kishore Gopalakrishna <[email protected]>
AuthorDate: Fri Jul 19 11:38:27 2019 -0700

    Moving handling of NULL values from RecordReaders to NullValueTransfo… (#4399)
    
    * Moving handling of NULL values from RecordReaders to NullValueTransformer
    
    * Addressing review comments and fixing test cases
    
    * Removing commented line
---
 .../org/apache/pinot/common/utils/JsonUtils.java   |  6 +--
 .../realtime/HLRealtimeSegmentDataManager.java     |  4 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  4 +-
 .../pinot/core/data/readers/RecordReaderUtils.java | 14 ++----
 ...dTransformer.java => CompositeTransformer.java} | 20 ++++----
 .../recordtransformer/NullValueTransformer.java    | 53 ++++++++++++++++++++++
 ...ansformer.java => SanitizationTransformer.java} |  6 +--
 .../pinot/core/minion/BackfillDateTimeColumn.java  |  4 +-
 .../converter/RealtimeSegmentConverter.java        |  4 +-
 .../RecordReaderSegmentCreationDataSource.java     |  4 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  4 +-
 .../data/readers/RecordReaderSampleDataTest.java   |  8 ++--
 .../pinot/core/data/readers/RecordReaderTest.java  |  8 +++-
 .../recordtransformer/RecordTransformerTest.java   |  6 +--
 14 files changed, 100 insertions(+), 45 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
index 10abd41..7d97195 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
@@ -124,7 +124,7 @@ public class JsonUtils {
       if (jsonValue != null && !jsonValue.isNull()) {
         return extractSingleValue(jsonValue, fieldSpec.getDataType());
       } else {
-        return fieldSpec.getDefaultNullValue();
+        return null;
       }
     } else {
       if (jsonValue != null && !jsonValue.isNull()) {
@@ -137,13 +137,13 @@ public class JsonUtils {
             }
             return values;
           } else {
-            return new Object[]{fieldSpec.getDefaultNullValue()};
+            return null;
           }
         } else {
           return new Object[]{extractSingleValue(jsonValue, 
fieldSpec.getDataType())};
         }
       } else {
-        return new Object[]{fieldSpec.getDefaultNullValue()};
+        return null;
       }
     }
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index d5b223a..35c13f5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
 import org.apache.pinot.core.data.GenericRow;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -106,7 +106,7 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     super();
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     this.schema = schema;
-    _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
+    _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
     this.serverMetrics = serverMetrics;
     this.segmentName = realtimeSegmentZKMetadata.getSegmentName();
     this.tableNameWithType = tableConfig.getTableName();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index b5c9060..04cfec7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -54,7 +54,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.GenericRow;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
@@ -1153,7 +1153,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();
 
     // Create record transformer
-    _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
+    _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
     makeStreamConsumer("Starting");
     makeStreamMetadataProvider("Starting");
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
index 25e6d44..40edaa9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
@@ -111,9 +111,7 @@ public class RecordReaderUtils {
    */
   public static Object convertSingleValue(FieldSpec fieldSpec, @Nullable 
Object value) {
     if (value == null) {
-      // Do not allow default value for time column
-      assert fieldSpec.getFieldType() != FieldSpec.FieldType.TIME;
-      return fieldSpec.getDefaultNullValue();
+      return null;
     }
     if (value instanceof GenericData.Record) {
       return convertSingleValue(fieldSpec, ((GenericData.Record) 
value).get(0));
@@ -160,14 +158,12 @@ public class RecordReaderUtils {
    */
   public static Object convertSingleValue(FieldSpec fieldSpec, @Nullable 
String stringValue) {
     if (stringValue == null) {
-      // Do not allow default value for time column
-      assert fieldSpec.getFieldType() != FieldSpec.FieldType.TIME;
-      return fieldSpec.getDefaultNullValue();
+      return null;
     }
     DataType dataType = fieldSpec.getDataType();
     // Treat empty string as null for data types other than STRING
     if (stringValue.isEmpty() && dataType != DataType.STRING) {
-      return fieldSpec.getDefaultNullValue();
+      return null;
     }
     switch (dataType) {
       case INT:
@@ -190,7 +186,7 @@ public class RecordReaderUtils {
    */
   public static Object convertMultiValue(FieldSpec fieldSpec, @Nullable 
Collection values) {
     if (values == null || values.isEmpty()) {
-      return new Object[]{fieldSpec.getDefaultNullValue()};
+      return null;
     } else {
       int numValues = values.size();
       Object[] array = new Object[numValues];
@@ -207,7 +203,7 @@ public class RecordReaderUtils {
    */
   public static Object convertMultiValue(FieldSpec fieldSpec, @Nullable 
String[] stringValues) {
     if (stringValues == null || stringValues.length == 0) {
-      return new Object[]{fieldSpec.getDefaultNullValue()};
+      return null;
     } else {
       int numValues = stringValues.length;
       Object[] array = new Object[numValues];
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
similarity index 70%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
index 5ee8e60..46cc2de 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
@@ -27,9 +27,9 @@ import org.apache.pinot.core.data.GenericRow;
 
 
 /**
- * The {@code CompoundTransformer} class performs multiple transforms based on 
the inner {@link RecordTransformer}s.
+ * The {@code CompositeTransformer} class performs multiple transforms based 
on the inner {@link RecordTransformer}s.
  */
-public class CompoundTransformer implements RecordTransformer {
+public class CompositeTransformer implements RecordTransformer {
   private final List<RecordTransformer> _transformers;
 
   /**
@@ -41,25 +41,25 @@ public class CompoundTransformer implements 
RecordTransformer {
    *     column
    *   </li>
    *   <li>
-   *     We put {@link SanitationTransformer} after {@link 
DataTypeTransformer} so that before sanitation, all values
+   *     We put {@link SanitizationTransformer} after {@link 
DataTypeTransformer} so that before sanitation, all values
    *     follow the data types defined in the {@link Schema}.
    *   </li>
    * </ul>
    */
-  public static CompoundTransformer getDefaultTransformer(Schema schema) {
-    return new CompoundTransformer(Arrays
-        .asList(new TimeTransformer(schema), new 
ExpressionTransformer(schema), new DataTypeTransformer(schema),
-            new SanitationTransformer(schema)));
+  public static CompositeTransformer getDefaultTransformer(Schema schema) {
+    return new CompositeTransformer(Arrays
+        .asList(new NullValueTransformer(schema), new TimeTransformer(schema), 
new ExpressionTransformer(schema), new DataTypeTransformer(schema),
+            new SanitizationTransformer(schema)));
   }
 
   /**
    * Returns a pass through record transformer that does not transform the 
record.
    */
-  public static CompoundTransformer getPassThroughTransformer() {
-    return new CompoundTransformer(Collections.emptyList());
+  public static CompositeTransformer getPassThroughTransformer() {
+    return new CompositeTransformer(Collections.emptyList());
   }
 
-  public CompoundTransformer(List<RecordTransformer> transformers) {
+  public CompositeTransformer(List<RecordTransformer> transformers) {
     _transformers = transformers;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
new file mode 100644
index 0000000..c4a86c8
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.recordtransformer;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+
+
+public class NullValueTransformer implements RecordTransformer {
+
+  private final Collection<FieldSpec> _fieldSpecs;
+  private Schema _schema;
+
+  public NullValueTransformer(Schema schema) {
+    _schema = schema;
+    _fieldSpecs = _schema.getAllFieldSpecs();
+  }
+
+  @Override
+  public GenericRow transform(GenericRow row) {
+    for (FieldSpec fieldSpec : _fieldSpecs) {
+      String fieldName = fieldSpec.getName();
+      // Do not allow default value for time column
+      if (row.getValue(fieldName) == null && fieldSpec.getFieldType() != 
FieldSpec.FieldType.TIME) {
+        if (fieldSpec.isSingleValueField()) {
+          row.putField(fieldName, fieldSpec.getDefaultNullValue());
+        } else {
+          row.putField(fieldName, new 
Object[]{fieldSpec.getDefaultNullValue()});
+        }
+      }
+    }
+    return row;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
similarity index 92%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
index faa84a4..e77d8a2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
@@ -27,7 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
 
 
 /**
- * The {@code SanitationTransformer} class will sanitize the values to follow 
certain rules including:
+ * The {@code SanitizationTransformer} class will sanitize the values to 
follow certain rules including:
  * <ul>
  *   <li>No {@code null} characters in string values</li>
  *   <li>Values are within the length limit</li>
@@ -36,10 +36,10 @@ import org.apache.pinot.core.data.GenericRow;
  * <p>NOTE: should put this after the {@link DataTypeTransformer} so that all 
values follow the data types in
  * {@link FieldSpec}.
  */
-public class SanitationTransformer implements RecordTransformer {
+public class SanitizationTransformer implements RecordTransformer {
   private final Map<String, Integer> _stringColumnMaxLengthMap = new 
HashMap<>();
 
-  public SanitationTransformer(Schema schema) {
+  public SanitizationTransformer(Schema schema) {
     for (Map.Entry<String, FieldSpec> entry : 
schema.getFieldSpecMap().entrySet()) {
       FieldSpec fieldSpec = entry.getValue();
       if (fieldSpec.getDataType() == FieldSpec.DataType.STRING) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index 9ae4782..174826f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -33,7 +33,7 @@ import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.FileFormat;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.readers.RecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import 
org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -103,7 +103,7 @@ public class BackfillDateTimeColumn {
     LOGGER.info("Creating segment for {} with config {}", segmentName, 
config.toString());
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     driver.init(config, new 
RecordReaderSegmentCreationDataSource(wrapperReader),
-        CompoundTransformer.getPassThroughTransformer());
+        CompositeTransformer.getPassThroughTransformer());
     driver.build();
 
     return true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index a9c34a9..94fede8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.data.TimeFieldSpec;
 import org.apache.pinot.common.data.TimeGranularitySpec;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -138,7 +138,7 @@ public class RealtimeSegmentConverter {
     final SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     RealtimeSegmentSegmentCreationDataSource dataSource =
         new RealtimeSegmentSegmentCreationDataSource(realtimeSegmentImpl, 
reader, dataSchema);
-    driver.init(genConfig, dataSource, 
CompoundTransformer.getPassThroughTransformer());
+    driver.init(genConfig, dataSource, 
CompositeTransformer.getPassThroughTransformer());
     driver.build();
 
     if (segmentPartitionConfig != null && 
segmentPartitionConfig.getColumnPartitionMap() != null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
index b4e0b12..bfa1ecc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.segment.creator;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.RecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
 import 
org.apache.pinot.core.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl;
 import org.slf4j.Logger;
@@ -45,7 +45,7 @@ public class RecordReaderSegmentCreationDataSource implements 
SegmentCreationDat
   @Override
   public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig 
statsCollectorConfig) {
     try {
-      RecordTransformer recordTransformer = 
CompoundTransformer.getDefaultTransformer(statsCollectorConfig.getSchema());
+      RecordTransformer recordTransformer = 
CompositeTransformer.getDefaultTransformer(statsCollectorConfig.getSchema());
 
       SegmentPreIndexStatsCollector collector = new 
SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
       collector.init();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 248f16d..7d4cd9b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.data.readers.RecordReaderFactory;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -113,7 +113,7 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource 
dataSource) {
-    init(config, dataSource, 
CompoundTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema()));
+    init(config, dataSource, 
CompositeTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema()));
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource 
dataSource,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index 94c7af3..a1ca4dc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -68,6 +69,7 @@ public class RecordReaderSampleDataTest {
   @Test
   public void testRecordReaders()
       throws Exception {
+    CompositeTransformer defaultTransformer = 
CompositeTransformer.getDefaultTransformer(SCHEMA);
     try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA);
         CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA, null);
         JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA)) {
@@ -77,9 +79,9 @@ public class RecordReaderSampleDataTest {
         assertTrue(jsonRecordReader.hasNext());
         numRecords++;
 
-        GenericRow avroRecord = avroRecordReader.next();
-        GenericRow csvRecord = csvRecordReader.next();
-        GenericRow jsonRecord = jsonRecordReader.next();
+        GenericRow avroRecord = 
defaultTransformer.transform(avroRecordReader.next());
+        GenericRow csvRecord = 
defaultTransformer.transform(csvRecordReader.next());
+        GenericRow jsonRecord = 
defaultTransformer.transform(jsonRecordReader.next());
         assertEquals(avroRecord, csvRecord);
         assertEquals(avroRecord, jsonRecord);
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
index 54ff94a..1b35a66 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.readers;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.testng.Assert;
 
 
@@ -33,14 +34,17 @@ public abstract class RecordReaderTest {
 
   protected static void checkValue(RecordReader recordReader)
       throws Exception {
+    CompositeTransformer defaultTransformer = 
CompositeTransformer.getDefaultTransformer(SCHEMA);
     for (Object[] expectedRecord : RECORDS) {
       GenericRow actualRecord = recordReader.next();
+      GenericRow transformedRecord = 
defaultTransformer.transform(actualRecord);
+
       int numColumns = COLUMNS.length;
       for (int i = 0; i < numColumns; i++) {
         if (expectedRecord[i] != null) {
-          Assert.assertEquals(actualRecord.getValue(COLUMNS[i]), 
expectedRecord[i]);
+          Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), 
expectedRecord[i]);
         } else {
-          Assert.assertEquals(actualRecord.getValue(COLUMNS[i]), 
DEFAULT_VALUES[i]);
+          Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]), 
DEFAULT_VALUES[i]);
         }
       }
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
index 81283cf..d1f03ec 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
@@ -110,7 +110,7 @@ public class RecordTransformerTest {
 
   @Test
   public void testSanitationTransformer() {
-    RecordTransformer transformer = new SanitationTransformer(SCHEMA);
+    RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       record = transformer.transform(record);
@@ -122,7 +122,7 @@ public class RecordTransformerTest {
 
   @Test
   public void testDefaultTransformer() {
-    RecordTransformer transformer = 
CompoundTransformer.getDefaultTransformer(SCHEMA);
+    RecordTransformer transformer = 
CompositeTransformer.getDefaultTransformer(SCHEMA);
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       record = transformer.transform(record);
@@ -143,7 +143,7 @@ public class RecordTransformerTest {
 
   @Test
   public void testPassThroughTransformer() {
-    RecordTransformer transformer = 
CompoundTransformer.getPassThroughTransformer();
+    RecordTransformer transformer = 
CompositeTransformer.getPassThroughTransformer();
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       record = transformer.transform(record);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to