the-other-tim-brown commented on code in PR #17763:
URL: https://github.com/apache/hudi/pull/17763#discussion_r2655487016
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -651,6 +652,25 @@ public static Object
convertValueForSpecificDataTypes(HoodieSchema fieldSchema,
);
}
+ /**
+ * Fetch schema for record key and partition path.
+ * This is equivalent to HoodieAvroUtils.getRecordKeyPartitionPathSchema()
but returns HoodieSchema.
+ *
+ * @return HoodieSchema containing record key and partition path fields
+ */
+ public static HoodieSchema getRecordKeyPartitionPathSchema() {
+ List<HoodieSchemaField> toBeAddedFields = new ArrayList<>();
Review Comment:
super nitpick: initialize this with an initial capacity of 2
##########
hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java:
##########
@@ -315,7 +314,7 @@ public abstract ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPosit
* @param filePath the data file path.
* @return the Avro schema of the data file.
*/
- public abstract Schema readAvroSchema(HoodieStorage storage, StoragePath
filePath);
+ public abstract HoodieSchema readHoodieSchema(HoodieStorage storage,
StoragePath filePath);
Review Comment:
let's name this `readSchema`?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/OrcReaderIterator.java:
##########
@@ -40,22 +40,22 @@
public class OrcReaderIterator<T> implements ClosableIterator<T> {
private final RecordReader recordReader;
- private final Schema avroSchema;
+ private final HoodieSchema schema;
private final List<String> fieldNames;
private final List<TypeDescription> orcFieldTypes;
- private final Schema[] avroFieldSchemas;
+ private final HoodieSchema[] fieldSchemas;
private final VectorizedRowBatch batch;
private int rowInBatch;
private T next;
- public OrcReaderIterator(RecordReader recordReader, Schema schema,
TypeDescription orcSchema) {
+ public OrcReaderIterator(RecordReader recordReader, HoodieSchema schema,
TypeDescription orcSchema) {
this.recordReader = recordReader;
- this.avroSchema = schema;
+ this.schema = schema;
this.fieldNames = orcSchema.getFieldNames();
this.orcFieldTypes = orcSchema.getChildren();
- this.avroFieldSchemas = fieldNames.stream()
- .map(fieldName -> avroSchema.getField(fieldName).schema())
- .toArray(Schema[]::new);
+ this.fieldSchemas = fieldNames.stream()
+ .map(fieldName -> this.schema.getField(fieldName).get().schema())
Review Comment:
let's use `orElseThrow` instead of calling `get` directly, so a missing
field fails with a clear error
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java:
##########
@@ -671,13 +622,65 @@ public static TypeDescription createOrcSchema(Schema
avroSchema) {
case INT:
return TypeDescription.createInt();
case BYTES:
+ case FIXED:
return TypeDescription.createBinary();
+ case DECIMAL:
+ return TypeDescription.createDecimal()
+ .withPrecision(((HoodieSchema.Decimal) schema).getPrecision())
+ .withScale(((HoodieSchema.Decimal) schema).getScale());
+ case DATE:
+ // The date logical type represents a date within the calendar, with
no reference to a particular time zone
+ // or time of day.
+ //
+ // A date logical type annotates an Avro int, where the int stores the
number of days from the unix epoch, 1
+ // January 1970 (ISO calendar).
+ return TypeDescription.createDate();
+ case TIME:
+ HoodieSchema.Time timeSchema = (HoodieSchema.Time) schema;
+ if (timeSchema.getPrecision() == TimePrecision.MILLIS) {
+ // The time-millis logical type represents a time of day, with no
reference to a particular calendar, time
+ // zone or date, with a precision of one millisecond.
+ //
+ // A time-millis logical type annotates an Avro int, where the int
stores the number of milliseconds after
+ // midnight, 00:00:00.000.
+ return TypeDescription.createInt();
+ } else if (timeSchema.getPrecision() == TimePrecision.MICROS) {
+ // The time-micros logical type represents a time of day, with no
reference to a particular calendar, time
+ // zone or date, with a precision of one microsecond.
+ //
+ // A time-micros logical type annotates an Avro long, where the long
stores the number of microseconds after
+ // midnight, 00:00:00.000000.
+ return TypeDescription.createLong();
+ } else {
+ throw new IllegalStateException(
+ String.format("Unrecognized TimePrecision for: %s for Time type:
%s", timeSchema.getPrecision(), timeSchema));
+ }
+ case TIMESTAMP:
Review Comment:
It looks like we previously did not support translating local timestamps —
should that behavior be preserved here? They likely just get treated as longs.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java:
##########
@@ -108,27 +108,27 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
@Override
protected ClosableIterator<IndexedRecord>
getIndexedRecordIterator(HoodieSchema schema) throws IOException {
//TODO boundary for now to revisit in later pr to use HoodieSchema
- return getIndexedRecordIteratorInternal(schema.getAvroSchema(),
Collections.emptyMap());
+ return getIndexedRecordIteratorInternal(schema, Collections.emptyMap());
}
@Override
public ClosableIterator<IndexedRecord> getIndexedRecordIterator(HoodieSchema
readerSchema, HoodieSchema requestedSchema) throws IOException {
//TODO boundary for now to revisit in later pr to use HoodieSchema
- return getIndexedRecordIteratorInternal(requestedSchema.getAvroSchema(),
Collections.emptyMap());
+ return getIndexedRecordIteratorInternal(requestedSchema,
Collections.emptyMap());
}
@Override
public ClosableIterator<IndexedRecord> getIndexedRecordIterator(HoodieSchema
readerSchema, HoodieSchema requestedSchema, Map<String, String> renamedColumns)
throws IOException {
//TODO boundary for now to revisit in later pr to use HoodieSchema
- return getIndexedRecordIteratorInternal(requestedSchema.getAvroSchema(),
renamedColumns);
+ return getIndexedRecordIteratorInternal(requestedSchema, renamedColumns);
}
@Override
public HoodieSchema getSchema() {
if (fileSchema.isEmpty()) {
- fileSchema = Option.ofNullable(parquetUtils.readAvroSchema(storage,
path));
+ fileSchema = Option.ofNullable(parquetUtils.readHoodieSchema(storage,
path));
}
- return HoodieSchema.fromAvroSchema(fileSchema.get());
+ return fileSchema.get();
Review Comment:
nit: this could be simplified to `fileSchema.orElseGet(...`
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java:
##########
@@ -358,17 +356,14 @@ private void writeParquetFile(String typeCode, String
filePath, List<String> row
writer.close();
}
- private static Schema getSchemaWithFields(List<String> fields) {
- List<Schema.Field> toBeAddedFields = new ArrayList<>();
- Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "",
false);
-
+ private static HoodieSchema getSchemaWithFields(List<String> fields) {
+ List<HoodieSchemaField> toBeAddedFields = new ArrayList<>();
for (String field: fields) {
- Schema.Field schemaField =
- new Schema.Field(field, METADATA_FIELD_SCHEMA, "",
JsonProperties.NULL_VALUE);
Review Comment:
Should we continue to use a constant here?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java:
##########
@@ -108,27 +108,27 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
@Override
protected ClosableIterator<IndexedRecord>
getIndexedRecordIterator(HoodieSchema schema) throws IOException {
//TODO boundary for now to revisit in later pr to use HoodieSchema
Review Comment:
Let's remove all the `//TODO boundary..` comments in this file
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java:
##########
@@ -65,22 +64,22 @@ public HoodieHFileRecordReader(Configuration conf,
InputSplit split, JobConf job
.getFileReader(hoodieConfig, path, HoodieFileFormat.HFILE,
Option.empty());
//TODO boundary for now to revisit in later pr to use HoodieSchema
Review Comment:
remove this TODO as well?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java:
##########
@@ -82,12 +79,12 @@ public class AvroOrcUtils {
*
* @param type ORC schema of the value Object.
* @param colVector The column vector to store the value Object.
- * @param avroSchema Avro schema of the value Object.
+ * @param schema Avro schema of the value Object.
Review Comment:
nit: Update the comment to fix alignment and remove `Avro`
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -116,96 +118,112 @@ public AvroSchemaConverterWithTimestampNTZ(Configuration
conf) {
this.pathsToInt96 = new
HashSet<>(Arrays.asList(conf.getStrings("parquet.avro.writeFixedAsInt96", new
String[0])));
}
- /**
- * Given a schema, check to see if it is a union of a null type and a
regular schema,
- * and then return the non-null sub-schema. Otherwise, return the given
schema.
- *
- * @param schema The schema to check
- * @return The non-null portion of a union schema, or the given schema
- */
- public static Schema getNonNull(Schema schema) {
- if (schema.getType().equals(Schema.Type.UNION)) {
- List<Schema> schemas = schema.getTypes();
- if (schemas.size() == 2) {
- if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
- return schemas.get(1);
- } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
- return schemas.get(0);
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- } else {
- return schema;
- }
- }
-
@Override
- public MessageType convert(Schema avroSchema) {
- if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
- throw new IllegalArgumentException("Avro schema must be a record.");
+ public MessageType convert(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ throw new IllegalArgumentException("Hoodie schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(),
convertFields(avroSchema.getFields(), ""));
+ return new MessageType(schema.getFullName(),
convertFields(schema.getFields(), ""));
}
- private List<Type> convertFields(List<Schema.Field> fields, String
schemaPath) {
+ private List<Type> convertFields(List<HoodieSchemaField> fields, String
schemaPath) {
List<Type> types = new ArrayList<Type>();
- for (Schema.Field field : fields) {
- if (field.schema().getType().equals(Schema.Type.NULL)) {
+ for (HoodieSchemaField field : fields) {
+ if (field.schema().getType() == HoodieSchemaType.NULL) {
continue; // Avro nulls are not encoded, unless they are null unions
}
types.add(convertField(field, appendPath(schemaPath, field.name())));
}
return types;
}
- private Type convertField(String fieldName, Schema schema, String
schemaPath) {
+ private Type convertField(String fieldName, HoodieSchema schema, String
schemaPath) {
return convertField(fieldName, schema, Type.Repetition.REQUIRED,
schemaPath);
}
@SuppressWarnings("deprecation")
- private Type convertField(String fieldName, Schema schema, Type.Repetition
repetition, String schemaPath) {
+ private Type convertField(String fieldName, HoodieSchema schema,
Type.Repetition repetition, String schemaPath) {
Types.PrimitiveBuilder<PrimitiveType> builder;
- Schema.Type type = schema.getType();
- LogicalType logicalType = schema.getLogicalType();
- if (type.equals(Schema.Type.BOOLEAN)) {
+ HoodieSchemaType type = schema.getType();
+ if (type == HoodieSchemaType.BOOLEAN) {
Review Comment:
let's update this to a switch statement?
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java:
##########
@@ -79,8 +79,8 @@ public void testCanWrite() throws IOException {
BloomFilterTypeCode.DYNAMIC_V0.name());
StorageConfiguration conf =
HoodieTestUtils.getDefaultStorageConfWithDefaults();
- Schema schema = new
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
- HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new
AvroSchemaConverter().convert(schema),
+ HoodieSchema schema =
HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
Review Comment:
This can be HoodieTestDataGenerator.HOODIE_SCHEMA
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]