the-other-tim-brown commented on code in PR #14355:
URL: https://github.com/apache/hudi/pull/14355#discussion_r2583483368


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
   }
 
   @VisibleForTesting
-  public Schema getTableAvroSchema() {
+  public HoodieSchema getTableHoodieSchema() {
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-      return schemaResolver.getTableAvroSchema();
+      Schema avroSchema = schemaResolver.getTableAvroSchema();
+      return HoodieSchema.fromAvroSchema(avroSchema);

Review Comment:
   Let's update this to call the new method in the table schema resolver.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -435,8 +436,9 @@ private List<MergeOnReadInputSplit> buildInputSplits() {
   }
 
   private InputFormat<RowData, ?> getBatchInputFormat() {
-    final Schema tableAvroSchema = getTableAvroSchema();
-    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+    final HoodieSchema tableSchema = getTableSchema();
+    //TODO make a convertor

Review Comment:
   Remove this TODO?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> 
candidateRowKeys) {
   @Override
   public ClosableIterator<HoodieRecord<RowData>> 
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) 
throws IOException {
     //TODO boundary to follow up in later pr

Review Comment:
   Can this TODO be removed now?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> 
candidateRowKeys) {
   @Override
   public ClosableIterator<HoodieRecord<RowData>> 
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) 
throws IOException {
     //TODO boundary to follow up in later pr
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema.getAvroSchema(), Collections.emptyList());
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema, Collections.emptyList());
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
 
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    //TODO add a util for this in HoodieSchemaUtils
+    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(HoodieAvroUtils.getRecordKeySchema());
     ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, 
Collections.emptyList());
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
Objects.toString(rowData.getString(0)));
   }
 
   public ClosableIterator<RowData> getRowDataIterator(
       InternalSchemaManager internalSchemaManager,
       DataType dataType,
-      Schema requestedSchema,
+      HoodieSchema requestedSchema,
       List<Predicate> predicates) throws IOException {
     return RecordIterators.getParquetRecordIterator(storage.getConf(), 
internalSchemaManager, dataType, requestedSchema, path, predicates);
   }
 
   @Override
   public HoodieSchema getSchema() {
     if (fileSchema == null) {
-      fileSchema = 
AvroSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
+      //TODO to create a converter for HoodieSchema

Review Comment:
   Remove this TODO?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java:
##########
@@ -41,7 +41,7 @@ public final class JsonDeserializationFunction
   public static JsonDeserializationFunction getInstance(Configuration conf) {
     // Read from file source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).getAvroSchema())

Review Comment:
   Can the new converter be used here?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java:
##########
@@ -510,7 +514,7 @@ abstract static class BaseImageIterator implements 
ClosableIterator<RowData> {
     }
 
     private int[] getRequiredPos(String tableSchema, Schema required) {
-      Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new 
Schema.Parser().parse(tableSchema));
+      Schema dataSchema = 
HoodieAvroUtils.removeMetadataFields(HoodieSchema.parse(tableSchema).getAvroSchema());

Review Comment:
   Can we make this work on HoodieSchema?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -55,13 +56,13 @@ private FormatUtils() {
 
   public static GenericRecord buildAvroRecordBySchema(
       IndexedRecord record,
-      Schema requiredSchema,
+      HoodieSchema requiredSchema,
       int[] requiredPos,
       GenericRecordBuilder recordBuilder) {
-    List<Schema.Field> requiredFields = requiredSchema.getFields();
+    List<HoodieSchemaField> requiredFields = requiredSchema.getFields();
     assert (requiredFields.size() == requiredPos.length);
     Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
-    requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, 
positionIterator.next())));
+    requiredFields.forEach(f -> recordBuilder.set(f.name(), getVal(record, 
positionIterator.next())));

Review Comment:
   Can we use the position here instead of the name? Setting with the name 
requires an extra lookup.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> 
candidateRowKeys) {
   @Override
   public ClosableIterator<HoodieRecord<RowData>> 
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) 
throws IOException {
     //TODO boundary to follow up in later pr
-    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema.getAvroSchema(), Collections.emptyList());
+    ClosableIterator<RowData> rowDataItr = 
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), 
requestedSchema, Collections.emptyList());
     readerIterators.add(rowDataItr);
     return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
   }
 
   @Override
   public ClosableIterator<String> getRecordKeyIterator() throws IOException {
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    //TODO add a util for this in HoodieSchemaUtils

Review Comment:
   Can we address this now? Should be a fairly small change



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java:
##########
@@ -247,18 +247,18 @@ private 
OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> create
 
     // This input format is used to opening the emitted split.
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-    final Schema tableAvroSchema;
+    final HoodieSchema tableSchema;
     try {
-      tableAvroSchema = schemaResolver.getTableAvroSchema();
+      tableSchema = 
HoodieSchema.fromAvroSchema(schemaResolver.getTableAvroSchema());

Review Comment:
   Let's call the new method here as well?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -608,12 +608,12 @@ public static long 
getMaxCompactionMemoryInBytes(Configuration conf) {
     return (long) conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
   }
 
-  public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, 
boolean includeMetadataFields) throws Exception {
+  public static HoodieSchema getTableAvroSchema(HoodieTableMetaClient 
metaClient, boolean includeMetadataFields) throws Exception {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    return schemaUtil.getTableAvroSchema(includeMetadataFields);
+    return 
HoodieSchema.fromAvroSchema(schemaUtil.getTableAvroSchema(includeMetadataFields));

Review Comment:
   Let's call the new method here as well to directly return the HoodieSchema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to