the-other-tim-brown commented on code in PR #14313:
URL: https://github.com/apache/hudi/pull/14313#discussion_r2554408972
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -72,9 +73,10 @@ protected void setPartitionPathField(int position, Object
fieldValue, T row) {
};
}
- public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema schema)
throws IOException {
+ public ClosableIterator<HoodieRecord<T>> getRecordIterator(HoodieSchema
schema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator =
skeletonFileReader.getRecordIterator(schema);
ClosableIterator<HoodieRecord<T>> dataFileIterator =
dataFileReader.getRecordIterator(dataFileReader.getSchema());
+ // TODO boundary for now to revisit in later pr to use HoodieSchema
Review Comment:
This is now using the HoodieSchema, right?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -718,6 +718,25 @@ public HoodieSchema parse(String jsonSchema) {
throw new HoodieAvroSchemaException("Failed to parse schema: " +
jsonSchema, e);
}
}
+
+ /**
+ * Parses a schema from an InputStream.
+ *
+ * @param inputStream the InputStream containing the JSON schema
+ * @return parsed HoodieSchema
+ * @throws java.io.IOException if reading from the stream fails
+ * @throws HoodieAvroSchemaException if the schema is invalid
+ */
+ public HoodieSchema parse(java.io.InputStream inputStream) throws
java.io.IOException {
Review Comment:
Instead of throwing IOException, should we wrap this in a HoodieIOException?
Also let's only use fully qualified names when required and prefer importing
classes.
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java:
##########
@@ -333,12 +334,14 @@ private static void writeParquetFile(String instant,
path,
metaClient.getStorage(),
metaClient.getTableConfig(),
- HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
+
HoodieSchema.fromAvroSchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
Review Comment:
Similar to the other `HoodieSchema` you added to `HoodieTestDataGenerator`,
can you update the class to have an equivalent `HoodieSchema` for
`AVRO_SCHEMA_WITH_METADATA_FIELDS`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java:
##########
@@ -146,7 +147,7 @@ public void write(
preWriteCallback.ifPresent(callback ->
callback.accept(activeAction));
// in local FS and HDFS, there could be empty completed instants due
to crash.
final HoodieLSMTimelineInstant metaEntry =
MetadataConversionUtils.createLSMTimelineInstant(activeAction, metaClient);
- writer.write(metaEntry.getInstantTime(), new
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+ writer.write(metaEntry.getInstantTime(), new
HoodieAvroIndexedRecord(metaEntry), HoodieSchema.fromAvroSchema(wrapperSchema));
Review Comment:
Let's move this `fromAvroSchema` outside of the loop for now so we don't
have so many conversions on this path.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -454,11 +457,11 @@ public Pair<ByteArrayOutputStream, Object>
serializeRecordsToLogBlock(HoodieStor
config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 *
1024));
HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
- HoodieFileFormat.PARQUET, outputStream, storage, config, writerSchema,
recordType);
+ HoodieFileFormat.PARQUET, outputStream, storage, config,
HoodieSchema.fromAvroSchema(writerSchema), recordType);
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
String recordKey = record.getRecordKey(readerSchema, keyFieldName);
- parquetWriter.write(recordKey, record, writerSchema);
+ parquetWriter.write(recordKey, record,
HoodieSchema.fromAvroSchema(writerSchema));
Review Comment:
Similarly here, we can call `HoodieSchema.fromAvroSchema` once above at line
460 and reuse it.
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -61,9 +61,10 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
}
@Override
- public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema
readerSchema, Schema requestedSchema) throws IOException {
+ public ClosableIterator<HoodieRecord<T>> getRecordIterator(HoodieSchema
readerSchema, HoodieSchema requestedSchema) throws IOException {
ClosableIterator<HoodieRecord<T>> skeletonIterator =
skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
- ClosableIterator<HoodieRecord<T>> dataFileIterator =
dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema),
requestedSchema);
+ // TODO boundary for now to revisit HoodieAvroUtils in later pr to use
HoodieSchema
Review Comment:
Can this be removed?
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -100,18 +103,20 @@ public boolean hasNext() {
@Override
public String next() {
+ // TODO boundary for now
HoodieRecord<T> skeletonRecord = skeletonIterator.next();
- return skeletonRecord.getRecordKey(schema,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ return skeletonRecord.getRecordKey(schema.getAvroSchema(),
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
};
}
protected abstract void setPartitionField(int position, Object fieldValue, T
row);
@Override
- public Schema getSchema() {
+ public HoodieSchema getSchema() {
// return merged schema (meta fields + data file schema)
- return HoodieAvroUtils.addMetadataFields(dataFileReader.getSchema());
+ // TODO boundary for now to revisit HoodieAvroUtils in later pr to use
HoodieSchema
Review Comment:
Similarly here, this comment seems like it can be removed.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -427,11 +428,13 @@ public ByteArrayOutputStream
serializeRecordsToLogBlock(HoodieStorage storage,
config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 *
1024));
config.setValue("hoodie.avro.schema", writerSchema.toString());
HoodieRecord.HoodieRecordType recordType =
records.iterator().next().getRecordType();
+ HoodieSchema schema = HoodieSchema.fromAvroSchema(writerSchema);
try (HoodieFileWriter parquetWriter =
HoodieFileWriterFactory.getFileWriter(
- HoodieFileFormat.PARQUET, outputStream, storage, config, writerSchema,
recordType)) {
+ //TODO boundary to revisit in follow up to use HoodieSchema directly
+ HoodieFileFormat.PARQUET, outputStream, storage, config, schema,
recordType)) {
for (HoodieRecord<?> record : records) {
String recordKey = record.getRecordKey(readerSchema, keyFieldName);
- parquetWriter.write(recordKey, record, writerSchema);
+ parquetWriter.write(recordKey, record,
HoodieSchema.fromAvroSchema(writerSchema));
Review Comment:
I think this can just reuse the variable declared at line 431, right?
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java:
##########
@@ -187,7 +189,7 @@ protected void writeFileWithSimpleSchema() throws Exception
{
record.put("time", Integer.toString(i));
record.put("number", i);
HoodieRecord avroRecord = new HoodieAvroIndexedRecord(record);
- writer.write(key, avroRecord, avroSchema);
+ writer.write(key, avroRecord, HoodieSchema.fromAvroSchema(avroSchema));
Review Comment:
Let's update line 181 to use the new util?
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java:
##########
@@ -64,15 +64,17 @@ public boolean hasNext() {
public HoodieRecord<T> next() {
HoodieRecord<T> dataRecord = dataFileIterator.next();
HoodieRecord<T> skeletonRecord = skeletonIterator.next();
- HoodieRecord<T> ret = dataRecord.prependMetaFields(schema, schema,
- new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD))
- .setCommitSeqno(skeletonRecord.getRecordKey(schema,
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
- .setRecordKey(skeletonRecord.getRecordKey(schema,
HoodieRecord.RECORD_KEY_METADATA_FIELD))
- .setPartitionPath(skeletonRecord.getRecordKey(schema,
HoodieRecord.PARTITION_PATH_METADATA_FIELD))
- .setFileName(skeletonRecord.getRecordKey(schema,
HoodieRecord.FILENAME_METADATA_FIELD)), null);
+ //TODO boundary to revisit in later pr to use HoodieSchema
+ Schema avroSchema = schema.getAvroSchema();
+ HoodieRecord<T> ret = dataRecord.prependMetaFields(avroSchema, avroSchema,
+ new
MetadataValues().setCommitTime(skeletonRecord.getRecordKey(avroSchema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+ .setCommitSeqno(skeletonRecord.getRecordKey(avroSchema,
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
+ .setRecordKey(skeletonRecord.getRecordKey(avroSchema,
HoodieRecord.RECORD_KEY_METADATA_FIELD))
+ .setPartitionPath(skeletonRecord.getRecordKey(avroSchema,
HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+ .setFileName(skeletonRecord.getRecordKey(avroSchema,
HoodieRecord.FILENAME_METADATA_FIELD)), null);
if (partitionFields.isPresent()) {
for (int i = 0; i < partitionValues.length; i++) {
- int position = schema.getField(partitionFields.get()[i]).pos();
+ int position = schema.getField(partitionFields.get()[i]).get().pos();
Review Comment:
I think we should handle the empty option here. In this case it seems like
we can use an `orElseThrow`?
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java:
##########
@@ -262,7 +264,7 @@ private void verifyFilterRowKeys(HoodieAvroFileReader
hoodieReader) {
private void verifyReaderWithSchema(String schemaPath, HoodieAvroFileReader
hoodieReader) throws IOException {
Schema evolvedSchema =
getSchemaFromResource(TestHoodieReaderWriterBase.class, schemaPath);
Review Comment:
Can this be `HoodieSchema` as well now that the new utility method exists
for reading the schema?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]