meethngala commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1164613567
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
Review Comment:
gotcha... renamed the method and added the javadoc for it as well :)
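For reference, a minimal sketch of what a prefix-scoped loader along these lines could look like (the actual method body is elided from the hunk above, so the strip-prefix behavior shown here is an assumption, not the committed implementation):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class CatalogPropertiesSketch {
  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
    // Sketch only: gather every property under `configPrefix` and strip the prefix,
    // so e.g. "<configPrefix>.catalog.uri" is handed to the catalog as "catalog.uri".
    String prefix = configPrefix + ".";
    Map<String, String> catalogProperties = new HashMap<>();
    for (String key : properties.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        catalogProperties.put(key.substring(prefix.length()), properties.getProperty(key));
      }
    }
    return catalogProperties;
  }
}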
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {
Review Comment:
I understand the concern here, since we shouldn't need to serialize all the properties. I explored that option, but I'm skeptical: those `properties` also carry the Hadoop configuration consumed by `CatalogUtil.loadCatalog()`. We pass the properties as-is into the configuration, and that configuration is set on any catalog that implements Configurable via `((Configurable) catalog).setConf(hadoopConf)`. It would be exhaustive to trace how each catalog implementation uses those Hadoop configs. Does that make sense?
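To make the pass-through concrete, here is a hedged sketch of the loading path being described. `CatalogUtil.loadCatalog` and its Configurable handoff are real Iceberg APIs; copying every job property into the Hadoop Configuration, and the catalog name "iceberg_copy", are assumptions for illustration:

import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

class CatalogLoadingSketch {
  static Catalog load(String catalogImplClassName, Properties properties, Map<String, String> catalogProperties) {
    // All job properties flow into the Hadoop Configuration as-is...
    Configuration hadoopConf = new Configuration();
    properties.stringPropertyNames().forEach(key -> hadoopConf.set(key, properties.getProperty(key)));
    // ...and CatalogUtil.loadCatalog hands that conf to any catalog impl that
    // implements Configurable, i.e. ((Configurable) catalog).setConf(hadoopConf),
    // so we cannot easily prove which properties a given impl will never read.
    return CatalogUtil.loadCatalog(catalogImplClassName, "iceberg_copy", catalogProperties, hadoopConf);
  }
}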
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // commit (baseMetadata -> destination metadata, updatedMetadata -> source metadata)
Review Comment:
gotcha. I have simplified it in my latest commit!
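For anyone reading along: Iceberg's `TableOperations.commit(base, metadata)` takes the current (base) metadata first and the updated metadata second, so the swap the new comment describes would look roughly like the sketch below. The replacement call itself is truncated out of the hunk above, so this is an assumption about the final code, not the committed line:

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

class RegisterSketch {
  private final TableOperations tableOps;

  RegisterSketch(TableOperations tableOps) {
    this.tableOps = tableOps;
  }

  void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
    if (dstMetadata != null) {
      // base = destination's current metadata (what the commit expects to find),
      // updated = source metadata (what the destination table should become).
      tableOps.commit(dstMetadata, srcMetadata);
    }
  }
}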