dramaticlly commented on code in PR #16730:
URL: https://github.com/apache/iceberg/pull/16730#discussion_r3405697070
##########
core/src/main/java/org/apache/iceberg/ManifestFiles.java:
##########
@@ -602,4 +612,54 @@ static long cacheMaxContentLength(FileIO io) {
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
}
+
+ /**
+ * Writes the given files into manifests in parallel, splitting them into the given number of
+ * groups and submitting each group to the provided executor.
+ *
+ * @param files content files to write
+ * @param parallelism number of parallel groups; the caller decides this based on its own
+ * parallelism and minimum-group-size policy
+ * @param writePool executor used to run group writes concurrently
+ * @param writeFunc function that writes a single group and returns the resulting manifests
+ * @return manifests in input-group order
+ */
+ static <F> List<ManifestFile> writeParallel(
+ Collection<F> files,
+ int parallelism,
+ ExecutorService writePool,
+ Function<List<F>, List<ManifestFile>> writeFunc) {
+ List<List<F>> groups = partition(files, parallelism);
+
+ // Pair each group with its index so results can be reassembled in input order.
+ List<Pair<Integer, List<F>>> groupsWithIndex = Lists.newArrayList();
+ for (int i = 0; i < groups.size(); i++) {
+ groupsWithIndex.add(Pair.of(i, groups.get(i)));
+ }
+
+ AtomicReferenceArray<List<ManifestFile>> results = new AtomicReferenceArray<>(groups.size());
+
+ Tasks.foreach(groupsWithIndex)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(writePool)
+ .run(
+ indexedGroup -> {
+ int index = indexedGroup.first();
+ List<F> group = indexedGroup.second();
+ results.set(index, writeFunc.apply(group));
+ });
+
+ ImmutableList.Builder<ManifestFile> builder = ImmutableList.builder();
+ for (int i = 0; i < results.length(); i++) {
+ builder.addAll(results.get(i));
+ }
+ return builder.build();
+ }
+
+ private static <T> List<List<T>> partition(Collection<T> collection, int groupCount) {
Review Comment:
Thanks @wombatu-kun, renamed it back to the prior name, divide, as suggested.
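The hunk shows the overall pattern (divide the input into groups, write each group on an executor, reassemble results in input-group order) but not the body of the divide helper. Below is a minimal plain-JDK sketch of the same pattern, under stated assumptions: the class name ParallelWriteSketch, the generic result type M, and this divide body (contiguous groups whose sizes differ by at most one) are hypothetical, since the PR's actual split policy is not visible in the diff; Iceberg's Tasks utility is swapped for ExecutorService futures.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ParallelWriteSketch {

  // Hypothetical divide helper (the PR's body is not shown): splits the input into at most
  // groupCount contiguous groups whose sizes differ by at most one; empty input yields no groups.
  static <T> List<List<T>> divide(Collection<T> collection, int groupCount) {
    if (groupCount <= 0) {
      throw new IllegalArgumentException("groupCount must be positive: " + groupCount);
    }
    List<T> items = new ArrayList<>(collection);
    List<List<T>> groups = new ArrayList<>();
    if (items.isEmpty()) {
      return groups;
    }
    int count = Math.min(groupCount, items.size());
    int base = items.size() / count;
    int remainder = items.size() % count;
    int start = 0;
    for (int i = 0; i < count; i++) {
      // The first `remainder` groups take one extra element each.
      int size = base + (i < remainder ? 1 : 0);
      groups.add(new ArrayList<>(items.subList(start, start + size)));
      start += size;
    }
    return groups;
  }

  // Submits one task per group and concatenates the results in input-group order.
  static <F, M> List<M> writeParallel(
      Collection<F> files, int parallelism, ExecutorService pool, Function<List<F>, List<M>> writeFunc) {
    List<List<F>> groups = divide(files, parallelism);
    // The futures list is indexed by submission order, which equals input-group order.
    List<Future<List<M>>> futures = new ArrayList<>(groups.size());
    for (List<F> group : groups) {
      futures.add(pool.submit(() -> writeFunc.apply(group)));
    }
    List<M> result = new ArrayList<>();
    try {
      // Waiting on futures in index order keeps the output ordered regardless of completion order.
      for (Future<List<M>> future : futures) {
        result.addAll(future.get());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      futures.forEach(f -> f.cancel(true));
      throw new RuntimeException("Interrupted while waiting for group writes", e);
    } catch (ExecutionException e) {
      // Loosely analogous to stopOnFailure(): cancel groups that have not finished, then rethrow.
      futures.forEach(f -> f.cancel(true));
      throw new RuntimeException("Group write failed", e.getCause());
    }
    return result;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
      // Each "manifest" is just a string naming the group it was written from.
      List<String> manifests =
          writeParallel(List.of(1, 2, 3, 4, 5, 6, 7), 3, pool, group -> List.of("manifest" + group));
      System.out.println(manifests); // prints [manifest[1, 2, 3], manifest[4, 5], manifest[6, 7]]
    } finally {
      pool.shutdown();
    }
  }
}
```

Keeping the futures in submission order gives the same ordering guarantee as the index-tagged AtomicReferenceArray in the PR without the Pair bookkeeping; the trade-off is that this sketch blocks on groups in order rather than as they complete.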