the-other-tim-brown commented on code in PR #14355:
URL: https://github.com/apache/hudi/pull/14355#discussion_r2562726567
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java:
##########
@@ -245,35 +247,44 @@ private void changeTableSchema(TableOptions tableOptions,
boolean shouldCompactB
doCompact(conf);
}
- Schema intType =
SchemaBuilder.unionOf().nullType().and().intType().endUnion();
- Schema longType =
SchemaBuilder.unionOf().nullType().and().longType().endUnion();
- Schema doubleType =
SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
- Schema stringType =
SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
- Schema structType =
SchemaBuilder.builder().record("new_row_col").fields()
- .name("f0").type(longType).noDefault()
- .name("f1").type(stringType).noDefault().endRecord();
- Schema arrayType =
Schema.createUnion(SchemaBuilder.builder().array().items(stringType),
SchemaBuilder.builder().nullType());
- Schema mapType =
Schema.createUnion(SchemaBuilder.builder().map().values(stringType),
SchemaBuilder.builder().nullType());
-
- writeClient.addColumn("salary", doubleType, null, "name", AFTER);
+ // Create nullable primitive types using HoodieSchema
+ HoodieSchema intType =
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT));
Review Comment:
There is a helper method that lets you simply call
`HoodieSchema.createNullable(HoodieSchemaType.INT)` for easy test setup.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -604,9 +606,10 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
);
}
- private Schema inferSchemaFromDdl() {
- Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType);
- return HoodieAvroUtils.addMetadataFields(schema,
conf.get(FlinkOptions.CHANGELOG_ENABLED));
+ private HoodieSchema inferSchemaFromDdl() {
+ Schema avroSchema = AvroSchemaConverter.convertToSchema(this.tableRowType);
+ HoodieSchema schema = HoodieSchema.fromAvroSchema(avroSchema);
+ return
HoodieSchemaUtils.addMetadataFields(schema,conf.get(FlinkOptions.CHANGELOG_ENABLED));
Review Comment:
nitpick: add space after comma
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java:
##########
@@ -325,8 +326,9 @@ public void
testFilterFileWithInstantRange(WriteOperationType firstCommitOperati
}
}
- private static Schema getRecordAvroSchema(String schemaStr) {
- Schema recordSchema = new Schema.Parser().parse(schemaStr);
- return
AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
+ private static HoodieSchema getRecordAvroSchema(String schemaStr) {
Review Comment:
Let's rename this to `getRecordSchema`?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java:
##########
@@ -139,9 +141,10 @@ public static Builder builder() {
private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit
split) {
try {
// get full schema iterator.
- final Schema tableSchema = AvroSchemaCache.intern(new
Schema.Parser().parse(tableState.getAvroSchema()));
+ final HoodieSchema schema = HoodieSchemaCache.intern(
+ new HoodieSchema.Parser().parse(tableState.getAvroSchema()));
Review Comment:
Just an FYI: there is `HoodieSchema.parse` for a less verbose way of doing
this.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
}
@VisibleForTesting
- public Schema getTableAvroSchema() {
+ public HoodieSchema getTableHoodieSchema() {
Review Comment:
Can we just call this `getTableSchema`?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java:
##########
@@ -61,20 +61,20 @@ public static class Config {
.withDocumentation("The schema of the target you are writing to");
}
- private final Schema sourceSchema;
+ private final HoodieSchema sourceSchema;
- private Schema targetSchema;
+ private HoodieSchema targetSchema;
@Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
checkRequiredConfigProperties(props,
Collections.singletonList(Config.SOURCE_SCHEMA_FILE));
String sourceSchemaFile = getStringWithAltKeys(props,
Config.SOURCE_SCHEMA_FILE);
FileSystem fs = HadoopFSUtils.getFs(sourceSchemaFile,
HadoopConfigurations.getHadoopConf(new Configuration()));
try {
- this.sourceSchema = new Schema.Parser().parse(fs.open(new
Path(sourceSchemaFile)));
+ this.sourceSchema = new HoodieSchema.Parser().parse(fs.open(new
Path(sourceSchemaFile)));
Review Comment:
I know you did not set this up this way, but currently this code fails
to close the input stream it hands to the parsers. Let's make sure we close
these streams while we're updating this code.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
}
@VisibleForTesting
- public Schema getTableAvroSchema() {
+ public HoodieSchema getTableHoodieSchema() {
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- return schemaResolver.getTableAvroSchema();
+ Schema avroSchema = schemaResolver.getTableAvroSchema();
+ return HoodieSchema.fromAvroSchema(avroSchema);
Review Comment:
I think we'll have a few PRs all doing this same operation. What do you
think about just pushing this down to the TableSchemaResolver so we don't have
to modify as much code later?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]