meethngala commented on code in PR #3643:
URL: https://github.com/apache/gobblin/pull/3643#discussion_r1113608848
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -61,23 +70,35 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ Map<String, String> catalogProperties = Maps.newHashMap();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) ||
StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name:
{%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+ String specifierName =
properties.getProperty(ICEBERG_SRC_CATALOG_SPECIFIER_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_SPECIFIER_CLASS);
+ // introducing an optional property for catalogs requiring cluster
specific properties
+ Optional<String> srcClusterName =
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME));
+ srcClusterName.ifPresent(s ->
catalogProperties.put(DatasetConstants.PLATFORM_CLUSTER, s));
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- IcebergCatalog icebergCatalog =
IcebergCatalogFactory.create(configuration);
- /* Each Iceberg dataset maps to an Iceberg table
- * TODO: The user-provided database and table names need to be
pre-checked and verified against the existence of a valid Iceberg table
- */
- matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog,
properties, sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
- return matchingDatasets;
+ try {
+ Class<?> catalogSpecifier = Class.forName(specifierName);
+ IcebergCatalog.CatalogSpecifier specifier =
+ (IcebergCatalog.CatalogSpecifier)
GobblinConstructorUtils.invokeConstructor(catalogSpecifier, specifierName);
+ IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(specifier,
catalogProperties, configuration);
Review Comment:
Got it, that makes sense! I have added the `createIcebergCatalog` method in my
latest commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org