This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 10363c1412 [HUDI-4132] Fixing determining target table schema for
delta sync with empty batch (#5648)
10363c1412 is described below
commit 10363c1412b8f9b5b16b3e2e075b895e0cc9a293
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 24 08:17:15 2022 -0400
[HUDI-4132] Fixing determining target table schema for delta sync with
empty batch (#5648)
---
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a1a804b9ed..a4a7e10abc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -840,8 +840,15 @@ public class DeltaSync implements Serializable {
&&
SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA,
targetSchema).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
// target schema is null. fetch schema from commit metadata and use it
HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(new
Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
- TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
- newWriteSchema = schemaResolver.getTableAvroSchema(false);
+ int totalCompleted =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+ if (totalCompleted > 0) {
+ try {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+ newWriteSchema = schemaResolver.getTableAvroSchema(false);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
+ }
+ }
}
}
return newWriteSchema;