[
https://issues.apache.org/jira/browse/GOBBLIN-1811?focusedWorklogId=856304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-856304
]
ASF GitHub Bot logged work on GOBBLIN-1811:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/23 02:31
Start Date: 12/Apr/23 02:31
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1163489856
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String
dbName, String tblName, Ice
return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- Map<String, String> catalogProperties = new HashMap<>();
+ protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
+ String prefix = getConfigPrefix(location);
+ Map<String, String> catalogProperties = loadCatalogProperties(properties,
prefix);
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String catalogUri;
- String icebergCatalogClassName;
- switch (location) {
- case SOURCE:
- catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- case DESTINATION:
- catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- default:
- throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ }
+
+ protected static Map<String, String> loadCatalogProperties(Properties
properties, String configPrefix) {
+ Map<String, String> catalogProperties = new HashMap<>();
+ Config config = ConfigBuilder.create().loadProps(properties,
configPrefix).build();
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ catalogProperties.put(entry.getKey(),
entry.getValue().unwrapped().toString());
}
+ String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
catalogProperties.put(CatalogProperties.URI, catalogUri);
- return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ return catalogProperties;
+ }
+
+ private static String getConfigPrefix(CatalogLocation location) {
+ return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() +
".";
Review Comment:
This should be a method on the `CatalogLocation` enum.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem
fs) {
* @param dstMetadata is null if destination {@link IcebergTable} is absent,
in which case registration is skipped */
protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata
dstMetadata) {
if (dstMetadata != null) {
- this.tableOps.commit(srcMetadata, dstMetadata);
+ // commit (baseMetadata -> destination metadata, updatedMetadata ->
source metadata)
Review Comment:
Overly cryptic. Maybe say "use the current destination metadata as the 'base
version' and the source metadata as the update"?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String
dbName, String tblName, Ice
return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- Map<String, String> catalogProperties = new HashMap<>();
+ protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
+ String prefix = getConfigPrefix(location);
+ Map<String, String> catalogProperties = loadCatalogProperties(properties,
prefix);
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String catalogUri;
- String icebergCatalogClassName;
- switch (location) {
- case SOURCE:
- catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- case DESTINATION:
- catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- default:
- throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ }
+
+ protected static Map<String, String> loadCatalogProperties(Properties
properties, String configPrefix) {
+ Map<String, String> catalogProperties = new HashMap<>();
+ Config config = ConfigBuilder.create().loadProps(properties,
configPrefix).build();
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ catalogProperties.put(entry.getKey(),
entry.getValue().unwrapped().toString());
}
+ String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
Review Comment:
Shouldn't there be a `"."` between `configPrefix` and the URI key?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -49,14 +52,9 @@
@RequiredArgsConstructor
public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDataset> {
public static final String ICEBERG_DATASET_PREFIX =
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
- public static final String ICEBERG_CLUSTER_KEY = "cluster";
- public static final String ICEBERG_SRC_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
- public static final String ICEBERG_SRC_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
- public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX
+ ".source.cluster.name";
- public static final String ICEBERG_DEST_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
- public static final String ICEBERG_DEST_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
- public static final String ICEBERG_DEST_CLUSTER_NAME =
ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+ public static final String ICEBERG_CATALOG_CLASS = "catalog.class";
+ public static final String ICEBERG_CATALOG_URI_KEY = "catalog.uri";
Review Comment:
Please document how these are used, i.e.:
```ICEBERG_CATALOG_PREFIX + "." + ("source" or "destination") + "..."```
Be sure to explain that this is an open-ended pattern that allows passing
arbitrary catalog-specific props through to the catalog.
Also, given that configs are essentially a global namespace, I'm not
convinced that the prefix in use is discriminating enough.
At present, arbitrary properties go after:
```
iceberg.dataset.source
```
which means
```
iceberg.dataset.destination.database
```
would get passed through, and that seems wrong.
Instead, it might be more appropriate to put them after
```
iceberg.dataset.source.catalog
```
(or `iceberg.dataset.catalog.source`), which seems most appropriate given
that these are *catalog-specific* props.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor
getDestinationDataset(FileSystem targetFs) {
return this.destIcebergTable.getDatasetDescriptor(targetFs);
}
- private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable,
IcebergTable dstIcebergTable) {
- IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+ private PostPublishStep createPostPublishStep(String dbName, String
inputTableName, Properties properties) {
Review Comment:
It seems inappropriate, in the general case, to serialize every single global
property here. Can't we constrain this to only those properties actually related
to creating the Iceberg catalogs (source and destination)?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String
dbName, String tblName, Ice
return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- Map<String, String> catalogProperties = new HashMap<>();
+ protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
+ String prefix = getConfigPrefix(location);
+ Map<String, String> catalogProperties = loadCatalogProperties(properties,
prefix);
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String catalogUri;
- String icebergCatalogClassName;
- switch (location) {
- case SOURCE:
- catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- case DESTINATION:
- catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
- icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
- break;
- default:
- throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
+ }
+
+ protected static Map<String, String> loadCatalogProperties(Properties
properties, String configPrefix) {
Review Comment:
The semantics here seem non-obvious and hence deserve a javadoc. The name
also seems a bit inexact; maybe `buildMapFromPrefixChildren`?
Issue Time Tracking
-------------------
Worklog Id: (was: 856304)
Time Spent: 1h (was: 50m)
> Fix Iceberg Registration Serialization
> --------------------------------------
>
> Key: GOBBLIN-1811
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1811
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)