This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ea14d68 [HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and
SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884)
ea14d68 is described below
commit ea14d687da0e20a4673efa1326e5332978305269
Author: Nick Young <[email protected]>
AuthorDate: Sat May 1 10:02:00 2021 +0800
[HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and
SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884)
---
.../deltastreamer/HoodieMultiTableDeltaStreamer.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 8e557f1..513331d 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -153,9 +153,18 @@ public class HoodieMultiTableDeltaStreamer {
private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg,
TypedProperties typedProperties) {
if (Objects.equals(cfg.schemaProviderClassName,
SchemaRegistryProvider.class.getName())) {
String schemaRegistryBaseUrl =
typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
- String schemaRegistrySuffix =
typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
- typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP,
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) +
schemaRegistrySuffix);
- typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP,
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) +
schemaRegistrySuffix);
+ String schemaRegistrySuffix =
typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+ String sourceSchemaRegistrySuffix;
+ String targetSchemaRegistrySuffix;
+ if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
+ sourceSchemaRegistrySuffix =
typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
+ targetSchemaRegistrySuffix =
typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
+ } else {
+ targetSchemaRegistrySuffix = schemaRegistrySuffix;
+ sourceSchemaRegistrySuffix = schemaRegistrySuffix;
+ }
+ typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP,
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) +
sourceSchemaRegistrySuffix);
+ typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP,
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) +
targetSchemaRegistrySuffix);
}
}
@@ -378,6 +387,8 @@ public class HoodieMultiTableDeltaStreamer {
public static final String HIVE_SYNC_TABLE_PROP =
"hoodie.datasource.hive_sync.table";
private static final String SCHEMA_REGISTRY_BASE_URL_PROP =
"hoodie.deltastreamer.schemaprovider.registry.baseUrl";
private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP =
"hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+ private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX =
"hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+ private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX =
"hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
private static final String TABLES_TO_BE_INGESTED_PROP =
"hoodie.deltastreamer.ingestion.tablesToBeIngested";
private static final String INGESTION_PREFIX =
"hoodie.deltastreamer.ingestion.";
private static final String INGESTION_CONFIG_SUFFIX = ".configFile";