the-other-tim-brown commented on code in PR #14355:
URL: https://github.com/apache/hudi/pull/14355#discussion_r2583483368
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -644,10 +647,11 @@ private boolean isPartitioned() {
}
@VisibleForTesting
- public Schema getTableAvroSchema() {
+ public HoodieSchema getTableHoodieSchema() {
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- return schemaResolver.getTableAvroSchema();
+ Schema avroSchema = schemaResolver.getTableAvroSchema();
+ return HoodieSchema.fromAvroSchema(avroSchema);
Review Comment:
Let's update this to call the new method in the table schema resolver.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -435,8 +436,9 @@ private List<MergeOnReadInputSplit> buildInputSplits() {
}
private InputFormat<RowData, ?> getBatchInputFormat() {
- final Schema tableAvroSchema = getTableAvroSchema();
- final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = getTableSchema();
+ //TODO make a convertor
Review Comment:
Remove this TODO?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
@Override
public ClosableIterator<HoodieRecord<RowData>>
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema)
throws IOException {
//TODO boundary to follow up in later pr
Review Comment:
Can this TODO be removed now?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
@Override
public ClosableIterator<HoodieRecord<RowData>>
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema)
throws IOException {
//TODO boundary to follow up in later pr
- ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema.getAvroSchema(), Collections.emptyList());
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema, Collections.emptyList());
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
- Schema schema = HoodieAvroUtils.getRecordKeySchema();
+ //TODO add a util for this in HoodieSchemaUtils
+ HoodieSchema schema =
HoodieSchema.fromAvroSchema(HoodieAvroUtils.getRecordKeySchema());
ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema,
Collections.emptyList());
return new CloseableMappingIterator<>(rowDataItr, rowData ->
Objects.toString(rowData.getString(0)));
}
public ClosableIterator<RowData> getRowDataIterator(
InternalSchemaManager internalSchemaManager,
DataType dataType,
- Schema requestedSchema,
+ HoodieSchema requestedSchema,
List<Predicate> predicates) throws IOException {
return RecordIterators.getParquetRecordIterator(storage.getConf(),
internalSchemaManager, dataType, requestedSchema, path, predicates);
}
@Override
public HoodieSchema getSchema() {
if (fileSchema == null) {
- fileSchema =
AvroSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
+ //TODO to create a converter for HoodieSchema
Review Comment:
Remove this TODO?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java:
##########
@@ -41,7 +41,7 @@ public final class JsonDeserializationFunction
public static JsonDeserializationFunction getInstance(Configuration conf) {
// Read from file source
RowType rowType =
- (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf).getAvroSchema())
Review Comment:
Can the new converter be used here?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java:
##########
@@ -510,7 +514,7 @@ abstract static class BaseImageIterator implements
ClosableIterator<RowData> {
}
private int[] getRequiredPos(String tableSchema, Schema required) {
- Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new
Schema.Parser().parse(tableSchema));
+ Schema dataSchema =
HoodieAvroUtils.removeMetadataFields(HoodieSchema.parse(tableSchema).getAvroSchema());
Review Comment:
Can we make this work on HoodieSchema?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -55,13 +56,13 @@ private FormatUtils() {
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
- Schema requiredSchema,
+ HoodieSchema requiredSchema,
int[] requiredPos,
GenericRecordBuilder recordBuilder) {
- List<Schema.Field> requiredFields = requiredSchema.getFields();
+ List<HoodieSchemaField> requiredFields = requiredSchema.getFields();
assert (requiredFields.size() == requiredPos.length);
Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
- requiredFields.forEach(f -> recordBuilder.set(f, getVal(record,
positionIterator.next())));
+ requiredFields.forEach(f -> recordBuilder.set(f.name(), getVal(record,
positionIterator.next())));
Review Comment:
Can we use the position here instead of the name? Setting with the name
requires an extra lookup.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java:
##########
@@ -85,33 +84,34 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String>
candidateRowKeys) {
@Override
public ClosableIterator<HoodieRecord<RowData>>
getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema)
throws IOException {
//TODO boundary to follow up in later pr
- ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema.getAvroSchema(), Collections.emptyList());
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema, Collections.emptyList());
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
- Schema schema = HoodieAvroUtils.getRecordKeySchema();
+ //TODO add a util for this in HoodieSchemaUtils
Review Comment:
Can we address this now? Should be a fairly small change
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java:
##########
@@ -247,18 +247,18 @@ private
OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> create
// This input format is used to opening the emitted split.
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- final Schema tableAvroSchema;
+ final HoodieSchema tableSchema;
try {
- tableAvroSchema = schemaResolver.getTableAvroSchema();
+ tableSchema =
HoodieSchema.fromAvroSchema(schemaResolver.getTableAvroSchema());
Review Comment:
Let's call the new method here as well?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -608,12 +608,12 @@ public static long
getMaxCompactionMemoryInBytes(Configuration conf) {
return (long) conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
}
- public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient,
boolean includeMetadataFields) throws Exception {
+ public static HoodieSchema getTableAvroSchema(HoodieTableMetaClient
metaClient, boolean includeMetadataFields) throws Exception {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- return schemaUtil.getTableAvroSchema(includeMetadataFields);
+ return
HoodieSchema.fromAvroSchema(schemaUtil.getTableAvroSchema(includeMetadataFields));
Review Comment:
Let's call the new method here as well to directly return the HoodieSchema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]