This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1da0b21edd [HUDI-4119] the first read result is incorrect when Flink
upsert-kafka connector is used in Hudi (#5626)
1da0b21edd is described below
commit 1da0b21edd6693e9025e45889cc7b1c37658a4e1
Author: aliceyyan <[email protected]>
AuthorDate: Fri May 20 18:10:24 2022 +0800
[HUDI-4119] the first read result is incorrect when Flink upsert-kafka
connector is used in Hudi (#5626)
* HUDI-4119 the first read result is incorrect when Flink upsert-kafka
connector is used in Hudi
Co-authored-by: aliceyyan <[email protected]>
---
.../java/org/apache/hudi/table/HoodieTableSource.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bad592aa21..1836857383 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -20,12 +20,16 @@ package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -381,8 +385,8 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> getStreamInputFormat() {
- // if table does not exist, use schema from the DDL
- Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() :
getTableAvroSchema();
+ // if table does not exist or table data does not exist, use schema from
the DDL
+ Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ?
inferSchemaFromDdl() : getTableAvroSchema();
final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final RowType requiredRowType = (RowType)
getProducedDataType().notNull().getLogicalType();
@@ -399,6 +403,15 @@ public class HoodieTableSource implements
throw new HoodieException(errMsg);
}
+ /**
+ * Returns whether the hoodie table data exists.
+ */
+ private boolean tableDataExists() {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata
= activeTimeline.getLastCommitMetadataWithValidData();
+ return instantAndCommitMetadata.isPresent();
+ }
+
private MergeOnReadInputFormat mergeOnReadInputFormat(
RowType rowType,
RowType requiredRowType,