meethngala commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1164613288
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String
dbName, String tblName, Ice
return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- Map<String, String> catalogProperties = new HashMap<>();
+ protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
+ String prefix = getConfigPrefix(location);
+ Map<String, String> catalogProperties = loadCatalogProperties(properties,
prefix);
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String catalogUri;
- String icebergCatalogClassName;
- switch (location) {
- case SOURCE:
- catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- case DESTINATION:
- catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- default:
- throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ }
+
+ protected static Map<String, String> loadCatalogProperties(Properties
properties, String configPrefix) {
+ Map<String, String> catalogProperties = new HashMap<>();
+ Config config = ConfigBuilder.create().loadProps(properties,
configPrefix).build();
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ catalogProperties.put(entry.getKey(),
entry.getValue().unwrapped().toString());
}
+ String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
catalogProperties.put(CatalogProperties.URI, catalogUri);
- return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ return catalogProperties;
+ }
+
+ private static String getConfigPrefix(CatalogLocation location) {
+ return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() +
".";
Review Comment:
gotcha! Created a method inside enum as suggested. The changes are reflected
in my latest commit.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String
dbName, String tblName, Ice
return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- Map<String, String> catalogProperties = new HashMap<>();
+ protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
+ String prefix = getConfigPrefix(location);
+ Map<String, String> catalogProperties = loadCatalogProperties(properties,
prefix);
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String catalogUri;
- String icebergCatalogClassName;
- switch (location) {
- case SOURCE:
- catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- case DESTINATION:
- catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- default:
- throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ }
+
+ protected static Map<String, String> loadCatalogProperties(Properties
properties, String configPrefix) {
+ Map<String, String> catalogProperties = new HashMap<>();
+ Config config = ConfigBuilder.create().loadProps(properties,
configPrefix).build();
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ catalogProperties.put(entry.getKey(),
entry.getValue().unwrapped().toString());
}
+ String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
Review Comment:
yeah... I missed it. Updated it in my latest commit!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]