This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 78bf676175968251832f29712b37d09bc4b49c41 Author: Jon Vexler <[email protected]> AuthorDate: Tue Mar 5 11:37:09 2024 -0800 [HUDI-7413] Fix schema exception types and error messages thrown with schema exceptions (#10677) Co-authored-by: Jonathan Vexler <=> --- .../java/org/apache/hudi/table/HoodieTable.java | 5 +- .../apache/hudi/avro/AvroSchemaCompatibility.java | 48 +++- .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 162 ++++++++---- .../hudi/common/table/TableSchemaResolver.java | 4 +- .../exception/HoodieNullSchemaTypeException.java | 32 +++ .../hudi/exception/InvalidUnionTypeException.java | 33 +++ ...ption.java => MissingSchemaFieldException.java} | 20 +- .../SchemaBackwardsCompatibilityException.java | 45 ++++ .../exception/SchemaCompatibilityException.java | 4 +- .../convert/AvroInternalSchemaConverter.java | 31 ++- .../org/apache/hudi/avro/TestAvroSchemaUtils.java | 25 ++ .../hudi/common/table/TestTableSchemaResolver.java | 4 +- .../schema/utils/TestAvroSchemaEvolutionUtils.java | 35 +++ .../apache/hudi/sink/ITTestDataStreamWrite.java | 6 +- .../scala/org/apache/hudi/HoodieSchemaUtils.scala | 42 ++- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 15 -- .../org/apache/hudi/TestHoodieSchemaUtils.java | 286 +++++++++++++++++++++ .../apache/hudi/functional/TestCOWDataSource.scala | 50 +++- .../apache/hudi/utilities/streamer/StreamSync.java | 7 +- ...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 10 +- 20 files changed, 745 insertions(+), 119 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index d5244ac427c..ed4e088ebeb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -67,6 +67,7 @@ import org.apache.hudi.exception.HoodieIOException; import 
org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.hudi.hadoop.fs.ConsistencyGuard; import org.apache.hudi.hadoop.fs.ConsistencyGuard.FileVisibility; import org.apache.hudi.index.HoodieIndex; @@ -854,8 +855,10 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema()); Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get()); AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames()); + } catch (SchemaCompatibilityException e) { + throw e; } catch (Exception e) { - throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e); + throw new SchemaCompatibilityException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java index f25824dbd4a..8ed0830815e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -283,6 +284,35 @@ public class AvroSchemaCompatibility { return result; } + private static String getLocationName(final Deque<LocationInfo> locations, Type readerType) { + StringBuilder sb = new StringBuilder(); + Iterator<LocationInfo> locationInfoIterator = locations.iterator(); + boolean 
addDot = false; + while (locationInfoIterator.hasNext()) { + if (addDot) { + sb.append("."); + } else { + addDot = true; + } + LocationInfo next = locationInfoIterator.next(); + sb.append(next.name); + //we check the reader type if we are at the last location. This is because + //if the type is array/map, that means the problem is that the field type + //of the writer is not array/map. If the type is something else, the problem + //is between the array element/map value of the reader and writer schemas + if (next.type.equals(Type.MAP)) { + if (locationInfoIterator.hasNext() || !readerType.equals(Type.MAP)) { + sb.append(".value"); + } + } else if (next.type.equals(Type.ARRAY)) { + if (locationInfoIterator.hasNext() || !readerType.equals(Type.ARRAY)) { + sb.append(".element"); + } + } + } + return sb.toString(); + } + /** * Calculates the compatibility of a reader/writer schema pair. * @@ -335,7 +365,7 @@ public class AvroSchemaCompatibility { for (final Schema writerBranch : writer.getTypes()) { SchemaCompatibilityResult compatibility = getCompatibility(reader, writerBranch, locations); if (compatibility.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) { - String message = String.format("reader union lacking writer type: %s", writerBranch.getType()); + String message = String.format("reader union lacking writer type: %s for field: '%s'", writerBranch.getType(), getLocationName(locations, reader.getType())); result = result.mergedWith(SchemaCompatibilityResult.incompatible( SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations))); } @@ -407,7 +437,7 @@ public class AvroSchemaCompatibility { } // No branch in the reader union has been found compatible with the writer // schema: - String message = String.format("reader union lacking writer type: %s", writer.getType()); + String message = String.format("reader union lacking writer type: %s for field: '%s'", writer.getType(), getLocationName(locations, reader.getType())); 
return result.mergedWith(SchemaCompatibilityResult .incompatible(SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations))); } @@ -433,9 +463,10 @@ public class AvroSchemaCompatibility { // reader field must have a default value. if (defaultValueAccessor.getDefaultValue(readerField) == null) { // reader field has no default value + String message = String.format("Field '%s.%s' has no default value", getLocationName(locations, readerField.schema().getType()), readerField.name()); result = result.mergedWith( SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE, - reader, writer, readerField.name(), asList(locations))); + reader, writer, message, asList(locations))); } } else { locations.addLast(new LocationInfo(readerField.name(), readerField.schema().getType())); @@ -482,8 +513,9 @@ public class AvroSchemaCompatibility { final Set<String> symbols = new TreeSet<>(writer.getEnumSymbols()); symbols.removeAll(reader.getEnumSymbols()); if (!symbols.isEmpty()) { + String message = String.format("Field '%s' missing enum symbols: %s", getLocationName(locations, reader.getType()), symbols); result = SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.MISSING_ENUM_SYMBOLS, reader, - writer, symbols.toString(), asList(locations)); + writer, message, asList(locations)); } return result; } @@ -494,7 +526,7 @@ public class AvroSchemaCompatibility { int actual = reader.getFixedSize(); int expected = writer.getFixedSize(); if (actual != expected) { - String message = String.format("expected: %d, found: %d", expected, actual); + String message = String.format("Fixed size field '%s' expected: %d, found: %d", getLocationName(locations, reader.getType()), expected, actual); result = SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.FIXED_SIZE_MISMATCH, reader, writer, message, asList(locations)); } @@ -511,7 +543,7 @@ public class AvroSchemaCompatibility { boolean 
shouldCheckNames = checkNaming && (locations.size() == 1 || locations.peekLast().type == Type.UNION); SchemaCompatibilityResult result = SchemaCompatibilityResult.compatible(); if (shouldCheckNames && !Objects.equals(reader.getFullName(), writer.getFullName())) { - String message = String.format("expected: %s", writer.getFullName()); + String message = String.format("Reader schema name: '%s' is not compatible with writer schema name: '%s'", reader.getFullName(), writer.getFullName()); result = SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.NAME_MISMATCH, reader, writer, message, asList(locations)); } @@ -520,8 +552,8 @@ public class AvroSchemaCompatibility { private SchemaCompatibilityResult typeMismatch(final Schema reader, final Schema writer, final Deque<LocationInfo> locations) { - String message = String.format("reader type: %s not compatible with writer type: %s", reader.getType(), - writer.getType()); + String message = String.format("reader type '%s' not compatible with writer type '%s' for field '%s'", reader.getType(), + writer.getType(), getLocationName(locations, reader.getType())); return SchemaCompatibilityResult.incompatible(SchemaIncompatibilityType.TYPE_MISMATCH, reader, writer, message, asList(locations)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 5ec466cca3d..6d546263047 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -18,13 +18,19 @@ package org.apache.hudi.avro; +import org.apache.hudi.exception.MissingSchemaFieldException; +import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.exception.InvalidUnionTypeException; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import 
org.apache.avro.SchemaCompatibility; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Objects; import java.util.Set; @@ -90,20 +96,20 @@ public class AvroSchemaUtils { * @return true if prev schema is a projection of new schema. */ public static boolean canProject(Schema prevSchema, Schema newSchema) { - return canProject(prevSchema, newSchema, Collections.emptySet()); + return findMissingFields(prevSchema, newSchema, Collections.emptySet()).isEmpty(); } /** - * Check that each field in the prevSchema can be populated in the newSchema except specified columns + * Check that each top level field in the prevSchema can be populated in the newSchema except specified columns * @param prevSchema prev schema. * @param newSchema new schema - * @return true if prev schema is a projection of new schema. + * @return List of fields that should be in the new schema */ - public static boolean canProject(Schema prevSchema, Schema newSchema, Set<String> exceptCols) { + private static List<Schema.Field> findMissingFields(Schema prevSchema, Schema newSchema, Set<String> exceptCols) { return prevSchema.getFields().stream() .filter(f -> !exceptCols.contains(f.name())) - .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField)) - .noneMatch(Objects::isNull); + .filter(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField) == null) + .collect(Collectors.toList()); } /** @@ -119,31 +125,6 @@ public class AvroSchemaUtils { return "hoodie." + sanitizedTableName + "." + sanitizedTableName + "_record"; } - /** - * Validate whether the {@code targetSchema} is a valid evolution of {@code sourceSchema}. 
- * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type promotion in the - * opposite direction - */ - public static boolean isValidEvolutionOf(Schema sourceSchema, Schema targetSchema) { - return (sourceSchema.getType() == Schema.Type.NULL) || isProjectionOfInternal(sourceSchema, targetSchema, - AvroSchemaUtils::isAtomicSchemasCompatibleEvolution); - } - - /** - * Establishes whether {@code newReaderSchema} is compatible w/ {@code prevWriterSchema}, as - * defined by Avro's {@link AvroSchemaCompatibility}. - * {@code newReaderSchema} is considered compatible to {@code prevWriterSchema}, iff data written using {@code prevWriterSchema} - * could be read by {@code newReaderSchema} - * @param newReaderSchema new reader schema instance. - * @param prevWriterSchema prev writer schema instance. - * @return true if its compatible. else false. - */ - private static boolean isAtomicSchemasCompatibleEvolution(Schema newReaderSchema, Schema prevWriterSchema) { - // NOTE: Checking for compatibility of atomic types, we should ignore their - // corresponding fully-qualified names (as irrelevant) - return isSchemaCompatible(prevWriterSchema, newReaderSchema, false, true); - } - /** * Validate whether the {@code targetSchema} is a "compatible" projection of {@code sourceSchema}. 
* Only difference of this method from {@link #isStrictProjectionOf(Schema, Schema)} is @@ -352,25 +333,118 @@ public class AvroSchemaUtils { boolean allowProjection, Set<String> dropPartitionColNames) throws SchemaCompatibilityException { - String errorMessage = null; - - if (!allowProjection && !canProject(tableSchema, writerSchema, dropPartitionColNames)) { - errorMessage = "Column dropping is not allowed"; + if (!allowProjection) { + List<Schema.Field> missingFields = findMissingFields(tableSchema, writerSchema, dropPartitionColNames); + if (!missingFields.isEmpty()) { + throw new MissingSchemaFieldException(missingFields.stream().map(Schema.Field::name).collect(Collectors.toList()), writerSchema, tableSchema); + } } // TODO(HUDI-4772) re-enable validations in case partition columns // being dropped from the data-file after fixing the write schema - if (dropPartitionColNames.isEmpty() && shouldValidate && !isSchemaCompatible(tableSchema, writerSchema)) { - errorMessage = "Failed schema compatibility check"; + if (dropPartitionColNames.isEmpty() && shouldValidate) { + AvroSchemaCompatibility.SchemaPairCompatibility result = + AvroSchemaCompatibility.checkReaderWriterCompatibility(writerSchema, tableSchema, true); + if (result.getType() != AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) { + throw new SchemaBackwardsCompatibilityException(result, writerSchema, tableSchema); + } } + } - if (errorMessage != null) { - String errorDetails = String.format( - "%s\nwriterSchema: %s\ntableSchema: %s", - errorMessage, - writerSchema, - tableSchema); - throw new SchemaCompatibilityException(errorDetails); + /** + * Validate whether the {@code incomingSchema} is a valid evolution of {@code tableSchema}. 
+ * + * @param incomingSchema schema of the incoming dataset + * @param tableSchema latest table schema + */ + public static void checkValidEvolution(Schema incomingSchema, Schema tableSchema) { + if (incomingSchema.getType() == Schema.Type.NULL) { + return; } + + //not really needed for `hoodie.write.set.null.for.missing.columns` but good to check anyway + List<String> missingFields = new ArrayList<>(); + findAnyMissingFields(incomingSchema, tableSchema, new ArrayDeque<>(), missingFields); + if (!missingFields.isEmpty()) { + throw new MissingSchemaFieldException(missingFields, incomingSchema, tableSchema); + } + + //make sure that the table schema can be read using the incoming schema + AvroSchemaCompatibility.SchemaPairCompatibility result = + AvroSchemaCompatibility.checkReaderWriterCompatibility(incomingSchema, tableSchema, false); + if (result.getType() != AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) { + throw new SchemaBackwardsCompatibilityException(result, incomingSchema, tableSchema); + } + } + + /** + * Find all fields in the latest table schema that are not in + * the incoming schema. 
+ */ + private static void findAnyMissingFields(Schema incomingSchema, + Schema latestTableSchema, + Deque<String> visited, + List<String> missingFields) { + findAnyMissingFieldsRec(incomingSchema, latestTableSchema, visited, + missingFields, incomingSchema, latestTableSchema); + } + + /** + * We want to pass the full schemas so that the error message has the entire schema to print from + */ + private static void findAnyMissingFieldsRec(Schema incomingSchema, + Schema latestTableSchema, + Deque<String> visited, + List<String> missingFields, + Schema fullIncomingSchema, + Schema fullTableSchema) { + if (incomingSchema.getType() == latestTableSchema.getType()) { + if (incomingSchema.getType() == Schema.Type.RECORD) { + visited.addLast(latestTableSchema.getName()); + for (Schema.Field targetField : latestTableSchema.getFields()) { + visited.addLast(targetField.name()); + Schema.Field sourceField = incomingSchema.getField(targetField.name()); + if (sourceField == null) { + missingFields.add(String.join(".", visited)); + } else { + findAnyMissingFieldsRec(sourceField.schema(), targetField.schema(), visited, + missingFields, fullIncomingSchema, fullTableSchema); + } + visited.removeLast(); + } + visited.removeLast(); + } else if (incomingSchema.getType() == Schema.Type.ARRAY) { + visited.addLast("element"); + findAnyMissingFieldsRec(incomingSchema.getElementType(), latestTableSchema.getElementType(), + visited, missingFields, fullIncomingSchema, fullTableSchema); + visited.removeLast(); + } else if (incomingSchema.getType() == Schema.Type.MAP) { + visited.addLast("value"); + findAnyMissingFieldsRec(incomingSchema.getValueType(), latestTableSchema.getValueType(), + visited, missingFields, fullIncomingSchema, fullTableSchema); + visited.removeLast(); + } else if (incomingSchema.getType() == Schema.Type.UNION) { + List<Schema> incomingNestedSchemas = incomingSchema.getTypes(); + List<Schema> latestTableNestedSchemas = latestTableSchema.getTypes(); + if 
(incomingNestedSchemas.size() != latestTableNestedSchemas.size()) { + throw new InvalidUnionTypeException(createSchemaErrorString( + String.format("Incoming batch field '%s' has union with %d types, while the table schema has %d types", + String.join(".", visited), incomingNestedSchemas.size(), latestTableNestedSchemas.size()), fullIncomingSchema, fullTableSchema)); + } + if (incomingNestedSchemas.size() > 2) { + throw new InvalidUnionTypeException(createSchemaErrorString( + String.format("Union for incoming batch field '%s' should not have more than 2 types but has %d", + String.join(".", visited), incomingNestedSchemas.size()), fullIncomingSchema, fullTableSchema)); + } + for (int i = 0; i < incomingNestedSchemas.size(); ++i) { + findAnyMissingFieldsRec(incomingNestedSchemas.get(i), latestTableNestedSchemas.get(i), visited, + missingFields, fullIncomingSchema, fullTableSchema); + } + } + } + } + + public static String createSchemaErrorString(String errorMessage, Schema writerSchema, Schema tableSchema) { + return String.format("%s\nwriterSchema: %s\ntableSchema: %s", errorMessage, writerSchema, tableSchema); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 5291c725218..f37dd4e7540 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -37,8 +37,8 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.apache.hudi.exception.InvalidTableException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.internal.schema.InternalSchema; import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; import org.apache.hudi.internal.schema.utils.SerDeHelper; @@ -567,7 +567,7 @@ public class TableSchemaResolver { boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf)); boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf)); if (hasPartitionColNotInSchema && hasPartitionColInSchema) { - throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema"); + throw new HoodieSchemaException("Partition columns could not be partially contained w/in the data schema"); } if (hasPartitionColNotInSchema) { diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java new file mode 100644 index 00000000000..ff4abadcde9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieNullSchemaTypeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.exception; + +import org.apache.hudi.internal.schema.HoodieSchemaException; + +/** + * Thrown if a schema is null or empty. Or if a field has type null + * (null is ok if it is in a union with 1 (one) other type) + */ +public class HoodieNullSchemaTypeException extends HoodieSchemaException { + public HoodieNullSchemaTypeException(String message) { + super(message); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java new file mode 100644 index 00000000000..370ad9438cc --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/InvalidUnionTypeException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.exception; + +/** + * Thrown when a field is a union and at least one of the following is true: + * <ul> + * <li>the incoming union and the latest table union have differing numbers of types</li> + * <li>the incoming union has more than two types</li> + * </ul> + */ +public class InvalidUnionTypeException extends SchemaCompatibilityException { + public InvalidUnionTypeException(String message) { + super(message); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java similarity index 51% rename from hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java rename to hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java index a739af67909..4727ff814f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java +++ b/hudi-common/src/main/java/org/apache/hudi/exception/MissingSchemaFieldException.java @@ -18,16 +18,24 @@ package org.apache.hudi.exception; +import org.apache.hudi.avro.AvroSchemaUtils; + +import org.apache.avro.Schema; + +import java.util.List; + /** - * Exception for incompatible schema. + * Thrown when the schema of the incoming data is missing fields that are in the table schema. 
*/ -public class HoodieIncompatibleSchemaException extends RuntimeException { +public class MissingSchemaFieldException extends SchemaCompatibilityException { - public HoodieIncompatibleSchemaException(String msg, Throwable e) { - super(msg, e); + public MissingSchemaFieldException(List<String> missingFields, Schema writerSchema, Schema tableSchema) { + super(constructExceptionMessage(missingFields, writerSchema, tableSchema)); } - public HoodieIncompatibleSchemaException(String msg) { - super(msg); + private static String constructExceptionMessage(List<String> missingFields, Schema writerSchema, Schema tableSchema) { + return AvroSchemaUtils.createSchemaErrorString( + "Schema validation failed due to missing field. Fields missing from incoming schema: {" + + String.join(", ", missingFields) + "}", writerSchema, tableSchema); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java new file mode 100644 index 00000000000..c38d13c9e29 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaBackwardsCompatibilityException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.exception; + +import org.apache.hudi.avro.AvroSchemaCompatibility; +import org.apache.hudi.avro.AvroSchemaUtils; + +import org.apache.avro.Schema; + +import java.util.stream.Collectors; + +/** + * Thrown when there is a backwards compatibility issue with the incoming schema. + * i.e. when the incoming schema cannot be used to read older data files + */ +public class SchemaBackwardsCompatibilityException extends SchemaCompatibilityException { + + public SchemaBackwardsCompatibilityException(AvroSchemaCompatibility.SchemaPairCompatibility compatibility, Schema writerSchema, Schema tableSchema) { + super(constructExceptionMessage(compatibility, writerSchema, tableSchema)); + } + + private static String constructExceptionMessage(AvroSchemaCompatibility.SchemaPairCompatibility compatibility, Schema writerSchema, Schema tableSchema) { + return AvroSchemaUtils.createSchemaErrorString("Schema validation backwards compatibility check failed with the following issues: {" + + compatibility.getResult().getIncompatibilities().stream() + .map(incompatibility -> incompatibility.getType().name() + ": " + incompatibility.getMessage()) + .collect(Collectors.joining(", ")) + "}", writerSchema, tableSchema); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java index 478ec0d4269..92d2f6744c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java +++ b/hudi-common/src/main/java/org/apache/hudi/exception/SchemaCompatibilityException.java @@ -18,10 +18,12 @@ package org.apache.hudi.exception; +import org.apache.hudi.internal.schema.HoodieSchemaException; + /** * An exception thrown when schema has compatibility problems. 
*/ -public class SchemaCompatibilityException extends HoodieException { +public class SchemaCompatibilityException extends HoodieSchemaException { public SchemaCompatibilityException(String message) { super(message); diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java index f80eb91522c..54f9cb65ba8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java @@ -19,6 +19,7 @@ package org.apache.hudi.internal.schema.convert; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieNullSchemaTypeException; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Type; @@ -32,6 +33,7 @@ import org.apache.avro.Schema; import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -79,7 +81,7 @@ public class AvroInternalSchemaConverter { * but for the metadata table HoodieMetadata.avsc uses a trick where we have a bunch of * different types wrapped in record for col stats. * - * @param Schema avro schema. + * @param schema avro schema. * @return an avro Schema where null is the first. 
*/ public static Schema fixNullOrdering(Schema schema) { @@ -156,6 +158,29 @@ public class AvroInternalSchemaConverter { return visitAvroSchemaToBuildType(schema, visited, true, nextId); } + private static void checkNullType(Type fieldType, String fieldName, Deque<String> visited) { + if (fieldType == null) { + StringBuilder sb = new StringBuilder(); + sb.append("Field '"); + Iterator<String> visitedIterator = visited.descendingIterator(); + while (visitedIterator.hasNext()) { + sb.append(visitedIterator.next()); + sb.append("."); + } + sb.append(fieldName); + sb.append("' has type null"); + throw new HoodieNullSchemaTypeException(sb.toString()); + } else if (fieldType.typeId() == Type.TypeID.ARRAY) { + visited.push(fieldName); + checkNullType(((Types.ArrayType) fieldType).elementType(), "element", visited); + visited.pop(); + } else if (fieldType.typeId() == Type.TypeID.MAP) { + visited.push(fieldName); + checkNullType(((Types.MapType) fieldType).valueType(), "value", visited); + visited.pop(); + } + } + /** * Converts an avro schema into hudi type. 
* @@ -182,7 +207,9 @@ public class AvroInternalSchemaConverter { } nextId.set(nextAssignId + fields.size()); fields.stream().forEach(field -> { - fieldTypes.add(visitAvroSchemaToBuildType(field.schema(), visited, false, nextId)); + Type fieldType = visitAvroSchemaToBuildType(field.schema(), visited, false, nextId); + checkNullType(fieldType, field.name(), visited); + fieldTypes.add(fieldType); }); visited.pop(); List<Types.Field> internalFields = new ArrayList<>(fields.size()); diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java index c05683e605c..ea2301ce080 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.avro; +import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.avro.Schema; @@ -229,4 +230,28 @@ public class TestAvroSchemaUtils { public void testIsCompatiblePartitionDropCols(boolean shouldValidate) { AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, SHORT_SCHEMA, shouldValidate, false, Collections.singleton("c")); } + + private static final Schema BROKEN_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"broken\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"a\",\n" + + " \"type\" : [ \"null\", \"int\" ],\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"b\",\n" + + " \"type\" : [ \"null\", \"int\" ],\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"c\",\n" + + " \"type\" : [ \"null\", \"boolean\" ],\n" + + " \"default\" : null\n" + + " } ]\n" + + "}"); + + @Test + public void testBrokenSchema() { + assertThrows(SchemaBackwardsCompatibilityException.class, + () -> AvroSchemaUtils.checkSchemaCompatible(FULL_SCHEMA, BROKEN_SCHEMA, 
true, false, Collections.emptySet())); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index 3ac42b9d3b7..b7f0ba8eba7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -21,7 +21,7 @@ package org.apache.hudi.common.table; import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieIncompatibleSchemaException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.avro.Schema; import org.junit.jupiter.api.Test; @@ -61,7 +61,7 @@ public class TestTableSchemaResolver { String[] pts4 = {"user_partition", "partition_path"}; try { TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3)); - } catch (HoodieIncompatibleSchemaException e) { + } catch (HoodieSchemaException e) { assertTrue(e.getMessage().contains("Partial partition fields are still in the schema")); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java index 0be0a5f89c5..4027bd28178 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.internal.schema.utils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.exception.HoodieNullSchemaTypeException; import org.apache.hudi.internal.schema.InternalSchema; import 
org.apache.hudi.internal.schema.InternalSchemaBuilder; import org.apache.hudi.internal.schema.Type; @@ -46,6 +47,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** * Tests {@link AvroSchemaEvolutionUtils}. */ @@ -184,6 +188,37 @@ public class TestAvroSchemaEvolutionUtils { Assertions.assertEquals(schema, AvroInternalSchemaConverter.convert(internalSchema, "newTableName")); } + @Test + public void testNullFieldType() { + Schema schema = create("t1", + new Schema.Field("nullField", Schema.create(Schema.Type.NULL), null, JsonProperties.NULL_VALUE)); + Throwable t = assertThrows(HoodieNullSchemaTypeException.class, + () -> AvroInternalSchemaConverter.convert(schema)); + assertTrue(t.getMessage().contains("'t1.nullField'")); + + Schema schemaArray = create("t2", + new Schema.Field("nullArray", Schema.createArray(Schema.create(Schema.Type.NULL)), null, null)); + t = assertThrows(HoodieNullSchemaTypeException.class, + () -> AvroInternalSchemaConverter.convert(schemaArray)); + assertTrue(t.getMessage().contains("'t2.nullArray.element'")); + + Schema schemaMap = create("t3", + new Schema.Field("nullMap", Schema.createMap(Schema.create(Schema.Type.NULL)), null, null)); + t = assertThrows(HoodieNullSchemaTypeException.class, + () -> AvroInternalSchemaConverter.convert(schemaMap)); + assertTrue(t.getMessage().contains("'t3.nullMap.value'")); + + + Schema schemaComplex = create("t4", + new Schema.Field("complexField", Schema.createMap( + create("nestedStruct", + new Schema.Field("nestedArray", Schema.createArray(Schema.createMap(Schema.create(Schema.Type.NULL))), + null, null))), null, null)); + t = assertThrows(HoodieNullSchemaTypeException.class, + () -> AvroInternalSchemaConverter.convert(schemaComplex)); + assertTrue(t.getMessage().contains("'t4.nestedStruct.nestedArray.element.value'")); + } + @Test 
public void testRefreshNewId() { Types.RecordType record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()), diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index fea986885f8..47c613ec784 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsInference; -import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.exception.MissingSchemaFieldException; import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.sink.utils.Pipelines; @@ -557,13 +557,13 @@ public class ITTestDataStreamWrite extends TestLogger { } catch (JobExecutionException e) { Throwable actualException = e; while (actualException != null) { - if (actualException.getClass() == SchemaCompatibilityException.class) { + if (actualException.getClass() == MissingSchemaFieldException.class) { // test is passed return; } actualException = actualException.getCause(); } } - throw new AssertionError(String.format("Excepted exception %s is not found", SchemaCompatibilityException.class)); + throw new AssertionError(String.format("Expected exception %s is not found", MissingSchemaFieldException.class)); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala index 0b42dc75b54..cfc43453e9c 100644 ---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala @@ -21,10 +21,10 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_SCHEMA, SQL_MERGE_INTO_WRITES} -import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible, isValidEvolutionOf} +import org.apache.hudi.avro.AvroSchemaUtils.{checkSchemaCompatible, checkValidEvolution, isCompatibleProjectionOf, isSchemaCompatible} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields -import org.apache.hudi.common.config.HoodieConfig +import org.apache.hudi.common.config.{HoodieConfig, TypedProperties} import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.config.HoodieWriteConfig @@ -78,7 +78,8 @@ object HoodieSchemaUtils { opts: Map[String, String]): Schema = { val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(), DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean - val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean + val shouldReconcileSchema = opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), + DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean @@ -167,34 +168,29 @@ object HoodieSchemaUtils { } else { canonicalizedSourceSchema } - if (isValidEvolutionOf(reconciledSchema, latestTableSchema)) { - reconciledSchema - } else { - log.error( - s"""Incoming batch schema is not compatible with the table's one. 
- |Incoming schema ${sourceSchema.toString(true)} - |Incoming schema (canonicalized) ${reconciledSchema.toString(true)} - |Table's schema ${latestTableSchema.toString(true)} - |""".stripMargin) - throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one") - } + checkValidEvolution(reconciledSchema, latestTableSchema) + reconciledSchema } - } else if (isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) { - canonicalizedSourceSchema } else { - log.error( - s"""Incoming batch schema is not compatible with the table's one. - |Incoming schema ${sourceSchema.toString(true)} - |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)} - |Table's schema ${latestTableSchema.toString(true)} - |""".stripMargin) - throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one") + checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true, + allowAutoEvolutionColumnDrop, java.util.Collections.emptySet()) + canonicalizedSourceSchema } } } } } + def deduceWriterSchema(sourceSchema: Schema, + latestTableSchemaOpt: org.apache.hudi.common.util.Option[Schema], + internalSchemaOpt: org.apache.hudi.common.util.Option[InternalSchema], + props: TypedProperties): Schema = { + deduceWriterSchema(sourceSchema, + HoodieConversionUtils.toScalaOption(latestTableSchemaOpt), + HoodieConversionUtils.toScalaOption(internalSchemaOpt), + HoodieConversionUtils.fromProperties(props)) + } + /** * Canonicalizes [[sourceSchema]] by reconciling it w/ [[latestTableSchema]] in following * diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index eea93e426fb..dbeb9714333 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -133,21 +133,6 @@ object HoodieSparkSqlWriter { new HoodieSparkSqlWriterInternal().bootstrap(sqlContext, mode, optParams, df, hoodieTableConfigOpt, streamingWritesParamsOpt, hoodieWriteClient) } - /** - * Deduces writer's schema based on - * <ul> - * <li>Source's schema</li> - * <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li> - * </ul> - */ - def deduceWriterSchema(sourceSchema: Schema, - latestTableSchemaOpt: Option[Schema], - internalSchemaOpt: Option[InternalSchema], - props: TypedProperties): Schema = { - HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, - internalSchemaOpt, HoodieConversionUtils.fromProperties(props)) - } - def cleanup(): Unit = { Metrics.shutdownAllMetrics() } diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java new file mode 100644 index 00000000000..b10d0cfa992 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi; + +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieNullSchemaTypeException; +import org.apache.hudi.exception.MissingSchemaFieldException; +import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; + +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodieSchemaUtils { + + @Test + void testSchemaWithNullField() { + Schema withNullfield = createRecord("nullRecord", createPrimitiveField("nullField", Schema.Type.NULL)); + assertThrows(HoodieNullSchemaTypeException.class, + () -> deduceWriterSchema(withNullfield, null)); + } + + @Test + void testSimplePromotionWithComplexFields() { + Schema start = createRecord("simple", createPrimitiveField("f", Schema.Type.INT)); + Schema end = createRecord("simple", createPrimitiveField("f", Schema.Type.LONG)); + assertEquals(end, deduceWriterSchema(end, start)); + + start = createRecord("nested", createNestedField("f", Schema.Type.INT)); + end = createRecord("nested", createNestedField("f", Schema.Type.LONG)); + assertEquals(end, deduceWriterSchema(end, start)); + + start = createRecord("arrayRec", createArrayField("f", Schema.Type.INT)); + end = createRecord("arrayRec", createArrayField("f", Schema.Type.LONG)); + 
assertEquals(end, deduceWriterSchema(end, start)); + + start = createRecord("mapRec", createMapField("f", Schema.Type.INT)); + end = createRecord("mapRec", createMapField("f", Schema.Type.LONG)); + assertEquals(end, deduceWriterSchema(end, start)); + } + + @Test + void testAllowedTypePromotions() { + Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BYTES}; + Map<Schema.Type, Pair<Integer,Integer>> allowedPromotions = new HashMap<>(); + //allowedPromotions.key can be promoted to any type in the range allowedPromotions.value + allowedPromotions.put(Schema.Type.INT, Pair.of(0, 4)); + allowedPromotions.put(Schema.Type.LONG, Pair.of(1, 4)); + allowedPromotions.put(Schema.Type.FLOAT, Pair.of(2, 4)); + allowedPromotions.put(Schema.Type.DOUBLE, Pair.of(3, 4)); + allowedPromotions.put(Schema.Type.STRING, Pair.of(4, 4)); + allowedPromotions.put(Schema.Type.BYTES, Pair.of(5, 5)); + + Map<Schema.Type, Schema> schemaMap = new HashMap<>(); + for (Schema.Type type : promotionTypes) { + schemaMap.put(type, createRecord("rec", + createPrimitiveField("simpleField", type), + createArrayField("arrayField", type), + createMapField("mapField", type), + createNestedField("nestedField", type))); + } + + for (int i = 0; i < promotionTypes.length; i++) { + Schema startSchema = schemaMap.get(promotionTypes[i]); + Pair<Integer,Integer> minMax = allowedPromotions.get(promotionTypes[i]); + for (int j = minMax.getLeft(); j <= minMax.getRight(); j++) { + Schema endSchema = schemaMap.get(promotionTypes[j]); + assertEquals(endSchema, deduceWriterSchema(endSchema, startSchema)); + } + } + } + + @Test + void testReversePromotions() { + Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BYTES}; + Map<Schema.Type, Pair<Integer,Integer>> reversePromotions = new HashMap<>(); + //Incoming data types 
in the range reversePromotions.value will be promoted to reversePromotions.key + //if reversePromotions.key is the current table schema + reversePromotions.put(Schema.Type.INT, Pair.of(0, 0)); + reversePromotions.put(Schema.Type.LONG, Pair.of(0, 1)); + reversePromotions.put(Schema.Type.FLOAT, Pair.of(0, 2)); + reversePromotions.put(Schema.Type.DOUBLE, Pair.of(0, 3)); + reversePromotions.put(Schema.Type.STRING, Pair.of(0, 5)); + reversePromotions.put(Schema.Type.BYTES, Pair.of(4, 5)); + + Map<Schema.Type, Schema> schemaMap = new HashMap<>(); + for (Schema.Type type : promotionTypes) { + schemaMap.put(type, createRecord("rec", + createPrimitiveField("simpleField", type), + createArrayField("arrayField", type), + createMapField("mapField", type), + createNestedField("nestedField", type))); + } + + for (int i = 0; i < promotionTypes.length; i++) { + Schema startSchema = schemaMap.get(promotionTypes[i]); + Pair<Integer,Integer> minMax = reversePromotions.get(promotionTypes[i]); + for (int j = minMax.getLeft(); j <= minMax.getRight(); j++) { + Schema endSchema = schemaMap.get(promotionTypes[j]); + assertEquals(startSchema, deduceWriterSchema(endSchema, startSchema)); + } + } + } + + @Test + void testIllegalPromotionsBetweenPrimitives() { + Schema.Type[] promotionTypes = new Schema.Type[]{Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.BYTES}; + Map<Schema.Type, Schema> schemaMap = new HashMap<>(); + for (Schema.Type type : promotionTypes) { + schemaMap.put(type, createRecord("rec", + createPrimitiveField("simpleField", type), + createArrayField("arrayField", type), + createMapField("mapField", type), + createNestedField("nestedField", type))); + } + + String[] fieldNames = new String[]{"rec.simpleField", "rec.arrayField.element", "rec.mapField.value", "rec.nestedField.nested"}; + //int, long, float, double can't be promoted to bytes + for (int i = 0; i < 4; i++) { + Schema startSchema = schemaMap.get(promotionTypes[i]); + Schema 
endSchema = schemaMap.get(Schema.Type.BYTES); + Throwable t = assertThrows(SchemaBackwardsCompatibilityException.class, + () -> deduceWriterSchema(endSchema, startSchema)); + String baseString = String.format("TYPE_MISMATCH: reader type 'BYTES' not compatible with writer type '%s' for field '%%s'", + promotionTypes[i].getName().toUpperCase()); + for (String fieldName : fieldNames) { + assertTrue(t.getMessage().contains(String.format(baseString, fieldName))); + } + } + } + + @Test + void testIllegalPromotionsBetweenComplexFields() { + String[] typeNames = new String[]{"INT", "ARRAY", "MAP", "RECORD"}; + Schema[] fieldTypes = new Schema[]{createRecord("rec", createPrimitiveField("testField", Schema.Type.INT)), + createRecord("rec", createArrayField("testField", Schema.Type.INT)), + createRecord("rec", createMapField("testField", Schema.Type.INT)), + createRecord("rec", createNestedField("testField", Schema.Type.INT))}; + + for (int i = 0; i < fieldTypes.length; i++) { + for (int j = 0; j < fieldTypes.length; j++) { + if (i != j) { + Schema startSchema = fieldTypes[i]; + Schema endSchema = fieldTypes[j]; + Throwable t = assertThrows(SchemaBackwardsCompatibilityException.class, + () -> deduceWriterSchema(startSchema, endSchema)); + String errorMessage = String.format("Schema validation backwards compatibility check failed with the following issues: " + + "{TYPE_MISMATCH: reader type '%s' not compatible with writer type '%s' for field 'rec.testField'}", typeNames[i], typeNames[j]); + assertTrue(t.getMessage().startsWith(errorMessage)); + } + } + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testMissingColumn(boolean allowDroppedColumns) { + //simple case + Schema start = createRecord("missingSimpleField", + createPrimitiveField("field1", Schema.Type.INT), + createPrimitiveField("field2", Schema.Type.INT), + createPrimitiveField("field3", Schema.Type.INT)); + Schema end = createRecord("missingSimpleField", + createPrimitiveField("field1", 
Schema.Type.INT), + createPrimitiveField("field3", Schema.Type.INT)); + try { + assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns)); + assertTrue(allowDroppedColumns); + } catch (MissingSchemaFieldException e) { + assertFalse(allowDroppedColumns); + assertTrue(e.getMessage().contains("missingSimpleField.field2")); + } + + //complex case + start = createRecord("missingComplexField", + createPrimitiveField("field1", Schema.Type.INT), + createPrimitiveField("field2", Schema.Type.INT), + createArrayField("field3", createRecord("nestedRecord", + createPrimitiveField("nestedField1", Schema.Type.INT), + createPrimitiveField("nestedField2", Schema.Type.INT), + createPrimitiveField("nestedField3", Schema.Type.INT))), + createPrimitiveField("field4", Schema.Type.INT)); + end = createRecord("missingComplexField", + createPrimitiveField("field1", Schema.Type.INT), + createPrimitiveField("field2", Schema.Type.INT), + createPrimitiveField("field4", Schema.Type.INT)); + try { + assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns)); + assertTrue(allowDroppedColumns); + } catch (MissingSchemaFieldException e) { + assertFalse(allowDroppedColumns); + assertTrue(e.getMessage().contains("missingComplexField.field3")); + } + + //partial missing field + end = createRecord("missingComplexField", + createPrimitiveField("field1", Schema.Type.INT), + createArrayField("field3", createRecord("nestedRecord", + createPrimitiveField("nestedField2", Schema.Type.INT), + createPrimitiveField("nestedField3", Schema.Type.INT))), + createPrimitiveField("field4", Schema.Type.INT)); + try { + assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns)); + assertTrue(allowDroppedColumns); + } catch (MissingSchemaFieldException e) { + assertFalse(allowDroppedColumns); + assertTrue(e.getMessage().contains("missingComplexField.field3.element.nestedRecord.nestedField1")); + assertTrue(e.getMessage().contains("missingComplexField.field2")); + } + } + + 
private static Schema deduceWriterSchema(Schema incomingSchema, Schema latestTableSchema) { + return deduceWriterSchema(incomingSchema, latestTableSchema, false); + } + + private static final TypedProperties TYPED_PROPERTIES = new TypedProperties(); + + private static Schema deduceWriterSchema(Schema incomingSchema, Schema latestTableSchema, Boolean addNull) { + TYPED_PROPERTIES.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), addNull.toString()); + return HoodieSchemaUtils.deduceWriterSchema(incomingSchema, Option.ofNullable(latestTableSchema), + Option.empty(), TYPED_PROPERTIES); + } + + private static Schema.Field createNestedField(String name, Schema.Type type) { + return createNestedField(name, Schema.create(type)); + } + + private static Schema.Field createNestedField(String name, Schema schema) { + return new Schema.Field(name, createRecord(name, new Schema.Field("nested", schema, null, null)), null, null); + } + + private static Schema.Field createArrayField(String name, Schema.Type type) { + return createArrayField(name, Schema.create(type)); + } + + private static Schema.Field createArrayField(String name, Schema schema) { + return new Schema.Field(name, Schema.createArray(schema), null, null); + } + + private static Schema.Field createMapField(String name, Schema.Type type) { + return createMapField(name, Schema.create(type)); + } + + private static Schema.Field createMapField(String name, Schema schema) { + return new Schema.Field(name, Schema.createMap(schema), null, null); + } + + private static Schema.Field createPrimitiveField(String name, Schema.Type type) { + return new Schema.Field(name, Schema.create(type), null, null); + } + + private static Schema createRecord(String name, Schema.Field... 
fields) { + return Schema.createRecord(name, null, null, false, Arrays.asList(fields)); + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index ff87a90cef8..22a61d58881 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -22,8 +22,9 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME} import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs} +import org.apache.hudi.avro.AvroSchemaCompatibility.SchemaIncompatibilityType import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType @@ -36,9 +37,10 @@ import org.apache.hudi.common.util import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.metrics.HoodieMetricsConfig import org.apache.hudi.exception.ExceptionUtil.getRootCause -import org.apache.hudi.exception.HoodieException +import org.apache.hudi.exception.{HoodieException, SchemaBackwardsCompatibilityException} import org.apache.hudi.functional.CommonOptionUtils._ import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable +import org.apache.hudi.hive.HiveSyncConfigHolder import 
org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.metrics.{Metrics, MetricsReporterType} @@ -1759,6 +1761,50 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(0, result.filter(result("id") === 1).count()) } + /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */ + @Test + def testSchemaEvolutionWithNewColumn(): Unit = { + val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date") + var hudiOptions = Map[String, String]( + HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger", + KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id", + KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version", + DataSourceWriteOptions.OPERATION.key() -> "insert", + HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts", + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator", + KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true", + HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false", + HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> "org.apache.hudi.HoodieSparkRecordMerger" + ) + df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath) + + // Try adding a string column. This operation is expected to throw 'schema not compatible' exception since + // 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default. + val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 'foo' as event_date, 'bar' as add_col") + try { + (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath)) + fail("Option succeeded, but was expected to fail.") + } catch { + case ex: SchemaBackwardsCompatibilityException => { + assertTrue(ex.getMessage.contains(SchemaIncompatibilityType.READER_FIELD_MISSING_DEFAULT_VALUE.name())) + } + case ex: Exception => { + fail(ex) + } + } + + // Try adding the string column again. 
This operation is expected to succeed since 'MAKE_NEW_COLUMNS_NULLABLE' + // parameter has been set to 'true'. + hudiOptions = hudiOptions + (HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() -> "true") + try { + (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath)) + } catch { + case ex: Exception => { + fail(ex) + } + } + } + def assertLastCommitIsUpsert(): Boolean = { val metaClient = HoodieTableMetaClient.builder() .setBasePath(basePath) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index fe8eb909db4..0c68831fcd8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -667,10 +667,9 @@ public class StreamSync implements Serializable, Closeable { new HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg)), metaClient)); // Deduce proper target (writer's) schema for the input dataset, reconciling its // schema w/ the table's one - Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema( - HoodieAvroUtils.removeMetadataFields(incomingSchema), - HoodieConversionUtils.toScalaOption(latestTableSchemaOpt), - HoodieConversionUtils.toScalaOption(internalSchemaOpt), props); + Schema targetSchema = HoodieSchemaUtils.deduceWriterSchema( + HoodieAvroUtils.removeMetadataFields(incomingSchema), + latestTableSchemaOpt, internalSchemaOpt, props); // Override schema provider with the reconciled target schema return new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), sourceSchemaProvider, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java index eee30c84411..4a5ad75ea84 100644 --- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.exception.MissingSchemaFieldException; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.streamer.HoodieStreamer; @@ -125,6 +125,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true)); b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true)); b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false)); + b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false)); b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false)); } return b.build(); @@ -220,8 +221,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta addData(df, false); deltaStreamer.sync(); assertTrue(allowNullForDeletedCols); - } catch (SchemaCompatibilityException e) { - assertTrue(e.getMessage().contains("Incoming batch schema is not compatible with the table's one")); + } catch (MissingSchemaFieldException e) { assertFalse(allowNullForDeletedCols); return; } @@ -404,10 +404,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes() .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING))); assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0); - } catch (SchemaCompatibilityException e) { + } catch 
(MissingSchemaFieldException e) { assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema); - assertTrue(e.getMessage().contains("Incoming batch schema is not compatible with the table's one")); - assertFalse(allowNullForDeletedCols); } }
