nastra commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r945806266
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace ->
sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables",
maxConcurrentMigrates);
+ }
+
+ try {
+ Collection<TableIdentifier> migratedTableIdentifiers = new
ConcurrentLinkedQueue<>();
+ Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+ .retry(3)
+ .stopRetryOn(NoSuchTableException.class,
NoSuchNamespaceException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (tableIdentifier, exc) ->
+ LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+ .run(
+ tableIdentifier -> {
+ migrate(sourceCatalog, targetCatalog, tableIdentifier);
+ migratedTableIdentifiers.add(tableIdentifier);
+ });
+ return migratedTableIdentifiers;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private static void validate(
+ Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates)
{
+ Preconditions.checkArgument(
Review Comment:
I think there should be tests for all of these validation checks.
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace ->
sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables",
maxConcurrentMigrates);
+ }
+
+ try {
+ Collection<TableIdentifier> migratedTableIdentifiers = new
ConcurrentLinkedQueue<>();
+ Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+ .retry(3)
+ .stopRetryOn(NoSuchTableException.class,
NoSuchNamespaceException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (tableIdentifier, exc) ->
+ LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+ .run(
+ tableIdentifier -> {
+ migrate(sourceCatalog, targetCatalog, tableIdentifier);
+ migratedTableIdentifiers.add(tableIdentifier);
+ });
+ return migratedTableIdentifiers;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private static void validate(
+ Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates)
{
+ Preconditions.checkArgument(
+ maxConcurrentMigrates >= 0,
+ "maxConcurrentMigrates should have value >= 0, value: " +
maxConcurrentMigrates);
+ Preconditions.checkArgument(sourceCatalog != null, "source catalog should
not be null");
Review Comment:
I think it would be better to update this to `Invalid source catalog: null`
in order to align with other error messages. The same applies to the target
catalog check.
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
Review Comment:
I would suggest moving this into its own class rather than adding more
stuff to `CatalogUtil`. The class can then be independently unit-tested.
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace ->
sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables",
maxConcurrentMigrates);
+ }
+
+ try {
+ Collection<TableIdentifier> migratedTableIdentifiers = new
ConcurrentLinkedQueue<>();
+ Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+ .retry(3)
+ .stopRetryOn(NoSuchTableException.class,
NoSuchNamespaceException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (tableIdentifier, exc) ->
+ LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+ .run(
+ tableIdentifier -> {
+ migrate(sourceCatalog, targetCatalog, tableIdentifier);
+ migratedTableIdentifiers.add(tableIdentifier);
+ });
+ return migratedTableIdentifiers;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private static void validate(
+ Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates)
{
+ Preconditions.checkArgument(
+ maxConcurrentMigrates >= 0,
+ "maxConcurrentMigrates should have value >= 0, value: " +
maxConcurrentMigrates);
+ Preconditions.checkArgument(sourceCatalog != null, "source catalog should
not be null");
+ Preconditions.checkArgument(targetCatalog != null, "target catalog should
not be null");
+ Preconditions.checkArgument(
+ !targetCatalog.equals(sourceCatalog), "target catalog is same as
source catalog");
+ }
+
+ private static void migrate(
+ Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier
tableIdentifier) {
+ // register the table to the target catalog
+ TableOperations ops =
+ ((HasTableOperations)
sourceCatalog.loadTable(tableIdentifier)).operations();
+ targetCatalog.registerTable(tableIdentifier,
ops.current().metadataFileLocation());
Review Comment:
What would happen if the tables were modified in parallel?
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace ->
sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables",
maxConcurrentMigrates);
+ }
+
+ try {
+ Collection<TableIdentifier> migratedTableIdentifiers = new
ConcurrentLinkedQueue<>();
+ Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+ .retry(3)
+ .stopRetryOn(NoSuchTableException.class,
NoSuchNamespaceException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (tableIdentifier, exc) ->
+ LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+ .run(
+ tableIdentifier -> {
+ migrate(sourceCatalog, targetCatalog, tableIdentifier);
+ migratedTableIdentifiers.add(tableIdentifier);
+ });
+ return migratedTableIdentifiers;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private static void validate(
+ Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates)
{
+ Preconditions.checkArgument(
+ maxConcurrentMigrates >= 0,
+ "maxConcurrentMigrates should have value >= 0, value: " +
maxConcurrentMigrates);
+ Preconditions.checkArgument(sourceCatalog != null, "source catalog should
not be null");
+ Preconditions.checkArgument(targetCatalog != null, "target catalog should
not be null");
+ Preconditions.checkArgument(
+ !targetCatalog.equals(sourceCatalog), "target catalog is same as
source catalog");
+ }
+
+ private static void migrate(
+ Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier
tableIdentifier) {
+ // register the table to the target catalog
+ TableOperations ops =
+ ((HasTableOperations)
sourceCatalog.loadTable(tableIdentifier)).operations();
+ targetCatalog.registerTable(tableIdentifier,
ops.current().metadataFileLocation());
+
+ // drop the table from source catalog
+ if (!(sourceCatalog instanceof HadoopCatalog)) {
+ // HadoopCatalog dropTable will delete the table files completely even
when purge is false.
+ // So, skip dropTable for HadoopCatalog.
+ sourceCatalog.dropTable(tableIdentifier, false);
Review Comment:
I'm not sure we'd always want to drop the table from the source catalog by
default after it has been migrated.
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object
maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another
catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the
migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables
required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are
chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to
be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate
tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace ->
sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables",
maxConcurrentMigrates);
+ }
+
+ try {
+ Collection<TableIdentifier> migratedTableIdentifiers = new
ConcurrentLinkedQueue<>();
+ Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+ .retry(3)
+ .stopRetryOn(NoSuchTableException.class,
NoSuchNamespaceException.class)
+ .suppressFailureWhenFinished()
+ .executeWith(executorService)
+ .onFailure(
+ (tableIdentifier, exc) ->
+ LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+ .run(
+ tableIdentifier -> {
+ migrate(sourceCatalog, targetCatalog, tableIdentifier);
+ migratedTableIdentifiers.add(tableIdentifier);
+ });
+ return migratedTableIdentifiers;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private static void validate(
+ Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates)
{
+ Preconditions.checkArgument(
+ maxConcurrentMigrates >= 0,
+ "maxConcurrentMigrates should have value >= 0, value: " +
maxConcurrentMigrates);
Review Comment:
Maybe rename `migrates` -> `migrations` (e.g. `maxConcurrentMigrations`)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]