snazy commented on code in PR #5297:
URL: https://github.com/apache/iceberg/pull/5297#discussion_r923514007


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object 
maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another 
catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the 
migration is successful.
+   *
+   * @param tableIdentifiers        a list of tableIdentifiers for the tables 
required to be migrated,
+   *                                if not specified all the tables would be 
migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig      Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig      Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> 
tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> 
targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, 
sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", 
sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, 
targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", 
targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      List<Namespace> namespaces = (sourceCatalog instanceof 
SupportsNamespaces) ?
+          ((SupportsNamespaces) sourceCatalog).listNamespaces() : 
ImmutableList.of(Namespace.empty());
+      allIdentifiers = namespaces.stream().flatMap(ns ->
+          sourceCatalog.listTables(ns).stream()).collect(Collectors.toList());
+    }
+    List<TableIdentifier> migratedTableIdentifiers = new 
ArrayList<TableIdentifier>();
+    allIdentifiers.forEach(tableIdentifier -> {

Review Comment:
   I suspect this will run for a very long time, like when there a a lot of 
tables.
   If things fail in the meantime, it's hard to resume after the failed table.
   I.e. error handling here is tricky.
   
   Not sure whether it is actually possible to properly handle the case when 
`registerTable` worked, but `dropTable` failed - in such a case you'd have the 
same table in two catalogs.



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object 
maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another 
catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the 
migration is successful.
+   *
+   * @param tableIdentifiers        a list of tableIdentifiers for the tables 
required to be migrated,
+   *                                if not specified all the tables would be 
migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig      Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig      Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> 
tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> 
targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, 
sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", 
sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, 
targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", 
targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;

Review Comment:
   I think, this code should probably live in `Catalog`:  A new function like 
`Catalog.registerTableFromCatalog()` to "move" a single table to the current 
catalog. The `HadoopCatalog` could then do the special-handling in its 
implementation.



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
 
+  private void validateRegister(TableIdentifier identifier, String 
metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + 
metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) 
ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + 
metadataVersionFiles).isEqualTo(metadataLocation);
+    Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
+  }
+
+  @Test
+  public void testRegisterTableWithGivenBranch() {
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assertions.assertThat(1).isEqualTo(metadataVersionFiles.size());

Review Comment:
   Hint: `assertThat(metadataVersionFiles).hasSize(1)`



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
 
+  private void validateRegister(TableIdentifier identifier, String 
metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + 
metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) 
ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + 
metadataVersionFiles).isEqualTo(metadataLocation);
+    Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
+  }
+
+  @Test
+  public void testRegisterTableWithGivenBranch() {
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assertions.assertThat(1).isEqualTo(metadataVersionFiles.size());
+    ImmutableTableReference tableReference =
+        
ImmutableTableReference.builder().reference("main").name(TABLE_NAME).build();

Review Comment:
   Please use a different branch here.
   Using the default branch is not that great - and the test says `...Branch` 
implying it's not the default branch.



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java:
##########
@@ -152,7 +152,8 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
       }
     }
 
-    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 
1);
+    String newMetadataLocation = (base == null) && 
(metadata.metadataFileLocation() != null) ?

Review Comment:
   Why is this necessary?
   It's a new commit, not sure whether it is good that it re-uses an existing 
metadata location that is (potentially) "owned" by another catalog.



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
 
+  private void validateRegister(TableIdentifier identifier, String 
metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + 
metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) 
ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + 
metadataVersionFiles).isEqualTo(metadataLocation);

Review Comment:
   Hint: Your assertions are often the "wrong" way around.
   It's always `assertThat(<current state>)....`, followed by the expectations.



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object 
maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another 
catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the 
migration is successful.
+   *
+   * @param tableIdentifiers        a list of tableIdentifiers for the tables 
required to be migrated,
+   *                                if not specified all the tables would be 
migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig      Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig      Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> 
tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> 
targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, 
sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", 
sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, 
targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", 
targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      List<Namespace> namespaces = (sourceCatalog instanceof 
SupportsNamespaces) ?
+          ((SupportsNamespaces) sourceCatalog).listNamespaces() : 
ImmutableList.of(Namespace.empty());
+      allIdentifiers = namespaces.stream().flatMap(ns ->

Review Comment:
   I suspect this will run for a very long time, like when there a a lot of 
tables.



##########
aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java:
##########
@@ -295,6 +298,37 @@ public void testDropNamespace() {
     Assert.assertFalse("namespace must not exist", response.hasItem());
   }
 
+  @Test
+  public void testRegisterTable() {

Review Comment:
   This pair of tests is repeated (in a very similar way) across multiple 
catalogs. Can those be centralized somewhere? `CatalogTests` maybe?



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object 
maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another 
catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the 
migration is successful.
+   *
+   * @param tableIdentifiers        a list of tableIdentifiers for the tables 
required to be migrated,
+   *                                if not specified all the tables would be 
migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig      Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig      Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> 
tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> 
targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {

Review Comment:
   Let's leave out the catalog instantiation and configuration here completely. 
I suspect that users have at least one of these catalogs already handy - and 
setting up "the same" catalog twice is superfluous.



##########
core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java:
##########
@@ -639,4 +642,29 @@ public void testConversions() {
     Assert.assertEquals(ns, JdbcUtil.stringToNamespace(nsString));
   }
 
+  @Test

Review Comment:
   Should these tests better live in `CatalogTests`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to