This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-row-based-schema-validator in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 98216c725bd1bec72db2cbab4ac2b12d7ff237bf Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Fri Sep 4 12:15:14 2020 -0700 Add row based schema validation code to detect schema mismatch --- .../apache/pinot/common/utils/PinotDataType.java | 9 ++ .../recordtransformer/DataTypeTransformer.java | 50 ++++++++-- .../pinot/core/segment/creator/SegmentCreator.java | 4 +- .../creator/impl/SegmentColumnarIndexCreator.java | 29 +++++- .../impl/SegmentIndexCreationDriverImpl.java | 3 +- .../recordtransformer/DataTypeTransformerTest.java | 70 ++++++------- .../pinot/query/executor/QueryExecutorTest.java | 15 ++- .../hadoop/job/mappers/SegmentCreationMapper.java | 22 ++-- .../hadoop/data/IngestionSchemaValidatorTest.java | 111 ++++++++++++++++----- .../data/test_sample_data_multi_value.avro | Bin 0 -> 12222227 bytes .../avro/AvroIngestionSchemaValidator.java | 53 +++++----- .../pinot/spi/data/IngestionSchemaValidator.java | 8 +- .../spi/data/RowBasedSchemaValidationResults.java | 64 ++++++++++++ ...Validator.java => SchemaValidationResults.java} | 38 +++---- .../apache/pinot/spi/data/readers/GenericRow.java | 18 ++++ 15 files changed, 358 insertions(+), 136 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java index 97c017e..c46a33e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java @@ -586,22 +586,31 @@ public enum PinotDataType { public PinotDataType getSingleValueType() { switch (this) { + case BYTE: case BYTE_ARRAY: return BYTE; + case CHARACTER: case CHARACTER_ARRAY: return CHARACTER; + case SHORT: case SHORT_ARRAY: return SHORT; + case INTEGER: case INTEGER_ARRAY: return INTEGER; + case LONG: case LONG_ARRAY: return LONG; + case FLOAT: case FLOAT_ARRAY: return FLOAT; + case DOUBLE: case DOUBLE_ARRAY: 
return DOUBLE; + case STRING: case STRING_ARRAY: return STRING; + case OBJECT: case OBJECT_ARRAY: return OBJECT; default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java index 4ab665c..a8bee3e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java @@ -24,8 +24,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.common.utils.PinotDataType; import org.apache.pinot.spi.data.FieldSpec; @@ -85,7 +87,7 @@ public class DataTypeTransformer implements RecordTransformer { continue; } PinotDataType dest = entry.getValue(); - value = standardize(column, value, dest.isSingleValue()); + value = standardize(record, column, value, dest.isSingleValue()); // NOTE: The standardized value could be null for empty Collection/Map/Object[]. 
if (value == null) { record.putValue(column, null); @@ -109,6 +111,9 @@ public class DataTypeTransformer implements RecordTransformer { } } if (source != dest) { + if (source.getSingleValueType() != dest.getSingleValueType()) { + putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column); + } value = dest.convert(value, source); } @@ -127,28 +132,39 @@ public class DataTypeTransformer implements RecordTransformer { */ @VisibleForTesting @Nullable - static Object standardize(String column, @Nullable Object value, boolean isSingleValue) { + static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) { + return standardize(record, column, value, isSingleValue, 1); + } + + static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) { if (value == null) { return null; } + // If it's single-value column and the value is Collection/Map/Object[], mark the key. if (value instanceof Collection) { - return standardizeCollection(column, (Collection) value, isSingleValue); + return standardizeCollection(record, column, (Collection) value, isSingleValue, level); } if (value instanceof Map) { - return standardizeCollection(column, ((Map) value).values(), isSingleValue); + // If it's a map structure, mark the key. 
+ putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column); + Collection values = ((Map) value).values(); + return standardizeCollection(record, column, values, isSingleValue, level); } if (value instanceof Object[]) { + if (isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } Object[] values = (Object[]) value; int numValues = values.length; if (numValues == 0) { return null; } if (numValues == 1) { - return standardize(column, values[0], isSingleValue); + return standardize(record, column, values[0], isSingleValue, level + 1); } List<Object> standardizedValues = new ArrayList<>(numValues); for (Object singleValue : values) { - Object standardizedValue = standardize(column, singleValue, true); + Object standardizedValue = standardize(record, column, singleValue, true, level + 1); if (standardizedValue != null) { standardizedValues.add(standardizedValue); } @@ -164,20 +180,27 @@ public class DataTypeTransformer implements RecordTransformer { Arrays.toString(values), column); return standardizedValues.toArray(); } + // If it's multi-value column and the level is 1, mark the key. 
+ if (!isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } return value; } - private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) { + private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) { + if (isSingleValue && level == 1) { + putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column); + } int numValues = collection.size(); if (numValues == 0) { return null; } if (numValues == 1) { - return standardize(column, collection.iterator().next(), isSingleValue); + return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1); } List<Object> standardizedValues = new ArrayList<>(numValues); for (Object singleValue : collection) { - Object standardizedValue = standardize(column, singleValue, true); + Object standardizedValue = standardize(record, column, singleValue, true, level + 1); if (standardizedValue != null) { standardizedValues.add(standardizedValue); } @@ -193,4 +216,13 @@ public class DataTypeTransformer implements RecordTransformer { .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column); return standardizedValues.toArray(); } + + private static void putValueAsSetToKey(GenericRow record, String key, String value) { + Set<String> valueSet = (Set) record.getValue(key); + if (valueSet == null) { + valueSet = new HashSet<>(); + record.putValue(key, valueSet); + } + valueSet.add(value); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java index 2ba6246..adf6141 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.Map; import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.spi.data.IngestionSchemaValidator; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; @@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable { * @throws Exception */ void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo, - Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir) + Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir, + IngestionSchemaValidator ingestionSchemaValidator) throws Exception; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java index 4489dc8..b0ed11d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -57,6 +57,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.FieldSpec.FieldType; +import org.apache.pinot.spi.data.IngestionSchemaValidator; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.utils.TimeUtils; @@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { // TODO Refactor class name to match 
interface name private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class); private SegmentGeneratorConfig config; + private IngestionSchemaValidator _ingestionSchemaValidator; private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap; private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>(); private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>(); @@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { @Override public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo, - Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir) + Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir, + IngestionSchemaValidator ingestionSchemaValidator) throws Exception { docIdCounter = 0; config = segmentCreationSpec; this.indexCreationInfoMap = indexCreationInfoMap; + _ingestionSchemaValidator = ingestionSchemaValidator; // Check that the output directory does not exist Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir); @@ -304,6 +309,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { @Override public void indexRow(GenericRow row) { + validateRowBasedSchemas(row); for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) { String columnName = entry.getKey(); ForwardIndexCreator forwardIndexCreator = entry.getValue(); @@ -400,6 +406,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { nullValueVectorCreator.seal(); } writeMetadata(); + _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults(); } private void writeMetadata() @@ -558,6 +565,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } + private void validateRowBasedSchemas(GenericRow row) { + 
if (_ingestionSchemaValidator == null) { + return; + } + RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults(); + + if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) { + Set<String> columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY); + rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns); + } + if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) { + Set<String> columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY); + rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns); + } + if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) { + Set<String> columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY); + rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns); + } + } + /** * Helper method to check whether the given value is a valid property value. 
* <p>Value is invalid iff: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index cabdc47..e5a4b84 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -187,7 +187,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive try { // Initialize the index creation using the per-column statistics information - indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir); + indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir, + _ingestionSchemaValidator); // Build the index recordReader.rewind(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java index a60c460..be4ad8d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.spi.data.readers.GenericRow; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -39,17 +40,18 @@ public class DataTypeTransformerTest { /** * Tests for Map */ + GenericRow record = new GenericRow(); // Empty Map Map<String, Object> map = Collections.emptyMap(); - assertNull(DataTypeTransformer.standardize(COLUMN, map, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, map, false)); + 
assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false)); // Map with single entry String expectedValue = "testValue"; map = Collections.singletonMap("testKey", expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with multiple entries Object[] expectedValues = new Object[]{"testValue1", "testValue2"}; @@ -58,12 +60,12 @@ public class DataTypeTransformerTest { map.put("testKey2", "testValue2"); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); /** * Tests for List @@ -71,24 +73,24 @@ public class DataTypeTransformerTest { // Empty List List<Object> list = Collections.emptyList(); - assertNull(DataTypeTransformer.standardize(COLUMN, list, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, list, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, list, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, list, false)); // List with single entry list = Collections.singletonList(expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), expectedValue); + 
assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValue); // List with multiple entries list = Arrays.asList(expectedValues); try { // Should fail because List with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, list, true); + DataTypeTransformer.standardize(record, COLUMN, list, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues); /** * Tests for Object[] @@ -96,24 +98,24 @@ public class DataTypeTransformerTest { // Empty Object[] Object[] values = new Object[0]; - assertNull(DataTypeTransformer.standardize(COLUMN, values, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, values, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, values, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, values, false)); // Object[] with single entry values = new Object[]{expectedValue}; - assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValue); // Object[] with multiple entries values = new Object[]{"testValue1", "testValue2"}; try { // Should fail because Object[] with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, 
false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); /** * Tests for nested Map/List/Object[] @@ -121,32 +123,32 @@ public class DataTypeTransformerTest { // Map with empty List map = Collections.singletonMap("testKey", Collections.emptyList()); - assertNull(DataTypeTransformer.standardize(COLUMN, map, true)); - assertNull(DataTypeTransformer.standardize(COLUMN, map, false)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true)); + assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false)); // Map with single-entry List map = Collections.singletonMap("testKey", Collections.singletonList(expectedValue)); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with one empty Map and one single-entry Map map = new HashMap<>(); map.put("testKey1", Collections.emptyMap()); map.put("testKey2", Collections.singletonMap("testKey", expectedValue)); // Can be standardized into single value because empty Map should be ignored - assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue); - assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue); + assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue); // Map with multi-entries List map = Collections.singletonMap("testKey", Arrays.asList(expectedValues)); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, 
true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); // Map with one empty Map, one single-entry List and one single-entry Object[] map = new HashMap<>(); @@ -155,12 +157,12 @@ public class DataTypeTransformerTest { map.put("testKey3", new Object[]{"testValue2"}); try { // Should fail because Map with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, map, true); + DataTypeTransformer.standardize(record, COLUMN, map, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues); // List with two single-entry Maps and one empty Map list = Arrays @@ -168,35 +170,35 @@ public class DataTypeTransformerTest { Collections.emptyMap()); try { // Should fail because List with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, list, true); + DataTypeTransformer.standardize(record, COLUMN, list, true); fail(); } catch (Exception e) { // Expected } - assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues); + assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues); // Object[] with two single-entry Maps values = new Object[]{Collections.singletonMap("testKey", "testValue1"), Collections.singletonMap("testKey", "testValue2")}; try { // Should fail because Object[] with multiple entries cannot be standardized as single value - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - 
assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); // Object[] with one empty Object[], one multi-entries List of nested Map/List/Object[] values = new Object[]{new Object[0], Collections.singletonList( Collections.singletonMap("testKey", "testValue1")), Collections.singletonMap("testKey", Arrays.asList(new Object[]{"testValue2"}, Collections.emptyMap()))}; try { - DataTypeTransformer.standardize(COLUMN, values, true); + DataTypeTransformer.standardize(record, COLUMN, values, true); fail(); } catch (Exception e) { // Expected } - assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues); + assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java index c28a2d8..994f684 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java @@ -93,10 +93,17 @@ public class QueryExecutorTest { driver.init(config); driver.build(); IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator(); - Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + + 
Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult() + .isMismatchDetected()); + Assert.assertFalse( + ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult() + .isMismatchDetected()); + Assert.assertFalse( + ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult() + .isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult() + .isMismatchDetected()); _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap)); _segmentNames.add(driver.getSegmentName()); } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index fcc5653..aa3812b 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -52,8 +52,10 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableCustomConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.IngestionSchemaValidator; +import org.apache.pinot.spi.data.SchemaValidationResults; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.apache.pinot.spi.utils.DataSizeUtils; @@ -387,23 
+389,27 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab if (ingestionSchemaValidator == null) { return; } - if (ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()) { + SchemaValidationResults fileBasedSchemaValidationResults = ingestionSchemaValidator.getFileBasedSchemaValidationResults(); + if (fileBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected()) { _dataTypeMismatch++; - _logger.warn(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason()); + _logger.warn(fileBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason()); } - if (ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) { + if (fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) { _singleValueMultiValueFieldMismatch++; - ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason(); + fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason(); } - if (ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()) { + if (fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected()) { _multiValueStructureMismatch++; - ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason(); + fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason(); } - if (ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()) { + if (fileBasedSchemaValidationResults.getMissingPinotColumnResult().isMismatchDetected()) { _missingPinotColumn++; - ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason(); + fileBasedSchemaValidationResults.getMissingPinotColumnResult().getMismatchReason(); } + RowBasedSchemaValidationResults rowBasedSchemaValidationResults = 
ingestionSchemaValidator.getRowBasedSchemaValidationResults(); + //TODO add logic to detect. + if (isSchemaMismatch() && _failIfSchemaMismatch) { throw new RuntimeException("Schema mismatch detected. Forcing to fail the job. Please checking log message above."); } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java index fec3583..9170b79 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java @@ -20,17 +20,35 @@ package org.apache.pinot.hadoop.data; import com.google.common.base.Preconditions; import java.io.File; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.IngestionSchemaValidator; import org.apache.pinot.spi.data.SchemaValidatorFactory; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class IngestionSchemaValidatorTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest"); + + @BeforeClass + public void setUp() + 
throws Exception { + FileUtils.deleteQuietly(INDEX_DIR); + Assert.assertTrue(INDEX_DIR.mkdirs()); + } + @Test - public void testAvroIngestionSchemaValidator() + public void testAvroIngestionSchemaValidatorFileBasedSchemaValidation() throws Exception { String inputFilePath = new File( Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro")) @@ -47,10 +65,10 @@ public class IngestionSchemaValidatorTest { IngestionSchemaValidator ingestionSchemaValidator = SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath); Assert.assertNotNull(ingestionSchemaValidator); - Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected()); // Adding one extra column pinotSchema = new Schema.SchemaBuilder() @@ -64,12 +82,11 @@ public class IngestionSchemaValidatorTest { ingestionSchemaValidator = SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath); Assert.assertNotNull(ingestionSchemaValidator); - 
Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); - Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected()); + Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().getMismatchReason()); // Change the data type of column1 from LONG to STRING pinotSchema = new Schema.SchemaBuilder() @@ -81,12 +98,11 @@ public class IngestionSchemaValidatorTest { ingestionSchemaValidator = SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath); Assert.assertNotNull(ingestionSchemaValidator); - Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason()); - 
Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().getMismatchReason()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected()); // Change column2 from single-value column to multi-value column pinotSchema = new Schema.SchemaBuilder() @@ -98,14 +114,59 @@ public class IngestionSchemaValidatorTest { ingestionSchemaValidator = SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath); Assert.assertNotNull(ingestionSchemaValidator); - Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()); - Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); - Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); - Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()); - 
Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason()); - System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason()); - Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); + Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().getMismatchReason()); + Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected()); + } + + @Test + public void testAvroIngestionValidatorRowBasedSchemaValidation() + throws Exception { + String tableName = "testTable"; + File avroFile = new File( + Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro")) + .getFile()); + + // column 2 is of int type in the AVRO. + // column3 and column16 are both of array of map structure. 
+ Schema pinotSchema = new Schema.SchemaBuilder() + .addSingleValueDimension("column1", FieldSpec.DataType.STRING) + .addSingleValueDimension("column2", FieldSpec.DataType.LONG) + .addSingleValueDimension("column3", FieldSpec.DataType.STRING) + .addMultiValueDimension("column16", FieldSpec.DataType.STRING) + .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build(); + + SegmentGeneratorConfig segmentGeneratorConfig = + new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build(), + pinotSchema); + segmentGeneratorConfig.setInputFilePath(avroFile.getAbsolutePath()); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath()); + segmentGeneratorConfig.setTableName(tableName); + + SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig); + driver.build(); + + RowBasedSchemaValidationResults rowBasedSchemaValidationResults = driver.getIngestionSchemaValidator().getRowBasedSchemaValidationResults(); + Assert.assertTrue(rowBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected()); + Assert.assertNotNull(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason()); + System.out.println(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason()); + + Assert.assertTrue(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()); + Assert.assertNotNull(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); + System.out.println(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason()); + + Assert.assertTrue(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected()); + Assert.assertNotNull(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason()); + 
System.out.println(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason()); + + } + @AfterClass + public void tearDown() { + FileUtils.deleteQuietly(INDEX_DIR); } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro new file mode 100644 index 0000000..4e4a4d8 Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java index d0ee84f..ee6706d 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java @@ -23,22 +23,23 @@ import java.io.IOException; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericRecord; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.RowBasedSchemaValidationResults; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.IngestionSchemaValidator; -import org.apache.pinot.spi.data.SchemaValidatorResult; +import org.apache.pinot.spi.data.SchemaValidationResults; /** * Schema validator to validate pinot schema and avro schema */ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { + private SchemaValidationResults _fileBasedSchemaValidationResults = new SchemaValidationResults(); + private 
RowBasedSchemaValidationResults _rowBasedSchemaValidationResults = new RowBasedSchemaValidationResults(); + private org.apache.avro.Schema _avroSchema; private Schema _pinotSchema; - private SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult(); - private SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult(); - private SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult(); - private SchemaValidatorResult _missingPinotColumn = new SchemaValidatorResult(); + public AvroIngestionSchemaValidator() { } @@ -48,7 +49,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { _pinotSchema = pinotSchema; _avroSchema = extractAvroSchemaFromFile(inputFilePath); - validateSchemas(); + validateFileBasedSchemas(); } @Override @@ -57,23 +58,13 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { } @Override - public SchemaValidatorResult getDataTypeMismatchResult() { - return _dataTypeMismatch; - } - - @Override - public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() { - return _singleValueMultiValueFieldMismatch; - } - - @Override - public SchemaValidatorResult getMultiValueStructureMismatchResult() { - return _multiValueStructureMismatch; + public SchemaValidationResults getFileBasedSchemaValidationResults() { + return _fileBasedSchemaValidationResults; } @Override - public SchemaValidatorResult getMissingPinotColumnResult() { - return _missingPinotColumn; + public RowBasedSchemaValidationResults getRowBasedSchemaValidationResults() { + return _rowBasedSchemaValidationResults; } private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) { @@ -87,12 +78,12 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { } } - private void validateSchemas() { + private void validateFileBasedSchemas() { for (String columnName : _pinotSchema.getPhysicalColumnNames()) { FieldSpec fieldSpec = 
_pinotSchema.getFieldSpecFor(columnName); org.apache.avro.Schema.Field avroColumnField = _avroSchema.getField(columnName); if (avroColumnField == null) { - _missingPinotColumn.addMismatchReason(String + _fileBasedSchemaValidationResults.getMissingPinotColumnResult().addMismatchReason(String .format("The Pinot column: (%s: %s) is missing in the %s schema of input data.", columnName, fieldSpec.getDataType().name(), getInputSchemaType())); continue; @@ -116,7 +107,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { } if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) { - _dataTypeMismatch.addMismatchReason(String + _fileBasedSchemaValidationResults.getDataTypeMismatchResult().addMismatchReason(String .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName, fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(), getInputSchemaType())); @@ -125,20 +116,22 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator { if (fieldSpec.isSingleValueField()) { if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) { // the column is a complex structure - _singleValueMultiValueFieldMismatch.addMismatchReason(String.format( - "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.", - columnName, avroColumnSchema.getName(), getInputSchemaType())); + _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String + .format( + "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.", + columnName, avroColumnSchema.getName(), getInputSchemaType())); } } else { if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) { // the column is a complex structure - _singleValueMultiValueFieldMismatch.addMismatchReason(String.format( - "The Pinot 
column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", - columnName, avroColumnSchema.getName(), getInputSchemaType())); + _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String + .format( + "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.", + columnName, avroColumnSchema.getName(), getInputSchemaType())); } if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) { // multi-value column should use array structure for now. - _multiValueStructureMismatch.addMismatchReason(String.format( + _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format( "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.", columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName())); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java index 045327a..67a3da9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java @@ -28,11 +28,7 @@ public interface IngestionSchemaValidator { String getInputSchemaType(); - SchemaValidatorResult getDataTypeMismatchResult(); + SchemaValidationResults getFileBasedSchemaValidationResults(); - SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult(); - - SchemaValidatorResult getMultiValueStructureMismatchResult(); - - SchemaValidatorResult getMissingPinotColumnResult(); + RowBasedSchemaValidationResults getRowBasedSchemaValidationResults(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java new file mode 100644 index 0000000..655275a --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.data; + +import java.util.HashSet; +import java.util.Set; + + +/** + * This is an extension of the {@code SchemaValidationResults} class: since row-based schema validation is invoked + * much more frequently than file-based schema validation, we collect all the mismatched columns into hash + * sets and then generate the mismatch messages all at once. 
+ */ +public class RowBasedSchemaValidationResults extends SchemaValidationResults { + + private Set<String> _dataTypeMismatchColumns = new HashSet<>(); + private Set<String> _singleValueMultiValueFieldMismatchColumns = new HashSet<>(); + private Set<String> _multiValueStructureMismatchColumns = new HashSet<>(); + + public void collectDataTypeMismatchColumns(Set<String> columns) { + _dataTypeMismatchColumns.addAll(columns); + } + + public void collectSingleValueMultiValueFieldMismatchColumns(Set<String> columns) { + _singleValueMultiValueFieldMismatchColumns.addAll(columns); + } + + public void collectMultiValueStructureMismatchColumns(Set<String> columns) { + _multiValueStructureMismatchColumns.addAll(columns); + } + + public void gatherRowBasedSchemaValidationResults() { + if (!_dataTypeMismatchColumns.isEmpty()) { + _dataTypeMismatch.addMismatchReason(String.format("Found data type mismatch from the following Pinot columns: %s", + _dataTypeMismatchColumns.toString())); + } + if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) { + _singleValueMultiValueFieldMismatch.addMismatchReason(String + .format("Found single-value multi-value field mismatch from the following Pinot columns: %s", + _singleValueMultiValueFieldMismatchColumns.toString())); + } + if (!_multiValueStructureMismatchColumns.isEmpty()) { + _multiValueStructureMismatch.addMismatchReason(String + .format("Found multi-value structure mismatch from the following Pinot columns: %s", + _multiValueStructureMismatchColumns.toString())); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java similarity index 51% copy from pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java copy to pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java index 045327a..2d793c0 100644 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java @@ -18,21 +18,25 @@ */ package org.apache.pinot.spi.data; - -/** - * Validator to validate the schema between Pinot schema and input raw data schema - */ -public interface IngestionSchemaValidator { - - void init(Schema pinotSchema, String inputFilePath); - - String getInputSchemaType(); - - SchemaValidatorResult getDataTypeMismatchResult(); - - SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult(); - - SchemaValidatorResult getMultiValueStructureMismatchResult(); - - SchemaValidatorResult getMissingPinotColumnResult(); +public class SchemaValidationResults { + SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult(); + SchemaValidatorResult _missingPinotColumnResult = new SchemaValidatorResult(); + + public SchemaValidatorResult getDataTypeMismatchResult() { + return _dataTypeMismatch; + } + + public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() { + return _singleValueMultiValueFieldMismatch; + } + + public SchemaValidatorResult getMultiValueStructureMismatchResult() { + return _multiValueStructureMismatch; + } + + public SchemaValidatorResult getMissingPinotColumnResult() { + return _missingPinotColumnResult; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 5c45d6b..fc3f9a3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -61,6 +61,24 @@ public class GenericRow { */ public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$"; + /** 
+ * This key is used to identify whether there is a data type mismatch that requires a data type conversion. + * E.g. the Pinot column is of int type, whereas the input column is of long type. + */ + public static final String DATA_TYPE_MISMATCH_KEY = "$DATA_TYPE_MISMATCH_KEY$"; + + /** + * This key is used to identify whether the input value is a map structure for a multi-value column. + * This is necessary for us to identify whether any existing use case relies on fetching values this way. + */ + public static final String MULTI_VALUE_STRUCTURE_MISMATCH_KEY = "$MULTI_VALUE_STRUCTURE_MISMATCH_KEY$"; + + /** + * This key is used to identify whether there is a single-value multi-value mismatch. E.g. the Pinot column is single-value, + * whereas the input data is a Collection/Map/object[]. + */ + public static final String SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY = "$SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY$"; + private final Map<String, Object> _fieldToValueMap = new HashMap<>(); private final Set<String> _nullValueFields = new HashSet<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
