This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e8747036554 [HUDI-5973] Fixing refreshing of schemas in HoodieStreamer continuous mode (#10261)
e8747036554 is described below
commit e87470365548ee012c8fb7a9e1f65c062b232dfe
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jan 10 00:02:53 2024 -0500
[HUDI-5973] Fixing refreshing of schemas in HoodieStreamer continuous mode (#10261)
* Add cachedSchema per batch, fix idempotency with getSourceSchema calls
---------
Co-authored-by: danielfordfc <[email protected]>
---
.../utilities/schema/FilebasedSchemaProvider.java | 29 ++++++++++++-----
.../hudi/utilities/schema/SchemaProvider.java | 5 +++
.../utilities/schema/SchemaRegistryProvider.java | 36 +++++++++++++++++-----
.../apache/hudi/utilities/streamer/StreamSync.java | 5 ++-
.../schema/TestSchemaRegistryProvider.java | 20 ++++++++++++
5 files changed, 79 insertions(+), 16 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 3ca97b01f95..9dbf66325d7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -45,6 +45,11 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private final FileSystem fs;
+ private final String sourceFile;
+ private final String targetFile;
+ private final boolean shouldSanitize;
+ private final String invalidCharMask;
+
protected Schema sourceSchema;
protected Schema targetSchema;
@@ -52,18 +57,21 @@ public class FilebasedSchemaProvider extends SchemaProvider {
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc)
{
super(props, jssc);
checkRequiredConfigProperties(props,
Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
- String sourceFile = getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
- boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
- String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
+ this.sourceFile = getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
+ this.targetFile = getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile);
+ this.shouldSanitize = SanitizationUtils.shouldSanitize(props);
+ this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
- this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs,
shouldSanitize, invalidCharMask);
+ this.sourceSchema = parseSchema(this.sourceFile);
if (containsConfigProperty(props,
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) {
- this.targetSchema = readAvroSchemaFromFile(
- getStringWithAltKeys(props,
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE),
- this.fs, shouldSanitize, invalidCharMask);
+ this.targetSchema = parseSchema(this.targetFile);
}
}
+ private Schema parseSchema(String schemaFile) {
+ return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize,
invalidCharMask);
+ }
+
@Override
public Schema getSourceSchema() {
return sourceSchema;
@@ -87,4 +95,11 @@ public class FilebasedSchemaProvider extends SchemaProvider {
}
return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema,
invalidCharMask);
}
+
+ // Per write batch, refresh the schemas from the file
+ @Override
+ public void refresh() {
+ this.sourceSchema = parseSchema(this.sourceFile);
+ this.targetSchema = parseSchema(this.targetFile);
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
index 2410798d355..5c8ca8f6c1b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
@@ -56,4 +56,9 @@ public abstract class SchemaProvider implements Serializable {
// by default, use source schema as target for hoodie table as well
return getSourceSchema();
}
+
+ //every schema provider has the ability to refresh itself, which will mean
something different per provider.
+ public void refresh() {
+
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 0f65dd338d0..1c2e9181fd7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -82,6 +82,12 @@ public class SchemaRegistryProvider extends SchemaProvider {
public static final String SSL_KEY_PASSWORD_PROP =
"schema.registry.ssl.key.password";
}
+ protected Schema cachedSourceSchema;
+ protected Schema cachedTargetSchema;
+
+ private final String srcSchemaRegistryUrl;
+ private final String targetSchemaRegistryUrl;
+
@FunctionalInterface
public interface SchemaConverter {
/**
@@ -160,6 +166,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
checkRequiredConfigProperties(props,
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+ this.srcSchemaRegistryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
+ this.targetSchemaRegistryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl);
if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
|| config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
setUpSSLStores();
@@ -191,30 +199,42 @@ public class SchemaRegistryProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
- String registryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
try {
- return parseSchemaFromRegistry(registryUrl);
+ if (cachedSourceSchema == null) {
+ cachedSourceSchema =
parseSchemaFromRegistry(this.srcSchemaRegistryUrl);
+ }
+ return cachedSourceSchema;
} catch (Exception e) {
throw new HoodieSchemaFetchException(String.format(
"Error reading source schema from registry. Please check %s is
configured correctly. Truncated URL: %s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
- StringUtils.truncate(registryUrl, 10, 10)), e);
+ StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e);
}
}
@Override
public Schema getTargetSchema() {
- String registryUrl = getStringWithAltKeys(config,
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
- String targetRegistryUrl =
- getStringWithAltKeys(config,
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
try {
- return parseSchemaFromRegistry(targetRegistryUrl);
+ if (cachedTargetSchema == null) {
+ cachedTargetSchema =
parseSchemaFromRegistry(this.targetSchemaRegistryUrl);
+ }
+ return cachedTargetSchema;
} catch (Exception e) {
throw new HoodieSchemaFetchException(String.format(
"Error reading target schema from registry. Please check %s is
configured correctly. If that is not configured then check %s. Truncated URL:
%s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
- StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
+ StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e);
}
}
+
+ // Per SyncOnce call, the cachedschema for the provider is dropped and
SourceSchema re-attained
+ // Subsequent calls to getSourceSchema within the write batch should be
cached.
+ @Override
+ public void refresh() {
+ cachedSourceSchema = null;
+ cachedTargetSchema = null;
+ getSourceSchema();
+ getTargetSchema();
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ff2debc8dcc..78d3866f679 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -450,7 +450,10 @@ public class StreamSync implements Serializable, Closeable {
result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics,
overallTimerContext);
}
-
+ // refresh schemas if need be before next batch
+ if (schemaProvider != null) {
+ schemaProvider.refresh();
+ }
metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
return result;
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index abbe983cbce..397e72a0ec4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -133,4 +133,24 @@ class TestSchemaRegistryProvider {
.toString();
}
}
+
+ // The SR is checked when cachedSchema is empty, when not empty, the
cachedSchema is used.
+ @Test
+ public void testGetSourceSchemaUsesCachedSchema() throws IOException {
+ TypedProperties props = getProps();
+ SchemaRegistryProvider spyUnderTest = getUnderTest(props);
+
+ // Call when cachedSchema is empty
+ Schema actual = spyUnderTest.getSourceSchema();
+ assertNotNull(actual);
+ verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+
+ assert spyUnderTest.cachedSourceSchema != null;
+
+ Schema actualTwo = spyUnderTest.getSourceSchema();
+
+ // cachedSchema should now be set, a subsequent call should not call
parseSchemaFromRegistry
+ // Assuming this verify() has the scope of the whole test? so it should
still be 1 from previous call?
+ verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
+ }
}