This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9232bb5 Moving handling of NULL values from RecordReaders to
NullValueTransformer (#4399)
9232bb5 is described below
commit 9232bb5ad6962069724510277141e11ca7724e26
Author: Kishore Gopalakrishna <[email protected]>
AuthorDate: Fri Jul 19 11:38:27 2019 -0700
Moving handling of NULL values from RecordReaders to NullValueTransformer
(#4399)
* Moving handling of NULL values from RecordReaders to NullValueTransformer
* Addressing review comments and fixing test cases
* Removing commented line
---
.../org/apache/pinot/common/utils/JsonUtils.java | 6 +--
.../realtime/HLRealtimeSegmentDataManager.java | 4 +-
.../realtime/LLRealtimeSegmentDataManager.java | 4 +-
.../pinot/core/data/readers/RecordReaderUtils.java | 14 ++----
...dTransformer.java => CompositeTransformer.java} | 20 ++++----
.../recordtransformer/NullValueTransformer.java | 53 ++++++++++++++++++++++
...ansformer.java => SanitizationTransformer.java} | 6 +--
.../pinot/core/minion/BackfillDateTimeColumn.java | 4 +-
.../converter/RealtimeSegmentConverter.java | 4 +-
.../RecordReaderSegmentCreationDataSource.java | 4 +-
.../impl/SegmentIndexCreationDriverImpl.java | 4 +-
.../data/readers/RecordReaderSampleDataTest.java | 8 ++--
.../pinot/core/data/readers/RecordReaderTest.java | 8 +++-
.../recordtransformer/RecordTransformerTest.java | 6 +--
14 files changed, 100 insertions(+), 45 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
index 10abd41..7d97195 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/JsonUtils.java
@@ -124,7 +124,7 @@ public class JsonUtils {
if (jsonValue != null && !jsonValue.isNull()) {
return extractSingleValue(jsonValue, fieldSpec.getDataType());
} else {
- return fieldSpec.getDefaultNullValue();
+ return null;
}
} else {
if (jsonValue != null && !jsonValue.isNull()) {
@@ -137,13 +137,13 @@ public class JsonUtils {
}
return values;
} else {
- return new Object[]{fieldSpec.getDefaultNullValue()};
+ return null;
}
} else {
return new Object[]{extractSingleValue(jsonValue,
fieldSpec.getDataType())};
}
} else {
- return new Object[]{fieldSpec.getDefaultNullValue()};
+ return null;
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index d5b223a..35c13f5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
import org.apache.pinot.core.data.GenericRow;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -106,7 +106,7 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
super();
_segmentVersion = indexLoadingConfig.getSegmentVersion();
this.schema = schema;
- _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
+ _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
this.serverMetrics = serverMetrics;
this.segmentName = realtimeSegmentZKMetadata.getSegmentName();
this.tableNameWithType = tableConfig.getTableName();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index b5c9060..04cfec7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -54,7 +54,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.GenericRow;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
@@ -1153,7 +1153,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();
// Create record transformer
- _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
+ _recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
makeStreamConsumer("Starting");
makeStreamMetadataProvider("Starting");
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
index 25e6d44..40edaa9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderUtils.java
@@ -111,9 +111,7 @@ public class RecordReaderUtils {
*/
public static Object convertSingleValue(FieldSpec fieldSpec, @Nullable
Object value) {
if (value == null) {
- // Do not allow default value for time column
- assert fieldSpec.getFieldType() != FieldSpec.FieldType.TIME;
- return fieldSpec.getDefaultNullValue();
+ return null;
}
if (value instanceof GenericData.Record) {
return convertSingleValue(fieldSpec, ((GenericData.Record)
value).get(0));
@@ -160,14 +158,12 @@ public class RecordReaderUtils {
*/
public static Object convertSingleValue(FieldSpec fieldSpec, @Nullable
String stringValue) {
if (stringValue == null) {
- // Do not allow default value for time column
- assert fieldSpec.getFieldType() != FieldSpec.FieldType.TIME;
- return fieldSpec.getDefaultNullValue();
+ return null;
}
DataType dataType = fieldSpec.getDataType();
// Treat empty string as null for data types other than STRING
if (stringValue.isEmpty() && dataType != DataType.STRING) {
- return fieldSpec.getDefaultNullValue();
+ return null;
}
switch (dataType) {
case INT:
@@ -190,7 +186,7 @@ public class RecordReaderUtils {
*/
public static Object convertMultiValue(FieldSpec fieldSpec, @Nullable
Collection values) {
if (values == null || values.isEmpty()) {
- return new Object[]{fieldSpec.getDefaultNullValue()};
+ return null;
} else {
int numValues = values.size();
Object[] array = new Object[numValues];
@@ -207,7 +203,7 @@ public class RecordReaderUtils {
*/
public static Object convertMultiValue(FieldSpec fieldSpec, @Nullable
String[] stringValues) {
if (stringValues == null || stringValues.length == 0) {
- return new Object[]{fieldSpec.getDefaultNullValue()};
+ return null;
} else {
int numValues = stringValues.length;
Object[] array = new Object[numValues];
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
similarity index 70%
rename from
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
index 5ee8e60..46cc2de 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompoundTransformer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/CompositeTransformer.java
@@ -27,9 +27,9 @@ import org.apache.pinot.core.data.GenericRow;
/**
- * The {@code CompoundTransformer} class performs multiple transforms based on
the inner {@link RecordTransformer}s.
+ * The {@code CompositeTransformer} class performs multiple transforms based
on the inner {@link RecordTransformer}s.
*/
-public class CompoundTransformer implements RecordTransformer {
+public class CompositeTransformer implements RecordTransformer {
private final List<RecordTransformer> _transformers;
/**
@@ -41,25 +41,25 @@ public class CompoundTransformer implements
RecordTransformer {
* column
* </li>
* <li>
- * We put {@link SanitationTransformer} after {@link
DataTypeTransformer} so that before sanitation, all values
+ * We put {@link SanitizationTransformer} after {@link
DataTypeTransformer} so that before sanitation, all values
* follow the data types defined in the {@link Schema}.
* </li>
* </ul>
*/
- public static CompoundTransformer getDefaultTransformer(Schema schema) {
- return new CompoundTransformer(Arrays
- .asList(new TimeTransformer(schema), new
ExpressionTransformer(schema), new DataTypeTransformer(schema),
- new SanitationTransformer(schema)));
+ public static CompositeTransformer getDefaultTransformer(Schema schema) {
+ return new CompositeTransformer(Arrays
+ .asList(new NullValueTransformer(schema), new TimeTransformer(schema),
new ExpressionTransformer(schema), new DataTypeTransformer(schema),
+ new SanitizationTransformer(schema)));
}
/**
* Returns a pass through record transformer that does not transform the
record.
*/
- public static CompoundTransformer getPassThroughTransformer() {
- return new CompoundTransformer(Collections.emptyList());
+ public static CompositeTransformer getPassThroughTransformer() {
+ return new CompositeTransformer(Collections.emptyList());
}
- public CompoundTransformer(List<RecordTransformer> transformers) {
+ public CompositeTransformer(List<RecordTransformer> transformers) {
_transformers = transformers;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
new file mode 100644
index 0000000..c4a86c8
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/NullValueTransformer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.recordtransformer;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+
+
+public class NullValueTransformer implements RecordTransformer {
+
+ private final Collection<FieldSpec> _fieldSpecs;
+ private Schema _schema;
+
+ public NullValueTransformer(Schema schema) {
+ _schema = schema;
+ _fieldSpecs = _schema.getAllFieldSpecs();
+ }
+
+ @Override
+ public GenericRow transform(GenericRow row) {
+ for (FieldSpec fieldSpec : _fieldSpecs) {
+ String fieldName = fieldSpec.getName();
+ // Do not allow default value for time column
+ if (row.getValue(fieldName) == null && fieldSpec.getFieldType() !=
FieldSpec.FieldType.TIME) {
+ if (fieldSpec.isSingleValueField()) {
+ row.putField(fieldName, fieldSpec.getDefaultNullValue());
+ } else {
+ row.putField(fieldName, new
Object[]{fieldSpec.getDefaultNullValue()});
+ }
+ }
+ }
+ return row;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
similarity index 92%
rename from
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
index faa84a4..e77d8a2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitationTransformer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/SanitizationTransformer.java
@@ -27,7 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
/**
- * The {@code SanitationTransformer} class will sanitize the values to follow
certain rules including:
+ * The {@code SanitizationTransformer} class will sanitize the values to
follow certain rules including:
* <ul>
* <li>No {@code null} characters in string values</li>
* <li>Values are within the length limit</li>
@@ -36,10 +36,10 @@ import org.apache.pinot.core.data.GenericRow;
* <p>NOTE: should put this after the {@link DataTypeTransformer} so that all
values follow the data types in
* {@link FieldSpec}.
*/
-public class SanitationTransformer implements RecordTransformer {
+public class SanitizationTransformer implements RecordTransformer {
private final Map<String, Integer> _stringColumnMaxLengthMap = new
HashMap<>();
- public SanitationTransformer(Schema schema) {
+ public SanitizationTransformer(Schema schema) {
for (Map.Entry<String, FieldSpec> entry :
schema.getFieldSpecMap().entrySet()) {
FieldSpec fieldSpec = entry.getValue();
if (fieldSpec.getDataType() == FieldSpec.DataType.STRING) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index 9ae4782..174826f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -33,7 +33,7 @@ import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.FileFormat;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.data.readers.RecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import
org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -103,7 +103,7 @@ public class BackfillDateTimeColumn {
LOGGER.info("Creating segment for {} with config {}", segmentName,
config.toString());
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(config, new
RecordReaderSegmentCreationDataSource(wrapperReader),
- CompoundTransformer.getPassThroughTransformer());
+ CompositeTransformer.getPassThroughTransformer());
driver.build();
return true;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index a9c34a9..94fede8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.common.data.TimeGranularitySpec;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -138,7 +138,7 @@ public class RealtimeSegmentConverter {
final SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(realtimeSegmentImpl,
reader, dataSchema);
- driver.init(genConfig, dataSource,
CompoundTransformer.getPassThroughTransformer());
+ driver.init(genConfig, dataSource,
CompositeTransformer.getPassThroughTransformer());
driver.build();
if (segmentPartitionConfig != null &&
segmentPartitionConfig.getColumnPartitionMap() != null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
index b4e0b12..bfa1ecc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.segment.creator;
import org.apache.pinot.common.Utils;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.RecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import
org.apache.pinot.core.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl;
import org.slf4j.Logger;
@@ -45,7 +45,7 @@ public class RecordReaderSegmentCreationDataSource implements
SegmentCreationDat
@Override
public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig
statsCollectorConfig) {
try {
- RecordTransformer recordTransformer =
CompoundTransformer.getDefaultTransformer(statsCollectorConfig.getSchema());
+ RecordTransformer recordTransformer =
CompositeTransformer.getDefaultTransformer(statsCollectorConfig.getSchema());
SegmentPreIndexStatsCollector collector = new
SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
collector.init();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 248f16d..7d4cd9b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.data.StarTreeIndexSpec;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.data.readers.RecordReaderFactory;
-import org.apache.pinot.core.data.recordtransformer.CompoundTransformer;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -113,7 +113,7 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
}
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource
dataSource) {
- init(config, dataSource,
CompoundTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema()));
+ init(config, dataSource,
CompositeTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema()));
}
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource
dataSource,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index 94c7af3..a1ca4dc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -68,6 +69,7 @@ public class RecordReaderSampleDataTest {
@Test
public void testRecordReaders()
throws Exception {
+ CompositeTransformer defaultTransformer =
CompositeTransformer.getDefaultTransformer(SCHEMA);
try (AvroRecordReader avroRecordReader = new
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA);
CSVRecordReader csvRecordReader = new
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA, null);
JSONRecordReader jsonRecordReader = new
JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA)) {
@@ -77,9 +79,9 @@ public class RecordReaderSampleDataTest {
assertTrue(jsonRecordReader.hasNext());
numRecords++;
- GenericRow avroRecord = avroRecordReader.next();
- GenericRow csvRecord = csvRecordReader.next();
- GenericRow jsonRecord = jsonRecordReader.next();
+ GenericRow avroRecord =
defaultTransformer.transform(avroRecordReader.next());
+ GenericRow csvRecord =
defaultTransformer.transform(csvRecordReader.next());
+ GenericRow jsonRecord =
defaultTransformer.transform(jsonRecordReader.next());
assertEquals(avroRecord, csvRecord);
assertEquals(avroRecord, jsonRecord);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
index 54ff94a..1b35a66 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.readers;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.testng.Assert;
@@ -33,14 +34,17 @@ public abstract class RecordReaderTest {
protected static void checkValue(RecordReader recordReader)
throws Exception {
+ CompositeTransformer defaultTransformer =
CompositeTransformer.getDefaultTransformer(SCHEMA);
for (Object[] expectedRecord : RECORDS) {
GenericRow actualRecord = recordReader.next();
+ GenericRow transformedRecord =
defaultTransformer.transform(actualRecord);
+
int numColumns = COLUMNS.length;
for (int i = 0; i < numColumns; i++) {
if (expectedRecord[i] != null) {
- Assert.assertEquals(actualRecord.getValue(COLUMNS[i]),
expectedRecord[i]);
+ Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]),
expectedRecord[i]);
} else {
- Assert.assertEquals(actualRecord.getValue(COLUMNS[i]),
DEFAULT_VALUES[i]);
+ Assert.assertEquals(transformedRecord.getValue(COLUMNS[i]),
DEFAULT_VALUES[i]);
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
index 81283cf..d1f03ec 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/RecordTransformerTest.java
@@ -110,7 +110,7 @@ public class RecordTransformerTest {
@Test
public void testSanitationTransformer() {
- RecordTransformer transformer = new SanitationTransformer(SCHEMA);
+ RecordTransformer transformer = new SanitizationTransformer(SCHEMA);
GenericRow record = getRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
@@ -122,7 +122,7 @@ public class RecordTransformerTest {
@Test
public void testDefaultTransformer() {
- RecordTransformer transformer =
CompoundTransformer.getDefaultTransformer(SCHEMA);
+ RecordTransformer transformer =
CompositeTransformer.getDefaultTransformer(SCHEMA);
GenericRow record = getRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
@@ -143,7 +143,7 @@ public class RecordTransformerTest {
@Test
public void testPassThroughTransformer() {
- RecordTransformer transformer =
CompoundTransformer.getPassThroughTransformer();
+ RecordTransformer transformer =
CompositeTransformer.getPassThroughTransformer();
GenericRow record = getRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]