This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ea14d68  [HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884)
ea14d68 is described below

commit ea14d687da0e20a4673efa1326e5332978305269
Author: Nick Young <[email protected]>
AuthorDate: Sat May 1 10:02:00 2021 +0800

    [HUDI-1852] Add SCHEMA_REGISTRY_SOURCE_URL_SUFFIX and SCHEMA_REGISTRY_TARGET_URL_SUFFIX property (#2884)
---
 .../deltastreamer/HoodieMultiTableDeltaStreamer.java    | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 8e557f1..513331d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -153,9 +153,18 @@ public class HoodieMultiTableDeltaStreamer {
   private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
     if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) {
       String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
-      String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
-      typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);
-      typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + schemaRegistrySuffix);
+      String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+      String sourceSchemaRegistrySuffix;
+      String targetSchemaRegistrySuffix;
+      if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
+        sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
+        targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
+      } else {
+        targetSchemaRegistrySuffix = schemaRegistrySuffix;
+        sourceSchemaRegistrySuffix = schemaRegistrySuffix;
+      }
+      typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
+      typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
     }
   }
 
@@ -378,6 +387,8 @@ public class HoodieMultiTableDeltaStreamer {
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+    private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
+    private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
     private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
     private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";

Reply via email to