meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1154736042
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator()
throws IOException {
return findDatasets().iterator();
}
- protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
- IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
- return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+ /**
+ * Requires both source and destination catalogs to connect to their
respective {@link IcebergTable}
+ * Note: the destination side {@link IcebergTable} should be present before
initiating replication
+ * @return {@link IcebergDataset} with its corresponding source and
destination {@link IcebergTable}
+ */
+ protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog,
Properties properties, FileSystem fs) throws IOException {
+ IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName,
tblName);
+
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+ IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(dbName, tblName);
+
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName,
tblName));
+ return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties) throws
IOException, ClassNotFoundException {
+ protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
Map<String, String> catalogProperties = new HashMap<>();
- String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is
required");
- catalogProperties.put(CatalogProperties.URI, catalogUri);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ String catalogUri;
+ String icebergCatalogClassName;
+ switch (location) {
+ case SOURCE:
+ catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service
URI is required");
+ // introducing an optional property for catalogs requiring cluster
specific properties
+
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ break;
+ case DESTINATION:
+ catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Destination Catalog Table
Service URI is required");
Review Comment:
In my latest commit, I have added the input values that the user would
provide. The value would be null here, since we are checking for null values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]