peterxcli commented on code in PR #10306:
URL: https://github.com/apache/ozone/pull/10306#discussion_r3268776387
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -675,4 +695,188 @@ private static RewriteResult<DeleteFile>
writeDeleteManifest(
throw new RuntimeIOException(e);
}
}
+
+ static class OzonePositionDeleteReaderWriter implements
RewriteTablePathUtil.PositionDeleteReaderWriter {
+ @Override
+ public CloseableIterable<Record> reader(
+ InputFile inputFile, FileFormat format, PartitionSpec spec) {
+ return positionDeletesReader(inputFile, format, spec);
+ }
+
+ @Override
+ public PositionDeleteWriter<Record> writer(
+ OutputFile outputFile,
+ FileFormat format,
+ PartitionSpec spec,
+ StructLike partition,
+ Schema rowSchema)
+ throws IOException {
+ return positionDeletesWriter(outputFile, format, spec, partition,
rowSchema);
+ }
+ }
+
+ private void rewritePositionDeletes(Set<DeleteFile> toRewrite) {
+ /*
+ * NOTE: Rewriting position delete files updates embedded data file paths,
which changes the
+ * resulting file size. This causes a metadata mismatch in the manifests:
+ *
+ * 1. Dependency: Manifests MUST be rewritten first because they are the
source of truth used to identify which
+ * position delete files exist and need processing.
+ * 2. Issue: Because manifests are written before the delete files are
updated, the 'file_size_in_bytes' field
+ * in the manifest reflects the original size, not the new size.
+ * 3. Impact: Some catalogs (e.g., REST catalogs like Polaris) will fail
to read these files as the reader uses
+ * the stale size from the manifest.
+ *
+ * This is a known Iceberg limitation being addressed by the Iceberg
community. Once that fix is available
+ * in the Iceberg core, this action should be updated accordingly.
+ */
+ if (toRewrite.isEmpty()) {
+ return;
+ }
+
+ RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter =
new OzonePositionDeleteReaderWriter();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+ ExecutorCompletionService<Void> completionService = new
ExecutorCompletionService<>(executorService);
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (DeleteFile deleteFile : toRewrite) {
+ semaphore.acquire();
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try {
+ rewritePositionDelete(deleteFile, table, sourcePrefix,
targetPrefix, stagingDir, posDeleteReaderWriter);
+ return null;
+ } finally {
+ semaphore.release();
+ }
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
Review Comment:
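The `executorService` is already a bounded thread pool with `parallelism`
threads, so the extra `Semaphore` / `ExecutorCompletionService` throttling
should not be needed. Submitting the tasks directly and waiting on the
returned futures would be simpler: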
```suggestion
List<Future<Void>> futures = new ArrayList<>();
for (DeleteFile deleteFile : toRewrite) {
futures.add(executorService.submit(() -> {
rewritePositionDelete(deleteFile, table, sourcePrefix,
targetPrefix, stagingDir, posDeleteReaderWriter);
return null;
}));
}
for (Future<Void> f : futures) {
try {
f.get();
} catch (ExecutionException e) {
executorService.shutdownNow();
throw new RuntimeException(
"Failed to rewrite position delete file", e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executorService.shutdownNow();
throw new RuntimeException(
"Interrupted while rewriting position delete files", e);
}
}
```
##########
hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java:
##########
@@ -675,4 +695,188 @@ private static RewriteResult<DeleteFile>
writeDeleteManifest(
throw new RuntimeIOException(e);
}
}
+
+ static class OzonePositionDeleteReaderWriter implements
RewriteTablePathUtil.PositionDeleteReaderWriter {
+ @Override
+ public CloseableIterable<Record> reader(
+ InputFile inputFile, FileFormat format, PartitionSpec spec) {
+ return positionDeletesReader(inputFile, format, spec);
+ }
+
+ @Override
+ public PositionDeleteWriter<Record> writer(
+ OutputFile outputFile,
+ FileFormat format,
+ PartitionSpec spec,
+ StructLike partition,
+ Schema rowSchema)
+ throws IOException {
+ return positionDeletesWriter(outputFile, format, spec, partition,
rowSchema);
+ }
+ }
+
+ private void rewritePositionDeletes(Set<DeleteFile> toRewrite) {
+ /*
+ * NOTE: Rewriting position delete files updates embedded data file paths,
which changes the
+ * resulting file size. This causes a metadata mismatch in the manifests:
+ *
+ * 1. Dependency: Manifests MUST be rewritten first because they are the
source of truth used to identify which
+ * position delete files exist and need processing.
+ * 2. Issue: Because manifests are written before the delete files are
updated, the 'file_size_in_bytes' field
+ * in the manifest reflects the original size, not the new size.
+ * 3. Impact: Some catalogs (e.g., REST catalogs like Polaris) will fail
to read these files as the reader uses
+ * the stale size from the manifest.
+ *
+ * This is a known Iceberg limitation being addressed by the Iceberg
community. Once that fix is available
+ * in the Iceberg core, this action should be updated accordingly.
+ */
+ if (toRewrite.isEmpty()) {
+ return;
+ }
+
+ RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter =
new OzonePositionDeleteReaderWriter();
+ int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
+ Semaphore semaphore = new Semaphore(maxInFlight);
+ ExecutorCompletionService<Void> completionService = new
ExecutorCompletionService<>(executorService);
+ int submittedTasks = 0;
+ int completedTasks = 0;
+
+ try {
+ for (DeleteFile deleteFile : toRewrite) {
+ semaphore.acquire();
+ boolean taskSubmitted = false;
+ try {
+ completionService.submit(() -> {
+ try {
+ rewritePositionDelete(deleteFile, table, sourcePrefix,
targetPrefix, stagingDir, posDeleteReaderWriter);
+ return null;
+ } finally {
+ semaphore.release();
+ }
+ });
+ taskSubmitted = true;
+ submittedTasks++;
+ } finally {
+ if (!taskSubmitted) {
+ semaphore.release();
+ }
+ }
Review Comment:
I think there must be a cleaner way to do this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]