kbendick commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r944967219
##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
setConf.invoke(conf);
}
+
+ /**
+ * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+ *
+ * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+ * would be dropped from the source catalog.
+ *
+ * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+ * migrated. If not specified, all the tables would be migrated.
+ * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+ * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+ * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+ * thread pool is used)
+ * @return Collection of table identifiers for successfully migrated tables
+ */
+ public static Collection<TableIdentifier> migrateTables(
+ List<TableIdentifier> tableIdentifiers,
+ Catalog sourceCatalog,
+ Catalog targetCatalog,
+ int maxConcurrentMigrates) {
+ validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+ List<TableIdentifier> identifiers;
+ if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+ // fetch all the table identifiers from all the namespaces.
+ List<Namespace> namespaces =
+ (sourceCatalog instanceof SupportsNamespaces)
+ ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+ : ImmutableList.of(Namespace.empty());
+ identifiers =
+ namespaces.stream()
+ .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+ .collect(Collectors.toList());
+ } else {
+ identifiers = tableIdentifiers;
+ }
+
+ ExecutorService executorService = null;
+ if (maxConcurrentMigrates > 0) {
+ executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+ }
Review Comment:
You'll probably want to let the user pass in an executor service, as several
other actions already do.
If this is just a temporary utility, or rather one that is never intended to
be used from Flink (which was the motivation for allowing callers to pass in
executor services), then it might not be an issue.
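To make the suggestion concrete, here is a rough sketch of the shape I have in mind. It is not the PR's code: `MigrateTablesSketch`, the `String` identifiers (standing in for `TableIdentifier`) and the `migrateOne` callback (standing in for the per-table register/drop logic) are all hypothetical, and the plain JDK `Executors.newFixedThreadPool` is used here in place of `ThreadPools.newWorkerPool` only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for the utility under review; not Iceberg code.
final class MigrateTablesSketch {
  private MigrateTablesSketch() {}

  // Overload that accepts a caller-owned executor. A null executor means
  // "run sequentially on the calling thread". The executor is never shut
  // down here, because the caller owns its lifecycle.
  static Collection<String> migrateTables(
      List<String> identifiers, ExecutorService executorService, Consumer<String> migrateOne) {
    Collection<String> migrated = new ConcurrentLinkedQueue<>();
    if (executorService == null) {
      for (String id : identifiers) {
        tryMigrate(id, migrateOne, migrated);
      }
      return migrated;
    }

    List<Future<?>> futures = new ArrayList<>();
    for (String id : identifiers) {
      futures.add(executorService.submit(() -> tryMigrate(id, migrateOne, migrated)));
    }
    for (Future<?> future : futures) {
      try {
        future.get(); // wait for every submitted migration to finish
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while migrating tables", e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return migrated;
  }

  // Convenience overload that keeps the int-based signature and owns the
  // pool it creates, so it is also responsible for shutting it down.
  static Collection<String> migrateTables(
      List<String> identifiers, int maxConcurrentMigrates, Consumer<String> migrateOne) {
    if (maxConcurrentMigrates <= 0) {
      return migrateTables(identifiers, (ExecutorService) null, migrateOne);
    }
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentMigrates);
    try {
      return migrateTables(identifiers, pool, migrateOne);
    } finally {
      pool.shutdown();
    }
  }

  // Records the identifier only if the migration callback did not throw.
  private static void tryMigrate(
      String id, Consumer<String> migrateOne, Collection<String> migrated) {
    try {
      migrateOne.accept(id);
      migrated.add(id);
    } catch (RuntimeException e) {
      // A failed table is skipped; a real implementation would log it.
    }
  }
}
```

With this split, an engine that manages its own threads can hand in its executor and keep control of shutdown, while existing callers can keep using the `int` variant unchanged.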