Guosmilesmile commented on code in PR #15566: URL: https://github.com/apache/iceberg/pull/15566#discussion_r2929086567
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.util.List; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode; +import org.apache.iceberg.flink.FlinkConfParser; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class DeleteOrphanFilesConfig { + public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + "delete-orphan-files."; + + private static final Splitter COMMA_SPLITTER = Splitter.on(","); + private static final Splitter EQUALS_SPLITTER = Splitter.on("=").limit(2); + + public static final String SCHEDULE_ON_COMMIT_COUNT = PREFIX + "schedule.commit-count"; + public static final ConfigOption<Integer> SCHEDULE_ON_COMMIT_COUNT_OPTION = + 
ConfigOptions.key(SCHEDULE_ON_COMMIT_COUNT) + .intType() + .defaultValue(10) + .withDescription( + "The number of commits after which to trigger a new delete orphan files operation."); + + public static final String SCHEDULE_ON_DATA_FILE_COUNT = PREFIX + "schedule.data-file-count"; + public static final ConfigOption<Integer> SCHEDULE_ON_DATA_FILE_COUNT_OPTION = + ConfigOptions.key(SCHEDULE_ON_DATA_FILE_COUNT) + .intType() + .defaultValue(1000) + .withDescription( + "The number of data files that should trigger a new delete orphan files operation."); + + public static final String SCHEDULE_ON_DATA_FILE_SIZE = PREFIX + "schedule.data-file-size"; + public static final ConfigOption<Long> SCHEDULE_ON_DATA_FILE_SIZE_OPTION = + ConfigOptions.key(SCHEDULE_ON_DATA_FILE_SIZE) + .longType() + .defaultValue(100L * 1024 * 1024 * 1024) // Default is 100 GB + .withDescription( + "The total size of data files that should trigger a new delete orphan files operation."); + + public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + "schedule.interval-second"; + public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION = + ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND) + .longType() + .defaultValue(60 * 60L) // Default is 1 hour + .withDescription( + "The time interval (in seconds) between two consecutive delete orphan files operations."); + + public static final String MIN_AGE_SECONDS = PREFIX + "min-age-seconds"; + public static final ConfigOption<Long> MIN_AGE_SECONDS_OPTION = + ConfigOptions.key(MIN_AGE_SECONDS) + .longType() + .defaultValue(3L * 24 * 60 * 60) // Default is 3 days + .withDescription( + "The minimum age (in seconds) of files to be considered for deletion. 
" +                + "Files newer than this will not be removed."); + +  public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size"; +  public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION = +      ConfigOptions.key(DELETE_BATCH_SIZE) +          .intType() +          .defaultValue(1000) +          .withDescription("The batch size used for deleting orphan files."); + +  public static final String LOCATION = PREFIX + "location"; +  public static final ConfigOption<String> LOCATION_OPTION = +      ConfigOptions.key(LOCATION) +          .stringType() +          .noDefaultValue() +          .withDescription( +              "The location to start recursive listing of candidate files for removal. " +                  + "By default, the table location is used."); + +  public static final String USE_PREFIX_LISTING = PREFIX + "use-prefix-listing"; +  public static final ConfigOption<Boolean> USE_PREFIX_LISTING_OPTION = +      ConfigOptions.key(USE_PREFIX_LISTING) +          .booleanType() +          .defaultValue(false) +          .withDescription( +              "Whether to use prefix listing when listing files from the file system."); + +  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + "planning-worker-pool-size"; +  public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION = +      ConfigOptions.key(PLANNING_WORKER_POOL_SIZE) +          .intType() +          .noDefaultValue() +          .withDescription( +              "The worker pool size used for planning the scan of the ALL_FILES table. " +                  + "If not set, the shared worker pool is used."); + +  public static final String EQUAL_SCHEMES = PREFIX + "equal-schemes"; +  public static final ConfigOption<String> EQUAL_SCHEMES_OPTION = +      ConfigOptions.key(EQUAL_SCHEMES) +          .stringType() +          .noDefaultValue() Review Comment: The same as above ########## docs/docs/flink-maintenance.md: ########## @@ -361,9 +409,64 @@ CREATE TABLE db.tbl ( ); ``` +### IcebergSink Maintenance Configuration (SQL) + +These keys are used in SQL (SET or table WITH options) or via `IcebergSink.Builder.set()` / `setAll()`. 
+ +#### Enable Flags + +| Key | Description | Default | +|-----|-------------|---------| +| `flink-maintenance.rewrite.enabled` | Enable compaction (rewrite data files) | `false` | +| `flink-maintenance.expire-snapshots.enabled` | Enable expire snapshots | `false` | +| `flink-maintenance.delete-orphan-files.enabled` | Enable delete orphan files | `false` | + +#### Rewrite Data Files Configuration + +| Key | Description | Default | +|-----|-------------|---------| +| `flink-maintenance.rewrite.schedule.commit-count` | Trigger after N commits | `10` | +| `flink-maintenance.rewrite.schedule.data-file-count` | Trigger after N data files | `1000` | +| `flink-maintenance.rewrite.schedule.data-file-size` | Trigger after total data file size (bytes) | `107374182400` (100GB) | +| `flink-maintenance.rewrite.schedule.interval-second` | Trigger after time interval (seconds) | `600` | +| `flink-maintenance.rewrite.max-bytes` | Maximum bytes to rewrite per execution | `Long.MAX_VALUE` | +| `flink-maintenance.rewrite.partial-progress.enabled` | Enable partial progress commits | `false` | +| `flink-maintenance.rewrite.partial-progress.max-commits` | Maximum commits for partial progress | `10` | + +#### Expire Snapshots Configuration + +| Key | Description | Default | +|-----|-------------|---------| +| `flink-maintenance.expire-snapshots.schedule.commit-count` | Trigger after N commits | `10` | +| `flink-maintenance.expire-snapshots.schedule.data-file-count` | Trigger after N data files | `1000` | +| `flink-maintenance.expire-snapshots.schedule.data-file-size` | Trigger after total data file size (bytes) | `107374182400` (100GB) | +| `flink-maintenance.expire-snapshots.schedule.interval-second` | Trigger after time interval (seconds) | `600` | +| `flink-maintenance.expire-snapshots.max-snapshot-age-seconds` | Maximum age of snapshots to retain (seconds) | Not set | +| `flink-maintenance.expire-snapshots.retain-last` | Minimum number of snapshots to retain | Not set | +| 
`flink-maintenance.expire-snapshots.delete-batch-size` | Batch size for deleting expired files | `1000` | +| `flink-maintenance.expire-snapshots.clean-expired-metadata` | Remove expired metadata (partition specs, schemas) | Not set | +| `flink-maintenance.expire-snapshots.planning-worker-pool-size` | Worker pool size for planning | Shared pool | + +#### Delete Orphan Files Configuration + +| Key | Description | Default | +|-----|-------------|---------| +| `flink-maintenance.delete-orphan-files.schedule.commit-count` | Trigger after N commits | `10` | +| `flink-maintenance.delete-orphan-files.schedule.data-file-count` | Trigger after N data files | `1000` | +| `flink-maintenance.delete-orphan-files.schedule.data-file-size` | Trigger after total data file size (bytes) | `107374182400` (100GB) | +| `flink-maintenance.delete-orphan-files.schedule.interval-second` | Trigger after time interval (seconds) | `600` | +| `flink-maintenance.delete-orphan-files.min-age-seconds` | Minimum age of files to consider for deletion (seconds) | `259200` (3 days) | +| `flink-maintenance.delete-orphan-files.delete-batch-size` | Batch size for deleting orphan files | `1000` | +| `flink-maintenance.delete-orphan-files.location` | Location to start recursive listing | Table location | +| `flink-maintenance.delete-orphan-files.use-prefix-listing` | Use prefix listing for file discovery | `false` | +| `flink-maintenance.delete-orphan-files.planning-worker-pool-size` | Worker pool size for planning | Shared pool | +| `flink-maintenance.delete-orphan-files.equal-schemes` | Equivalent schemes (format: `s3n=s3,s3a=s3`) | Not set | Review Comment: We have default value ``` private Map<String, String> equalSchemes = Maps.newHashMap( ImmutableMap.of( "s3n", "s3", "s3a", "s3")); ``` ########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java: ########## @@ -189,6 +189,41 @@ public Builder deleteBatchSize(int newDeleteBatchSize) { return this; } + public 
Builder config(DeleteOrphanFilesConfig deleteOrphanFilesConfig) { + this.scheduleOnCommitCount(deleteOrphanFilesConfig.scheduleOnCommitCount()) + .scheduleOnDataFileCount(deleteOrphanFilesConfig.scheduleOnDataFileCount()) + .scheduleOnDataFileSize(deleteOrphanFilesConfig.scheduleOnDataFileSize()) + .scheduleOnInterval( + Duration.ofSeconds(deleteOrphanFilesConfig.scheduleOnIntervalSecond())); + + this.minAge(Duration.ofSeconds(deleteOrphanFilesConfig.minAgeSeconds())) + .deleteBatchSize(deleteOrphanFilesConfig.deleteBatchSize()) + .usePrefixListing(deleteOrphanFilesConfig.usePrefixListing()) + .prefixMismatchMode(deleteOrphanFilesConfig.prefixMismatchMode()); + + String loc = deleteOrphanFilesConfig.location(); + if (loc != null) { + this.location(loc); + } + + Integer poolSize = deleteOrphanFilesConfig.planningWorkerPoolSize(); + if (poolSize != null) { + this.planningWorkerPoolSize(poolSize); + } + + Map<String, String> schemes = deleteOrphanFilesConfig.equalSchemes(); + if (schemes != null) { + this.equalSchemes(schemes); + } Review Comment: We have a default value in https://github.com/apache/iceberg/blob/bd54026d732a1eac49d0a6852036d1cd113ea65d/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java#L83-L87. Does this need to be changed to an append instead of an overwrite here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
