the-other-tim-brown commented on code in PR #14313:
URL: https://github.com/apache/hudi/pull/14313#discussion_r2554408972


##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -72,9 +73,10 @@ protected void setPartitionPathField(int position, Object 
fieldValue, T row) {
     };
   }
 
-  public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema schema) 
throws IOException {
+  public ClosableIterator<HoodieRecord<T>> getRecordIterator(HoodieSchema 
schema) throws IOException {
     ClosableIterator<HoodieRecord<T>> skeletonIterator = 
skeletonFileReader.getRecordIterator(schema);
     ClosableIterator<HoodieRecord<T>> dataFileIterator = 
dataFileReader.getRecordIterator(dataFileReader.getSchema());
+    // TODO boundary for now to revisit in later pr to use HoodieSchema

Review Comment:
   This is now using the HoodieSchema, right?



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -718,6 +718,25 @@ public HoodieSchema parse(String jsonSchema) {
         throw new HoodieAvroSchemaException("Failed to parse schema: " + 
jsonSchema, e);
       }
     }
+
+    /**
+     * Parses a schema from an InputStream.
+     *
+     * @param inputStream the InputStream containing the JSON schema
+     * @return parsed HoodieSchema
+     * @throws java.io.IOException if reading from the stream fails
+     * @throws HoodieAvroSchemaException if the schema is invalid
+     */
+    public HoodieSchema parse(java.io.InputStream inputStream) throws 
java.io.IOException {

Review Comment:
   Instead of throwing IOException, should we wrap this in a HoodieIOException?
   
   Also let's only use fully qualified names when required and prefer importing 
classes.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java:
##########
@@ -333,12 +334,14 @@ private static void writeParquetFile(String instant,
         path,
         metaClient.getStorage(),
         metaClient.getTableConfig(),
-        HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
+        
HoodieSchema.fromAvroSchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),

Review Comment:
   Similar to the other `HoodieSchema` you added to `HoodieTestDataGenerator`, 
can you update the class to have an equivalent `HoodieSchema` for 
`AVRO_SCHEMA_WITH_METADATA_FIELDS`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java:
##########
@@ -146,7 +147,7 @@ public void write(
           preWriteCallback.ifPresent(callback -> 
callback.accept(activeAction));
           // in local FS and HDFS, there could be empty completed instants due 
to crash.
           final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createLSMTimelineInstant(activeAction, metaClient);
-          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), HoodieSchema.fromAvroSchema(wrapperSchema));

Review Comment:
   Let's move this `fromAvroSchema` outside of the loop for now so we don't 
have so many conversions on this path.



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -454,11 +457,11 @@ public Pair<ByteArrayOutputStream, Object> 
serializeRecordsToLogBlock(HoodieStor
     config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 
1024));
 
     HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
-        HoodieFileFormat.PARQUET, outputStream, storage, config, writerSchema, 
recordType);
+        HoodieFileFormat.PARQUET, outputStream, storage, config, 
HoodieSchema.fromAvroSchema(writerSchema), recordType);
     while (recordItr.hasNext()) {
       HoodieRecord record = recordItr.next();
       String recordKey = record.getRecordKey(readerSchema, keyFieldName);
-      parquetWriter.write(recordKey, record, writerSchema);
+      parquetWriter.write(recordKey, record, 
HoodieSchema.fromAvroSchema(writerSchema));

Review Comment:
   Similarly here, we can call `HoodieSchema.fromAvroSchema` once above at line 
460 and reuse.



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -61,9 +61,10 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> 
candidateRowKeys) {
   }
 
   @Override
-  public ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema 
readerSchema, Schema requestedSchema) throws IOException {
+  public ClosableIterator<HoodieRecord<T>> getRecordIterator(HoodieSchema 
readerSchema, HoodieSchema requestedSchema) throws IOException {
     ClosableIterator<HoodieRecord<T>> skeletonIterator = 
skeletonFileReader.getRecordIterator(readerSchema, requestedSchema);
-    ClosableIterator<HoodieRecord<T>> dataFileIterator = 
dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema),
 requestedSchema);
+    // TODO boundary for now to revisit HoodieAvroUtils in later pr to use 
HoodieSchema

