mxm commented on code in PR #15459:
URL: https://github.com/apache/iceberg/pull/15459#discussion_r2884007688
##########
docs/docs/flink-maintenance.md:
##########
@@ -167,6 +171,36 @@ TableMaintenance.forTable(env, tableLoader, lockFactory)
env.execute("Table Maintenance Job");
```
+Use Coordinator Lock
Review Comment:
```suggestion
#### Managing table locking via Flink
```
##########
docs/docs/flink-maintenance.md:
##########
@@ -124,6 +124,10 @@ TriggerLockFactory lockFactory = new ZkLockFactory(
);
```
+#### Coordinator Lock
+
+Maintain the lock within Flink itself, and use the Coordinator to communicate lock acquisition and release.
+
Review Comment:
Do we need to document implementation details here? How about:
```suggestion
#### Flink-maintained lock
Maintain the lock within Flink itself. This does not require configuring
external systems. One prerequisite is that there are no parallel table
maintenance jobs for a given table.
```
##########
docs/docs/flink-maintenance.md:
##########
@@ -381,6 +415,12 @@ These keys are used in SQL (SET or table WITH options) and are applicable when w
| `flink-maintenance.lock.zookeeper.max-sleep-ms` | Maximum sleep time (ms) between retries. Caps the exponential backoff delay. | `10000` |
| `flink-maintenance.lock.zookeeper.retry-policy` | Retry policy name for ZooKeeper client. Supported values include: ONE_TIME, N_TIME, BOUNDED_EXPONENTIAL_BACKOFF, UNTIL_ELAPSED, EXPONENTIAL_BACKOFF. | `EXPONENTIAL_BACKOFF` |
+- COORDINATOR LOCK
+
+| Key | Description | Default |
+|-----|----------------------|---------|
+| `flink-maintenance.lock.type` | Set to `` or not set | |
Review Comment:
Would it make sense to have this default to `flink`, which would use the
coordinator?
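
To make the question concrete, here is a minimal sketch of what such a default could look like. Everything in it is an assumption for illustration: `LockTypeDefaultSketch` and `resolveLockType` are hypothetical names, and `flink` is only the value proposed in this comment, not an existing option. Only the key `flink-maintenance.lock.type` comes from the docs under review.

```java
import java.util.Map;

/** Hypothetical helper; not part of Iceberg or of this PR. */
final class LockTypeDefaultSketch {
  // Key taken from the documentation table under review.
  static final String LOCK_TYPE_KEY = "flink-maintenance.lock.type";

  // Value proposed in this review comment; an assumption, not a current default.
  static final String PROPOSED_DEFAULT = "flink";

  private LockTypeDefaultSketch() {}

  /** Returns the configured lock type, or the proposed default when unset or blank. */
  static String resolveLockType(Map<String, String> options) {
    String value = options.get(LOCK_TYPE_KEY);
    if (value == null || value.trim().isEmpty()) {
      return PROPOSED_DEFAULT;
    }
    return value.trim();
  }
}
```

With this shape, the docs row could state an explicit default instead of "Set to `` or not set".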
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java:
##########
@@ -252,7 +252,10 @@ public void close() throws Exception {
@VisibleForTesting
void handleLockRelease(LockReleaseEvent event) {
- Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't release lock");
+ if (lockTime == null) {
+ LOG.warn("Lock time is null, Can't release lock");
+ return;
+ }
Review Comment:
I've left a comment on that PR.
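
For readers following along, the behavioral difference in this hunk can be sketched standalone. This is a simplified illustration, not the operator code: `LockReleaseSketch`, `acquire`, `strictRelease`, and `lenientRelease` are hypothetical names, and `java.util.logging` stands in for the project's logger. The strict variant mirrors what Guava's `Preconditions.checkArgument` does on a false condition (it throws `IllegalArgumentException`); the lenient variant mirrors the warn-and-return guard added in the diff.

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for the operator; only the null guard mirrors the diff. */
final class LockReleaseSketch {
  private static final Logger LOG = Logger.getLogger(LockReleaseSketch.class.getName());

  // Null means no lock is currently held.
  private Long lockTime;

  void acquire(long timestamp) {
    this.lockTime = timestamp;
  }

  /** Old behavior: releasing without a held lock fails fast with an exception. */
  void strictRelease() {
    if (lockTime == null) {
      throw new IllegalArgumentException("Lock time is null, Can't release lock");
    }
    lockTime = null;
  }

  /** New behavior: the unexpected release is logged and ignored. Returns true if a lock was released. */
  boolean lenientRelease() {
    if (lockTime == null) {
      LOG.warning("Lock time is null, Can't release lock");
      return false;
    }
    lockTime = null;
    return true;
  }
}
```

The trade-off is that the lenient path keeps the job running on a stray release event, at the cost of turning a hard failure into a log line that is easier to miss.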
##########
docs/docs/flink-maintenance.md:
##########
@@ -167,6 +171,36 @@ TableMaintenance.forTable(env, tableLoader, lockFactory)
env.execute("Table Maintenance Job");
```
+Use Coordinator Lock
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+TableLoader tableLoader = TableLoader.fromCatalog(
+ CatalogLoader.hive("my_catalog", configuration, properties),
+ TableIdentifier.of("database", "table")
+);
+
+TableMaintenance.forTable(env, tableLoader)
+ .uidSuffix("my-maintenance-job")
+ .rateLimit(Duration.ofMinutes(10))
+ .lockCheckDelay(Duration.ofSeconds(10))
+ .add(ExpireSnapshots.builder()
+ .scheduleOnCommitCount(10)
+ .maxSnapshotAge(Duration.ofMinutes(10))
+ .retainLast(5)
+ .deleteBatchSize(5)
+ .parallelism(8))
+ .add(RewriteDataFiles.builder()
+ .scheduleOnDataFileCount(10)
+ .targetFileSizeBytes(128 * 1024 * 1024)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(10))
+ .append();
+
+env.execute("Table Maintenance Job");
Review Comment:
Everything except line 184 is identical; the only difference is the missing
lock parameter. Maybe consolidate the two sections and just explain that the
builder call can be either
```
TableMaintenance.forTable(env, tableLoader, lockFactory)
```
or
```
TableMaintenance.forTable(env, tableLoader)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]