the-other-tim-brown commented on code in PR #13572:
URL: https://github.com/apache/hudi/pull/13572#discussion_r2224138897
##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -255,6 +254,41 @@ private static TypeInfo generateTypeInfoWorker(Schema
schema,
}
}
+ public static boolean isNullableType(Schema schema) {
+ if (!schema.getType().equals(Schema.Type.UNION)) {
+ return false;
+ }
+
+ List<Schema> itemSchemas = schema.getTypes();
+ if (itemSchemas.size() < 2) {
+ return false;
+ }
+
+ for (Schema itemSchema : itemSchemas) {
+ if (Schema.Type.NULL.equals(itemSchema.getType())) {
+ return true;
+ }
+ }
+
+ // [null, null] not allowed, so this check is ok.
+ return false;
+ }
+
+ public static Schema getOtherTypeFromNullableType(Schema schema) {
Review Comment:
Similarly, there is `AvroSchemaUtils#resolveNullableSchema` that provides
the same functionality.
##########
hudi-hadoop-mr/src/main/java/org/apache/hadoop/hive/serde2/avro/HiveTypeUtils.java:
##########
@@ -255,6 +254,41 @@ private static TypeInfo generateTypeInfoWorker(Schema
schema,
}
}
+ public static boolean isNullableType(Schema schema) {
Review Comment:
Can we use `AvroSchemaUtils#isNullable` here?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -239,10 +250,53 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param storage {@link HoodieStorage} for reading records.
* @return {@link ClosableIterator<T>} that can return all records through
iteration.
*/
- public ClosableIterator<T> getFileRecordIterator(
- StoragePathInfo storagePathInfo, long start, long length, Schema
dataSchema, Schema requiredSchema,
- HoodieStorage storage) throws IOException {
- return getFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ public final ClosableIterator<T> getFileRecordIterator(
+ StoragePathInfo storagePathInfo, long start, long length, Option<Schema>
dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(Either.right(storagePathInfo), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ protected abstract ClosableIterator<T> doGetFileRecordIterator(StoragePath
filePath, long start, long length, Schema dataSchema, Schema requiredSchema,
HoodieStorage storage) throws IOException;
+
+ protected ClosableIterator<T> doGetFileRecordIterator(StoragePathInfo
storagePathInfo, long start, long length, Schema dataSchema, Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ return doGetFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ /**
+ * Handle schema evolution
+ */
+ private ClosableIterator<T> getFileRecordIterator(Either<StoragePath,
StoragePathInfo> filePathEither, long start, long length,
+ Option<Schema> dataSchema,
Schema requiredSchema, HoodieStorage storage) throws IOException {
+ if (isMetadataTable() ||
getSchemaHandler().getInternalSchemaOpt().isPresent()) {
+ return getFileRecordIteratorInternal(filePathEither, start, length,
dataSchema.get(), requiredSchema, storage);
+ }
+ Schema actualDataSchema = getActualDataSchema(dataSchema, filePathEither,
storage);
+ Schema actualRequriredSchema =
AvroSchemaUtils.pruneDataSchemaResolveNullable(actualDataSchema,
requiredSchema, getSchemaHandler().getPruneExcludeFields());
+ if (AvroSchemaUtils.areSchemasPrettyMuchEqual(actualRequriredSchema,
requiredSchema)) {
+ return getFileRecordIteratorInternal(filePathEither, start, length,
actualDataSchema, requiredSchema, storage);
+ }
+ UnaryOperator<T> projection = projectRecord(actualRequriredSchema,
requiredSchema);
+ return new
CloseableMappingIterator<>(getFileRecordIteratorInternal(filePathEither, start,
length, actualDataSchema, actualRequriredSchema, storage), projection);
+ }
+
+ private Schema getActualDataSchema(Option<Schema> dataSchema,
Either<StoragePath, StoragePathInfo> filePathEither, HoodieStorage storage)
throws IOException {
Review Comment:
If schema-on-write is enabled, can we skip this step? I am worried about
performance implications of checking each file's schema.
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -239,10 +250,53 @@ public abstract ClosableIterator<T> getFileRecordIterator(
* @param storage {@link HoodieStorage} for reading records.
* @return {@link ClosableIterator<T>} that can return all records through
iteration.
*/
- public ClosableIterator<T> getFileRecordIterator(
- StoragePathInfo storagePathInfo, long start, long length, Schema
dataSchema, Schema requiredSchema,
- HoodieStorage storage) throws IOException {
- return getFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ public final ClosableIterator<T> getFileRecordIterator(
+ StoragePathInfo storagePathInfo, long start, long length, Option<Schema>
dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
+ return getFileRecordIterator(Either.right(storagePathInfo), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ protected abstract ClosableIterator<T> doGetFileRecordIterator(StoragePath
filePath, long start, long length, Schema dataSchema, Schema requiredSchema,
HoodieStorage storage) throws IOException;
+
+ protected ClosableIterator<T> doGetFileRecordIterator(StoragePathInfo
storagePathInfo, long start, long length, Schema dataSchema, Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ return doGetFileRecordIterator(storagePathInfo.getPath(), start, length,
dataSchema, requiredSchema, storage);
+ }
+
+ /**
+ * Handle schema evolution
+ */
+ private ClosableIterator<T> getFileRecordIterator(Either<StoragePath,
StoragePathInfo> filePathEither, long start, long length,
+ Option<Schema> dataSchema,
Schema requiredSchema, HoodieStorage storage) throws IOException {
+ if (isMetadataTable() ||
getSchemaHandler().getInternalSchemaOpt().isPresent()) {
+ return getFileRecordIteratorInternal(filePathEither, start, length,
dataSchema.get(), requiredSchema, storage);
+ }
+ Schema actualDataSchema = getActualDataSchema(dataSchema, filePathEither,
storage);
+ Schema actualRequriredSchema =
AvroSchemaUtils.pruneDataSchemaResolveNullable(actualDataSchema,
requiredSchema, getSchemaHandler().getPruneExcludeFields());
Review Comment:
Naming is a bit confusing here — I'm assuming this is meant to match the data
file's schema but limited to the required fields? If so, maybe
`prunedDataSchema`?
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -376,6 +380,150 @@ public static Schema
createNewSchemaFromFieldsWithReference(Schema schema, List<
return newSchema;
}
+ public static boolean areSchemasPrettyMuchEqual(Schema schema1, Schema
schema2) {
Review Comment:
Can you add a Javadoc explaining when to use this?
Also, maybe `areSchemasEquivalent` would be a better name?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -379,7 +379,7 @@ object HoodieInternalRowUtils {
case (_: DoubleType, _) =>
prevDataType match {
- case _: FloatType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+ case _: FloatType => (fieldUpdater, ordinal, value) =>
fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toString.toDouble)
Review Comment:
Why convert to string and then to double?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]