n3nash commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r629687494



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -429,8 +432,55 @@ public static SchemaProvider 
createRowBasedSchemaProvider(StructType structType,
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, 
null);
   }
 
+  /**
+   * Create latest schema provider for Target schema.
+   * @param structType spark data type of incoming batch.
+   * @param jssc instance of {@link JavaSparkContext}.
+   * @param fs instance of {@link FileSystem}.
+   * @param basePath base path of the table.
+   * @return the schema provider where target schema refers to latest 
schema(either incoming schema or table schema).
+   */
+  public static SchemaProvider createLatestSchemaProvider(StructType 
structType,
+      JavaSparkContext jssc, FileSystem fs, String basePath) {
+    SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
+    Schema incomingSchema = rowSchemaProvider.getTargetSchema();
+    Schema latestSchema = incomingSchema;
+
+    try {
+      if (fs.exists(new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME))) {
+        HoodieTableMetaClient tableMetaClient = 
HoodieTableMetaClient.builder().setConf(jssc.sc().hadoopConfiguration()).setBasePath(basePath).build();
+        TableSchemaResolver
+            tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
+        latestSchema = tableSchemaResolver.getLatestSchema(incomingSchema, 
true, new Function1<Schema, Schema>() {
+          @Override
+          public Schema apply(Schema v1) throws IOException {
+            return AvroConversionUtils.convertStructTypeToAvroSchema(
+                AvroConversionUtils.convertAvroSchemaToStructType(v1), 
RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+                RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE);
+          }
+        });
+      }
+    } catch (IOException e) {
+      LOG.debug("Swallowing exception while trying to fetch table's latest 
schema. Falling back to incoming schema");
+    }
+
+    final Schema finalLatestSchema = latestSchema;
+    return new SchemaProvider(null) {
+      @Override
+      public Schema getSourceSchema() {
+        return rowSchemaProvider.getSourceSchema();
+      }
+
+      @Override
+      public Schema getTargetSchema() {
+        return finalLatestSchema;
+      }
+    };
+  }
+
   @FunctionalInterface
   public interface CheckedSupplier<T> {
+

Review comment:
       Please remove this newly added blank line (the lone `+` line directly above, inside `CheckedSupplier`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to