Review Comment:
   Can this be removed?



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java:
##########
@@ -100,18 +103,20 @@ public boolean hasNext() {
 
       @Override
       public String next() {
+        // TODO boundary for now
         HoodieRecord<T> skeletonRecord = skeletonIterator.next();
-        return skeletonRecord.getRecordKey(schema, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        return skeletonRecord.getRecordKey(schema.getAvroSchema(), 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
       }
     };
   }
 
   protected abstract void setPartitionField(int position, Object fieldValue, T 
row);
 
   @Override
-  public Schema getSchema() {
+  public HoodieSchema getSchema() {
     // return merged schema (meta fields + data file schema)
-    return HoodieAvroUtils.addMetadataFields(dataFileReader.getSchema());
+    // TODO boundary for now to revisit HoodieAvroUtils in later pr to use 
HoodieSchema

Review Comment:
   Similarly here, this comment seems like it can be removed.



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -427,11 +428,13 @@ public ByteArrayOutputStream 
serializeRecordsToLogBlock(HoodieStorage storage,
     config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 
1024));
     config.setValue("hoodie.avro.schema", writerSchema.toString());
     HoodieRecord.HoodieRecordType recordType = 
records.iterator().next().getRecordType();
+    HoodieSchema schema = HoodieSchema.fromAvroSchema(writerSchema);
     try (HoodieFileWriter parquetWriter = 
HoodieFileWriterFactory.getFileWriter(
-        HoodieFileFormat.PARQUET, outputStream, storage, config, writerSchema, 
recordType)) {
+        //TODO boundary to revisit in follow up to use HoodieSchema directly
+        HoodieFileFormat.PARQUET, outputStream, storage, config, schema, 
recordType)) {
       for (HoodieRecord<?> record : records) {
         String recordKey = record.getRecordKey(readerSchema, keyFieldName);
-        parquetWriter.write(recordKey, record, writerSchema);
+        parquetWriter.write(recordKey, record, 
HoodieSchema.fromAvroSchema(writerSchema));

Review Comment:
   I think this can just reuse the variable declared at line 431, right?



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java:
##########
@@ -187,7 +189,7 @@ protected void writeFileWithSimpleSchema() throws Exception 
{
       record.put("time", Integer.toString(i));
       record.put("number", i);
       HoodieRecord avroRecord = new HoodieAvroIndexedRecord(record);
-      writer.write(key, avroRecord, avroSchema);
+      writer.write(key, avroRecord, HoodieSchema.fromAvroSchema(avroSchema));

Review Comment:
   Let's update line 181 to use the new util?



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java:
##########
@@ -64,15 +64,17 @@ public boolean hasNext() {
   public HoodieRecord<T> next() {
     HoodieRecord<T> dataRecord = dataFileIterator.next();
     HoodieRecord<T> skeletonRecord = skeletonIterator.next();
-    HoodieRecord<T> ret = dataRecord.prependMetaFields(schema, schema,
-        new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD))
-            .setCommitSeqno(skeletonRecord.getRecordKey(schema, 
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
-            .setRecordKey(skeletonRecord.getRecordKey(schema, 
HoodieRecord.RECORD_KEY_METADATA_FIELD))
-            .setPartitionPath(skeletonRecord.getRecordKey(schema, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD))
-            .setFileName(skeletonRecord.getRecordKey(schema, 
HoodieRecord.FILENAME_METADATA_FIELD)), null);
+    //TODO boundary to revisit in later pr to use HoodieSchema
+    Schema avroSchema = schema.getAvroSchema();
+    HoodieRecord<T> ret = dataRecord.prependMetaFields(avroSchema, avroSchema,
+        new 
MetadataValues().setCommitTime(skeletonRecord.getRecordKey(avroSchema, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD))
+            .setCommitSeqno(skeletonRecord.getRecordKey(avroSchema, 
HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))
+            .setRecordKey(skeletonRecord.getRecordKey(avroSchema, 
HoodieRecord.RECORD_KEY_METADATA_FIELD))
+            .setPartitionPath(skeletonRecord.getRecordKey(avroSchema, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD))
+            .setFileName(skeletonRecord.getRecordKey(avroSchema, 
HoodieRecord.FILENAME_METADATA_FIELD)), null);
     if (partitionFields.isPresent()) {
       for (int i = 0; i < partitionValues.length; i++) {
-        int position = schema.getField(partitionFields.get()[i]).pos();
+        int position = schema.getField(partitionFields.get()[i]).get().pos();

Review Comment:
   I think we should handle the empty option here. In this case it seems like 
we can use a `orElseThrow`?



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java:
##########
@@ -262,7 +264,7 @@ private void verifyFilterRowKeys(HoodieAvroFileReader 
hoodieReader) {
 
   private void verifyReaderWithSchema(String schemaPath, HoodieAvroFileReader 
hoodieReader) throws IOException {
     Schema evolvedSchema = 
getSchemaFromResource(TestHoodieReaderWriterBase.class, schemaPath);

Review Comment:
   Can this be `HoodieSchema` as well now that the new utility method exists 
for reading the schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to