This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 10363c1412 [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
10363c1412 is described below

commit 10363c1412b8f9b5b16b3e2e075b895e0cc9a293
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 24 08:17:15 2022 -0400

    [HUDI-4132] Fixing determining target table schema for delta sync with empty batch (#5648)
---
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java    | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index a1a804b9ed..a4a7e10abc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -840,8 +840,15 @@ public class DeltaSync implements Serializable {
             && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
           // target schema is null. fetch schema from commit metadata and use it
           HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
-          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-          newWriteSchema = schemaResolver.getTableAvroSchema(false);
+          int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
+          if (totalCompleted > 0) {
+            try {
+              TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+              newWriteSchema = schemaResolver.getTableAvroSchema(false);
+            } catch (IllegalArgumentException e) {
+              LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
+            }
+          }
         }
       }
       return newWriteSchema;

Reply via email to