the-other-tim-brown commented on code in PR #14355:
URL: https://github.com/apache/hudi/pull/14355#discussion_r2562726567


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java:
##########
@@ -245,35 +247,44 @@ private void changeTableSchema(TableOptions tableOptions, 
boolean shouldCompactB
         doCompact(conf);
       }
 
-      Schema intType = 
SchemaBuilder.unionOf().nullType().and().intType().endUnion();
-      Schema longType = 
SchemaBuilder.unionOf().nullType().and().longType().endUnion();
-      Schema doubleType = 
SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
-      Schema stringType = 
SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
-      Schema structType = 
SchemaBuilder.builder().record("new_row_col").fields()
-              .name("f0").type(longType).noDefault()
-              .name("f1").type(stringType).noDefault().endRecord();
-      Schema arrayType = 
Schema.createUnion(SchemaBuilder.builder().array().items(stringType), 
SchemaBuilder.builder().nullType());
-      Schema mapType = 
Schema.createUnion(SchemaBuilder.builder().map().values(stringType), 
SchemaBuilder.builder().nullType());
-
-      writeClient.addColumn("salary", doubleType, null, "name", AFTER);
+      // Create nullable primitive types using HoodieSchema
+      HoodieSchema intType = 
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT));

Review Comment:
   There is a helper method that allows you to just call 
`HoodieSchema.createNullable(HoodieSchemaType.INT)` for easy test setup



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -604,9 +606,10 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
     );
   }
 
-  private Schema inferSchemaFromDdl() {
-    Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType);
-    return HoodieAvroUtils.addMetadataFields(schema, 
conf.get(FlinkOptions.CHANGELOG_ENABLED));
+  private HoodieSchema inferSchemaFromDdl() {
+    Schema avroSchema = AvroSchemaConverter.convertToSchema(this.tableRowType);
+    HoodieSchema schema = HoodieSchema.fromAvroSchema(avroSchema);
+    return 
HoodieSchemaUtils.addMetadataFields(schema,conf.get(FlinkOptions.CHANGELOG_ENABLED));

Review Comment:
   nitpick: add a space after the comma



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -325,8 +326,9 @@ public void 
testFilterFileWithInstantRange(WriteOperationType firstCommitOperati
     }
   }
 
-  private static Schema getRecordAvroSchema(String schemaStr) {
-    Schema recordSchema = new Schema.Parser().parse(schemaStr);
-    return 
AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
+  private static HoodieSchema getRecordAvroSchema(String schemaStr) {

Review Comment:
   Let's rename this to `getRecordSchema`?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java:
##########
@@ -139,9 +141,10 @@ public static Builder builder() {
   private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit 
split) {
     try {
       // get full schema iterator.
-      final Schema tableSchema = AvroSchemaCache.intern(new 
Schema.Parser().parse(tableState.getAvroSchema()));
+      final HoodieSchema schema = HoodieSchemaCache.intern(
+          new HoodieSchema.Parser().parse(tableState.getAvroSchema()));

Review Comment:
   Just an FYI: there is `HoodieSchema.parse` for a less verbose way of doing 
this.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
   }
 
   @VisibleForTesting
-  public Schema getTableAvroSchema() {
+  public HoodieSchema getTableHoodieSchema() {

Review Comment:
   Can we just call this `getTableSchema`?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java:
##########
@@ -61,20 +61,20 @@ public static class Config {
         .withDocumentation("The schema of the target you are writing to");
   }
 
-  private final Schema sourceSchema;
+  private final HoodieSchema sourceSchema;
 
-  private Schema targetSchema;
+  private HoodieSchema targetSchema;
 
   @Deprecated
   public FilebasedSchemaProvider(TypedProperties props) {
     checkRequiredConfigProperties(props, 
Collections.singletonList(Config.SOURCE_SCHEMA_FILE));
     String sourceSchemaFile = getStringWithAltKeys(props, 
Config.SOURCE_SCHEMA_FILE);
     FileSystem fs = HadoopFSUtils.getFs(sourceSchemaFile, 
HadoopConfigurations.getHadoopConf(new Configuration()));
     try {
-      this.sourceSchema = new Schema.Parser().parse(fs.open(new 
Path(sourceSchemaFile)));
+      this.sourceSchema = new HoodieSchema.Parser().parse(fs.open(new 
Path(sourceSchemaFile)));

Review Comment:
   I know you did not set this up this way, but currently this code fails 
to close the input stream it passes to the parsers. Let's make sure we close 
these streams while we're updating this code.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
   }
 
   @VisibleForTesting
-  public Schema getTableAvroSchema() {
+  public HoodieSchema getTableHoodieSchema() {
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-      return schemaResolver.getTableAvroSchema();
+      Schema avroSchema = schemaResolver.getTableAvroSchema();
+      return HoodieSchema.fromAvroSchema(avroSchema);

Review Comment:
   I think we'll have a few PRs all doing this same operation. What do you 
think about just pushing this down to the TableSchemaResolver so we don't have 
to modify as much code later?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to