[
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=853248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-853248
]
ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Mar/23 20:47
Start Date: 27/Mar/23 20:47
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1149769470
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java:
##########
@@ -39,7 +41,7 @@ protected BaseIcebergCatalog(String catalogName, Class<?
extends Catalog> compan
}
@Override
- public IcebergTable openTable(String dbName, String tableName) {
+ public IcebergTable openTable(String dbName, String tableName) throws
IOException {
Review Comment:
now that we've separated the concerns of opening a table from verifying its
existence (i.e. to support opening a table that doesn't yet exist)... aren't we
able to remove this exception declaration?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java:
##########
@@ -17,15 +17,19 @@
package org.apache.gobblin.data.management.copy.iceberg;
+import java.io.IOException;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
/**
* Any catalog from which to access {@link IcebergTable}s.
*/
public interface IcebergCatalog {
- IcebergTable openTable(String dbName, String tableName);
+ IcebergTable openTable(String dbName, String tableName) throws IOException;
String getCatalogUri();
void initialize(Map<String, String> properties, Configuration configuration);
+ boolean tableAlreadyExists(TableIdentifier tableIdentifier);
Review Comment:
super helpful to be able to ask this Q, but why not as a method (e.g. on
`IcebergTable`)? that way someone holding a table can directly inquire about
what they have.
...BTW I don't see where it's called. did you forget to commit that to this
PR's branch?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -25,34 +25,45 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;
+
/**
* Finds {@link IcebergDataset}s. Will look for tables in a database using a
{@link IcebergCatalog},
* and creates a {@link IcebergDataset} for each one.
*/
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDataset> {
-
public static final String ICEBERG_DATASET_PREFIX =
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_CLUSTER_KEY = "cluster";
- public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX +
".database.name";
- public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX +
".table.name";
public static final String ICEBERG_SRC_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
- public static final String ICEBERG_SRC_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+ public static final String ICEBERG_SRC_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX
+ ".source.cluster.name";
+ public static final String ICEBERG_DEST_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".target.catalog.class";
+ public static final String ICEBERG_DEST_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
+ public static final String ICEBERG_DEST_CLUSTER_NAME =
ICEBERG_DATASET_PREFIX + ".target.cluster.name";
Review Comment:
for all these, I believe we decided to use `.destination`, rather than
`.target`
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -74,18 +84,15 @@ public List<IcebergDataset> findDatasets() throws
IOException {
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
- try {
- IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
- /* Each Iceberg dataset maps to an Iceberg table
- * TODO: The user provided database and table names needs to be
pre-checked and verified against the existence of a valid Iceberg table
- */
- matchingDatasets.add(createIcebergDataset(dbName, tblName,
icebergCatalog, properties, sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(),
- matchingDatasets, dbName, tblName);
- return matchingDatasets;
- } catch (ReflectiveOperationException exception) {
- throw new IOException(exception);
- }
+ IcebergCatalog sourceIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+ IcebergCatalog destinationIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+ /* Each Iceberg dataset maps to an Iceberg table
+ * TODO: The user provided database and table names needs to be
pre-checked and verified against the existence of a valid Iceberg table
+ */
+ matchingDatasets.add(createIcebergDataset(dbName, tblName,
sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+ log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(),
Review Comment:
ok, sounds reasonable to keep, if we have ambitions to support a list.
NBD, but I would probably add a clarifying comment along the lines,
> // until future support added to specify multiple icebergs, count expected
always to be one
Issue Time Tracking
-------------------
Worklog Id: (was: 853248)
Time Spent: 1h 50m (was: 1h 40m)
> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
> Key: GOBBLIN-1802
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)