leeyam24 opened a new issue, #16752:
URL: https://github.com/apache/iceberg/issues/16752
### Feature Request / Improvement
`rewrite_table_path` rewrites manifest list files and version files
sequentially by default. The Java API exposes `executeWith(ExecutorService)` to
parallelize these stages, but this method is inaccessible from PySpark or SQL
because:
1. The `RewriteTablePathProcedure` has no parameter for thread count — it
only accepts `table`, `source_prefix`, `target_prefix`, `start_version`,
`end_version`, `staging_location`, and `create_file_list`.
2. PySpark has no mechanism to construct and pass a Java `ExecutorService`
object to a Spark procedure.
As a result, users running `rewrite_table_path` from PySpark on tables with
large snapshot histories are forced into sequential manifest list rewriting
with no way to tune parallelism.
**Current Behavior**
In `RewriteTablePathSparkAction.rebuildMetadata()`, the manifest list
rewriting loop is:
```java
Tasks.foreach(validSnapshots)
    .noRetry()
    .throwFailureWhenFinished()
    .executeWith(executorService) // null when no ExecutorService was provided
    .run(snapshot -> manifestListResults.add(
        rewriteManifestList(snapshot, endMetadata, manifestsToRewrite)));
```
When `executorService` is `null`, `Tasks.foreach` falls back to sequential
single-threaded execution. For a table with hundreds or thousands of snapshots,
this is a significant bottleneck.
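The fallback described above can be illustrated with a minimal sketch that uses only `java.util.concurrent`. It is not Iceberg's `Tasks` implementation; the class name `NullExecutorFallbackSketch` and the method `mapAll` are hypothetical and exist only to show why a `null` executor means one item at a time on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical stand-in for the null-executor fallback; NOT Iceberg's Tasks class.
final class NullExecutorFallbackSketch {

  // Applies fn to every item and returns the results in input order.
  static <T, R> List<R> mapAll(List<T> items, ExecutorService executor, Function<T, R> fn) {
    List<R> results = new ArrayList<>();

    if (executor == null) {
      // No executor: plain loop on the calling thread, one item after another.
      for (T item : items) {
        results.add(fn.apply(item));
      }
      return results;
    }

    // Executor provided: submit everything first, then collect in order.
    List<Future<R>> futures = new ArrayList<>();
    for (T item : items) {
      Callable<R> task = () -> fn.apply(item);
      futures.add(executor.submit(task));
    }
    for (Future<R> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for a task", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("Task failed", e.getCause());
      }
    }
    return results;
  }

  public static void main(String[] args) {
    List<Integer> snapshotIds = List.of(1, 2, 3);

    // Sequential path, analogous to calling the action without executeWith(...).
    System.out.println(mapAll(snapshotIds, null, id -> id * 2));

    // Parallel path with a small fixed pool.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      System.out.println(mapAll(snapshotIds, pool, id -> id * 2));
    } finally {
      pool.shutdown();
    }
  }
}
```

Both calls produce the same results; only the threads doing the work differ, which is why the sequential path goes unnoticed until the snapshot count grows.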
The `executeWith(ExecutorService)` method on the `RewriteTablePath`
interface documents this explicitly:
> *"If this method is not called, these operations will be performed sequentially."*
However, the `RewriteTablePathProcedure` never calls `executeWith`, and
there is no procedure parameter to configure it.
**Expected Behavior**
PySpark users should be able to control the degree of parallelism for
manifest list and version file rewriting via a procedure option, consistent
with how other Iceberg Spark procedures expose tuning knobs (e.g.,
`rewrite_data_files` accepts an `options => map(...)` parameter).
**Proposed Solution**
Add a `thread-pool-size` option to `RewriteTablePath` (as a string constant
with a documented default, e.g. `4`) and wire it through
`RewriteTablePathSparkAction` and `RewriteTablePathProcedure`.
**In `RewriteTablePath` (the interface):**
```java
String THREAD_POOL_SIZE = "thread-pool-size";
int THREAD_POOL_SIZE_DEFAULT = 4;
```
**In `RewriteTablePathSparkAction.rebuildMetadata()`:**
```java
int poolSize = PropertyUtil.propertyAsInt(
    options(), RewriteTablePath.THREAD_POOL_SIZE,
    RewriteTablePath.THREAD_POOL_SIZE_DEFAULT);
ExecutorService pool = ThreadPools.newFixedThreadPool("rewrite-table-path", poolSize);
try {
  Tasks.foreach(validSnapshots)
      .noRetry()
      .throwFailureWhenFinished()
      .executeWith(pool)
      .run(snapshot -> ...);
} finally {
  pool.shutdown();
}
```
**In `RewriteTablePathProcedure`:** add an optional `options` parameter of
type `MapType(StringType, StringType)` and forward it to the action via
`action.options(optionsMap)`, following the same pattern as
`RewriteDataFilesProcedure`.
This would allow PySpark users to tune parallelism without any Java interop:
```python
spark.sql("""
CALL system.rewrite_table_path(
table => 'db.my_table',
source_prefix => 'hdfs://old-cluster/',
target_prefix => 'hdfs://new-cluster/',
options => map('thread-pool-size', '16')
)
""")
```
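Because the value arrives as a string inside the `options` map, the action side would probably want to validate it before sizing a pool. The following is a minimal, self-contained sketch of that step. The class `ThreadPoolSizeOptionSketch` and its method are hypothetical and do not exist in Iceberg; the option name and default are the ones proposed above, and the rule that the size must be at least 1 is an assumption, not something the proposal specifies.

```java
import java.util.Map;

// Hypothetical helper illustrating option parsing; not part of Iceberg.
final class ThreadPoolSizeOptionSketch {
  // Name and default taken from the proposal in this issue.
  static final String THREAD_POOL_SIZE = "thread-pool-size";
  static final int THREAD_POOL_SIZE_DEFAULT = 4;

  // Returns the configured pool size, or the default when the option is absent.
  static int threadPoolSize(Map<String, String> options) {
    String raw = options.get(THREAD_POOL_SIZE);
    if (raw == null) {
      return THREAD_POOL_SIZE_DEFAULT;
    }

    int size;
    try {
      size = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Invalid " + THREAD_POOL_SIZE + ": '" + raw + "' is not an integer", e);
    }

    // Assumption: a pool needs at least one thread, so reject 0 and negatives.
    if (size < 1) {
      throw new IllegalArgumentException(
          "Invalid " + THREAD_POOL_SIZE + ": " + size + " (must be >= 1)");
    }
    return size;
  }

  public static void main(String[] args) {
    System.out.println(threadPoolSize(Map.of()));
    System.out.println(threadPoolSize(Map.of(THREAD_POOL_SIZE, "16")));
  }
}
```

Failing fast on a bad value keeps the error close to the `CALL` statement instead of surfacing later as a pool construction failure.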
**Affected Files**
- `api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java`
- `spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java`
- `spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteTablePathProcedure.java`
- Same for `spark/v4.0` and `spark/v4.1`
- `docs/docs/spark-procedures.md`
**References**
- `RewriteTablePath.executeWith`
- `rebuildMetadata`: sequential loop
- `RewriteTablePathProcedure`: no `options` parameter
### Query engine
Spark
### Willingness to contribute
- [ ] I can contribute this improvement/feature independently
- [ ] I would be willing to contribute this improvement/feature with
guidance from the Iceberg community
- [x] I cannot contribute this improvement/feature at this time