This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e8747036554 [HUDI-5973] Fixing refreshing of schemas in HoodieStreamer continuous mode (#10261)
e8747036554 is described below

commit e87470365548ee012c8fb7a9e1f65c062b232dfe
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jan 10 00:02:53 2024 -0500

    [HUDI-5973] Fixing refreshing of schemas in HoodieStreamer continuous mode (#10261)
    
    * Cache the fetched schema per write batch so repeated getSourceSchema calls within a batch return the same schema
    ---------
    Co-authored-by: danielfordfc <[email protected]>
---
 .../utilities/schema/FilebasedSchemaProvider.java  | 29 ++++++++++++-----
 .../hudi/utilities/schema/SchemaProvider.java      |  5 +++
 .../utilities/schema/SchemaRegistryProvider.java   | 36 +++++++++++++++++-----
 .../apache/hudi/utilities/streamer/StreamSync.java |  5 ++-
 .../schema/TestSchemaRegistryProvider.java         | 20 ++++++++++++
 5 files changed, 79 insertions(+), 16 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 3ca97b01f95..9dbf66325d7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -45,6 +45,11 @@ public class FilebasedSchemaProvider extends SchemaProvider {
 
   private final FileSystem fs;
 
+  private final String sourceFile;
+  private final String targetFile;
+  private final boolean shouldSanitize;
+  private final String invalidCharMask;
+
   protected Schema sourceSchema;
 
   protected Schema targetSchema;
@@ -52,18 +57,21 @@ public class FilebasedSchemaProvider extends SchemaProvider {
   public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) 
{
     super(props, jssc);
     checkRequiredConfigProperties(props, 
Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
-    String sourceFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
-    boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
-    String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
+    this.sourceFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
+    this.targetFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile);
+    this.shouldSanitize = SanitizationUtils.shouldSanitize(props);
+    this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
-    this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, 
shouldSanitize, invalidCharMask);
+    this.sourceSchema = parseSchema(this.sourceFile);
     if (containsConfigProperty(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) {
-      this.targetSchema = readAvroSchemaFromFile(
-          getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE),
-          this.fs, shouldSanitize, invalidCharMask);
+      this.targetSchema = parseSchema(this.targetFile);
     }
   }
 
+  private Schema parseSchema(String schemaFile) {
+    return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize, 
invalidCharMask);
+  }
+
   @Override
   public Schema getSourceSchema() {
     return sourceSchema;
@@ -87,4 +95,11 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     }
     return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, 
invalidCharMask);
   }
+
+  // Per write batch, refresh the schemas from the file
+  @Override
+  public void refresh() {
+    this.sourceSchema = parseSchema(this.sourceFile);
+    this.targetSchema = parseSchema(this.targetFile);
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
index 2410798d355..5c8ca8f6c1b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
@@ -56,4 +56,9 @@ public abstract class SchemaProvider implements Serializable {
     // by default, use source schema as target for hoodie table as well
     return getSourceSchema();
   }
+
+  //every schema provider has the ability to refresh itself, which will mean 
something different per provider.
+  public void refresh() {
+
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 0f65dd338d0..1c2e9181fd7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -82,6 +82,12 @@ public class SchemaRegistryProvider extends SchemaProvider {
     public static final String SSL_KEY_PASSWORD_PROP = 
"schema.registry.ssl.key.password";
   }
 
+  protected Schema cachedSourceSchema;
+  protected Schema cachedTargetSchema;
+
+  private final String srcSchemaRegistryUrl;
+  private final String targetSchemaRegistryUrl;
+
   @FunctionalInterface
   public interface SchemaConverter {
     /**
@@ -160,6 +166,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+    this.srcSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
+    this.targetSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl);
     if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
         || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
       setUpSSLStores();
@@ -191,30 +199,42 @@ public class SchemaRegistryProvider extends SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
     try {
-      return parseSchemaFromRegistry(registryUrl);
+      if (cachedSourceSchema == null) {
+        cachedSourceSchema = 
parseSchemaFromRegistry(this.srcSchemaRegistryUrl);
+      }
+      return cachedSourceSchema;
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading source schema from registry. Please check %s is 
configured correctly. Truncated URL: %s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(registryUrl, 10, 10)), e);
+          StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e);
     }
   }
 
   @Override
   public Schema getTargetSchema() {
-    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
-    String targetRegistryUrl =
-        getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
     try {
-      return parseSchemaFromRegistry(targetRegistryUrl);
+      if (cachedTargetSchema == null) {
+        cachedTargetSchema = 
parseSchemaFromRegistry(this.targetSchemaRegistryUrl);
+      }
+      return cachedTargetSchema;
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading target schema from registry. Please check %s is 
configured correctly. If that is not configured then check %s. Truncated URL: 
%s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
           Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
+          StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e);
     }
   }
+
+  // Per SyncOnce call, the cachedschema for the provider is dropped and 
SourceSchema re-attained
+  // Subsequent calls to getSourceSchema within the write batch should be 
cached.
+  @Override
+  public void refresh() {
+    cachedSourceSchema = null;
+    cachedTargetSchema = null;
+    getSourceSchema();
+    getTargetSchema();
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ff2debc8dcc..78d3866f679 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -450,7 +450,10 @@ public class StreamSync implements Serializable, Closeable {
 
       result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, 
overallTimerContext);
     }
-
+    // refresh schemas if need be before next batch
+    if (schemaProvider != null) {
+      schemaProvider.refresh();
+    }
     metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
     return result;
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index abbe983cbce..397e72a0ec4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -133,4 +133,24 @@ class TestSchemaRegistryProvider {
           .toString();
     }
   }
+
+  // The SR is checked when cachedSchema is empty, when not empty, the 
cachedSchema is used.
+  @Test
+  public void testGetSourceSchemaUsesCachedSchema() throws IOException {
+    TypedProperties props = getProps();
+    SchemaRegistryProvider spyUnderTest = getUnderTest(props);
+
+    // Call when cachedSchema is empty
+    Schema actual = spyUnderTest.getSourceSchema();
+    assertNotNull(actual);
+    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+
+    assert spyUnderTest.cachedSourceSchema != null;
+
+    Schema actualTwo = spyUnderTest.getSourceSchema();
+    
+    // cachedSchema should now be set, a subsequent call should not call 
parseSchemaFromRegistry
+    // Assuming this verify() has the scope of the whole test? so it should 
still be 1 from previous call?
+    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+  }
 }

Reply via email to