snazy commented on code in PR #5297:
URL: https://github.com/apache/iceberg/pull/5297#discussion_r923514007
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the migration is successful.
+   *
+   * @param tableIdentifiers a list of tableIdentifiers for the tables required to be migrated,
+   *                         if not specified all the tables would be migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      List<Namespace> namespaces = (sourceCatalog instanceof SupportsNamespaces) ?
+          ((SupportsNamespaces) sourceCatalog).listNamespaces() : ImmutableList.of(Namespace.empty());
+      allIdentifiers = namespaces.stream().flatMap(ns ->
+          sourceCatalog.listTables(ns).stream()).collect(Collectors.toList());
+    }
+    List<TableIdentifier> migratedTableIdentifiers = new ArrayList<TableIdentifier>();
+    allIdentifiers.forEach(tableIdentifier -> {
Review Comment:
I suspect this will run for a very long time, for example when there are a lot of tables.
If things fail in the meantime, it's hard to resume after the failed table. I.e. error handling here is tricky.
Not sure whether it is actually possible to properly handle the case when `registerTable` worked, but `dropTable` failed - in such a case you'd have the same table in two catalogs.
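To illustrate what I mean: a rough, untested sketch that migrates each table independently and records failures instead of aborting mid-loop, so a re-run can skip the already-migrated tables (class and method names are made up):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class MigrationLoopSketch {
  // Collect per-table failures and keep going; the caller can report/retry
  // instead of losing all progress on the first failing table.
  static Map<TableIdentifier, Exception> migrate(
      Catalog sourceCatalog, Catalog targetCatalog,
      List<TableIdentifier> allIdentifiers, List<TableIdentifier> migrated) {
    Map<TableIdentifier, Exception> failures = new LinkedHashMap<>();
    for (TableIdentifier identifier : allIdentifiers) {
      try {
        String metadataLocation = ((BaseTable) sourceCatalog.loadTable(identifier))
            .operations().current().metadataFileLocation();
        targetCatalog.registerTable(identifier, metadataLocation);
        // If this drop fails, the table now exists in BOTH catalogs - exactly
        // the case I don't think can be handled cleanly here.
        sourceCatalog.dropTable(identifier, false /* purge */);
        migrated.add(identifier);
      } catch (RuntimeException e) {
        failures.put(identifier, e);
      }
    }
    return failures;
  }
}
```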
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the migration is successful.
+   *
+   * @param tableIdentifiers a list of tableIdentifiers for the tables required to be migrated,
+   *                         if not specified all the tables would be migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;
Review Comment:
I think this code should probably live in `Catalog`: a new function like `Catalog.registerTableFromCatalog()` to "move" a single table to the current catalog. The `HadoopCatalog` could then do the special handling in its implementation.
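Rough shape of what I have in mind; `registerTableFromCatalog()` does not exist yet, and the default implementation below is only a sketch (shown as a separate interface to keep it self-contained):

```java
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical addition to Catalog; HadoopCatalog would override this with
// its special handling.
interface CatalogMoveSketch extends Catalog {
  default Table registerTableFromCatalog(Catalog source, TableIdentifier identifier) {
    String metadataLocation = ((BaseTable) source.loadTable(identifier))
        .operations().current().metadataFileLocation();
    Table registered = registerTable(identifier, metadataLocation);
    source.dropTable(identifier, false /* purge */);
    return registered;
  }
}
```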
##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
+  private void validateRegister(TableIdentifier identifier, String metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + metadataVersionFiles).isEqualTo(metadataLocation);
+    Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
+  }
+
+  @Test
+  public void testRegisterTableWithGivenBranch() {
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assertions.assertThat(1).isEqualTo(metadataVersionFiles.size());
Review Comment:
Hint: `assertThat(metadataVersionFiles).hasSize(1)`
##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
+  private void validateRegister(TableIdentifier identifier, String metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + metadataVersionFiles).isEqualTo(metadataLocation);
+    Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
+  }
+
+  @Test
+  public void testRegisterTableWithGivenBranch() {
+    List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
+    Assertions.assertThat(1).isEqualTo(metadataVersionFiles.size());
+    ImmutableTableReference tableReference =
+        ImmutableTableReference.builder().reference("main").name(TABLE_NAME).build();
Review Comment:
Please use a different branch here.
Using the default branch is not that great - and the test name says `...Branch`, implying it's not the default branch.
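E.g. something like this (the branch name is made up, and the branch would have to be created from `main` first):

```java
// Use a dedicated branch so the test actually exercises the non-default case.
ImmutableTableReference tableReference =
    ImmutableTableReference.builder().reference("testRegisterBranch").name(TABLE_NAME).build();
```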
##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java:
##########
@@ -152,7 +152,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       }
     }
-    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    String newMetadataLocation = (base == null) && (metadata.metadataFileLocation() != null) ?
Review Comment:
Why is this necessary?
It's a new commit; I'm not sure whether it is good that it re-uses an existing metadata location that is (potentially) "owned" by another catalog.
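I.e. I would have expected the line to simply stay as it was:

```java
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
```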
##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java:
##########
@@ -385,6 +388,89 @@ public void testDropTable() throws IOException {
     verifyCommitMetadata();
   }
+  private void validateRegister(TableIdentifier identifier, String metadataVersionFiles) {
+    Assertions.assertThat(catalog.registerTable(identifier, "file:" + metadataVersionFiles)).isNotNull();
+    Table newTable = catalog.loadTable(identifier);
+    Assertions.assertThat(newTable).isNotNull();
+    TableOperations ops = ((HasTableOperations) newTable).operations();
+    String metadataLocation = ((NessieTableOperations) ops).currentMetadataLocation();
+    Assertions.assertThat("file:" + metadataVersionFiles).isEqualTo(metadataLocation);
Review Comment:
Hint: Your assertions are often the "wrong" way around.
It's always `assertThat(<current state>)....`, followed by the expectations.
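For the line above that would be:

```java
// actual value under test goes into assertThat(), the expectation follows
Assertions.assertThat(metadataLocation).isEqualTo("file:" + metadataVersionFiles);
```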
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the migration is successful.
+   *
+   * @param tableIdentifiers a list of tableIdentifiers for the tables required to be migrated,
+   *                         if not specified all the tables would be migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
+      tableIdentifiers.forEach(tableIdentifier -> Preconditions.checkArgument(
+          tableIdentifier != null, "Invalid identifier: %s", tableIdentifier));
+    }
+    Catalog sourceCatalog;
+    try {
+      sourceCatalog = loadCatalog(sourceCatalogProperties.get("catalogImpl"),
+          sourceCatalogProperties.get("catalogName"), sourceCatalogProperties, sourceHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Source Catalog implementation %s: %s", sourceCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    Catalog targetCatalog;
+    try {
+      targetCatalog = loadCatalog(targetCatalogProperties.get("catalogImpl"),
+          targetCatalogProperties.get("catalogName"), targetCatalogProperties, targetHadoopConfig);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Target Catalog implementation %s: %s", targetCatalogProperties.get("catalogImpl"),
+          e.getMessage()), e);
+    }
+    List<TableIdentifier> allIdentifiers = tableIdentifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      List<Namespace> namespaces = (sourceCatalog instanceof SupportsNamespaces) ?
+          ((SupportsNamespaces) sourceCatalog).listNamespaces() : ImmutableList.of(Namespace.empty());
+      allIdentifiers = namespaces.stream().flatMap(ns ->
Review Comment:
I suspect this will run for a very long time, for example when there are a lot of tables.
##########
aws/src/integration/java/org/apache/iceberg/aws/dynamodb/TestDynamoDbCatalog.java:
##########
@@ -295,6 +298,37 @@ public void testDropNamespace() {
     Assert.assertFalse("namespace must not exist", response.hasItem());
   }
+  @Test
+  public void testRegisterTable() {
Review Comment:
This pair of tests is repeated (in a very similar way) across multiple
catalogs. Can those be centralized somewhere? `CatalogTests` maybe?
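A sketch of what a shared test could look like, as a method body to be added to `CatalogTests` (this assumes `CatalogTests` exposes the catalog under test via `catalog()`; the `TABLE`/`SCHEMA` constants are illustrative):

```java
@Test
public void testRegisterTable() {
  C catalog = catalog();
  catalog.createTable(TABLE, SCHEMA);
  String metadataLocation = ((BaseTable) catalog.loadTable(TABLE))
      .operations().current().metadataFileLocation();
  // drop without purging files, then register the table from its metadata location
  Assertions.assertThat(catalog.dropTable(TABLE, false /* purge */)).isTrue();
  Assertions.assertThat(catalog.registerTable(TABLE, metadataLocation)).isNotNull();
  Assertions.assertThat(catalog.loadTable(TABLE)).isNotNull();
}
```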
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -346,4 +353,63 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
     setConf.invoke(conf);
   }
+
+  /**
+   * Used to migrate tables from one catalog(source catalog) to another catalog(target catalog).
+   * Also, the table would be dropped off from the source catalog once the migration is successful.
+   *
+   * @param tableIdentifiers a list of tableIdentifiers for the tables required to be migrated,
+   *                         if not specified all the tables would be migrated
+   * @param sourceCatalogProperties Source Catalog Properties
+   * @param targetCatalogProperties Target Catalog Properties
+   * @param sourceHadoopConfig Source Catalog Hadoop Configuration
+   * @param targetHadoopConfig Target Catalog Hadoop Configuration
+   * @return list of table identifiers for successfully migrated tables
+   */
+  public static List<TableIdentifier> migrateTables(List<TableIdentifier> tableIdentifiers,
+      Map<String, String> sourceCatalogProperties, Map<String, String> targetCatalogProperties,
+      Object sourceHadoopConfig, Object targetHadoopConfig) {
+    if (tableIdentifiers != null) {
Review Comment:
Let's leave out the catalog instantiation and configuration here completely.
I suspect that users have at least one of these catalogs already handy - and
setting up "the same" catalog twice is superfluous.
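I.e. something like:

```java
// Sketch: the caller passes in catalogs it already has; no loadCatalog() /
// Hadoop-conf plumbing inside this utility. Error handling omitted.
public static List<TableIdentifier> migrateTables(
    List<TableIdentifier> tableIdentifiers, Catalog sourceCatalog, Catalog targetCatalog) {
  List<TableIdentifier> migrated = new ArrayList<>();
  for (TableIdentifier identifier : tableIdentifiers) {
    String metadataLocation = ((BaseTable) sourceCatalog.loadTable(identifier))
        .operations().current().metadataFileLocation();
    targetCatalog.registerTable(identifier, metadataLocation);
    sourceCatalog.dropTable(identifier, false /* purge */);
    migrated.add(identifier);
  }
  return migrated;
}
```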
##########
core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java:
##########
@@ -639,4 +642,29 @@ public void testConversions() {
     Assert.assertEquals(ns, JdbcUtil.stringToNamespace(nsString));
   }
+  @Test
Review Comment:
Should these tests live in `CatalogTests` instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]