aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r866297319
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends
Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+
Review Comment:
nit: extra empty line
##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result>
implements DeleteOrphanFiles {
private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String
path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
-
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
private String location = null;
+ private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;
Review Comment:
I think the default mode should be error, to draw attention to the
problem.
##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -128,6 +128,21 @@ public BaseDeleteOrphanFilesSparkAction
executeDeleteWith(ExecutorService execut
return this;
}
+ @Override
+ public DeleteOrphanFiles prefixMismatchMode(String mode) {
Review Comment:
If you add default implementations, you may avoid touching all Spark
versions.
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends
Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are
orphan
+ * @param mode mode for handling files that cannot be determined if they are
orphan
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles prefixMismatchMode(String mode);
Review Comment:
I think you can move the enum from the Spark module here and make it nested
within this action interface (e.g. right above the result interface). Then we
can refer to it in this method instead of using a string.
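A minimal sketch of the nested-enum idea, under assumptions: `OrphanFileAction` is a hypothetical stand-in for the action interface, the constants `ERROR` and `IGNORE` are the two modes mentioned in this thread (the final set may differ), and `fromString` is an illustrative helper for callers that still receive the mode as a string.

```java
import java.util.Locale;

// Hypothetical stand-in for the action interface; not the real Iceberg API.
interface OrphanFileAction {

  // Takes the enum rather than a String, so a misspelled mode fails at compile time.
  OrphanFileAction prefixMismatchMode(PrefixMismatchMode mode);

  // Nested enum; member types of an interface are implicitly public and static.
  enum PrefixMismatchMode {
    ERROR,
    IGNORE;

    // Illustrative helper: parses user input such as "error" or " Ignore ".
    static PrefixMismatchMode fromString(String mode) {
      if (mode == null) {
        throw new IllegalArgumentException("Mode cannot be null");
      }
      // valueOf throws IllegalArgumentException for unknown names.
      return valueOf(mode.trim().toUpperCase(Locale.ROOT));
    }
  }
}
```

With this shape, string-based entry points (for example a stored procedure argument) can convert once at the boundary and pass the enum everywhere else.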
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends
Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are
orphan
+ * @param mode mode for handling files that cannot be determined if they are
orphan
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles prefixMismatchMode(String mode);
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan
files
+ * @param equivalentSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
Review Comment:
All newly added methods should have default implementations throwing an
exception.
```
throw new UnsupportedOperationException(
    this.getClass().getName() + " does not implement XXX");
```
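For illustration, a small self-contained sketch of how such a default method behaves. The names `OrphanFileCleanup` and `LegacyCleanup` are hypothetical stand-ins, not classes from this PR; only the exception pattern comes from the snippet above.

```java
import java.util.List;

// Hypothetical stand-in for the action interface.
interface OrphanFileCleanup {

  // Pre-existing abstract method that every implementation already provides.
  OrphanFileCleanup olderThan(long timestampMillis);

  // Newly added method with a default body, so existing implementations
  // keep compiling and fail only if the new option is actually called.
  default OrphanFileCleanup equivalentSchemes(List<String> schemes) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement equivalentSchemes");
  }
}

// An older implementation that was written before the new method existed.
class LegacyCleanup implements OrphanFileCleanup {
  private long cutoffMillis;

  @Override
  public OrphanFileCleanup olderThan(long timestampMillis) {
    this.cutoffMillis = timestampMillis;
    return this;
  }
}
```

Calling `equivalentSchemes` on `LegacyCleanup` throws at runtime, while the class itself needs no changes. That is why the other Spark version modules would not have to be touched.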
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends
Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are
orphan
+ * @param mode mode for handling files that cannot be determined if they are
orphan
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles prefixMismatchMode(String mode);
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan
files
+ * @param equivalentSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+ /**
+ * Pass a list of authorities to be considered equivalent when finding
orphan files
Review Comment:
Question: what if I have the following use case where `bucket1` and
`bucket2` are indeed different?
```
s3://bucket1/path/to/file.parquet - actual
s3://bucket2/path/to/file.parquet - referenced by metadata
```
The first file should be deleted. How can we support this case with this
API? The only way I can think of right now is to add another method for
authorities that must be treated as unequal, which seems to complicate the
action quite a bit.
Any better ideas?
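To make the ambiguity concrete, here is a rough sketch using only `java.net.URI`. The class `AuthorityAmbiguity` and its methods are hypothetical names for illustration; the actual comparison in the action is done on Spark datasets and may differ.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical helper that compares two fully qualified locations.
class AuthorityAmbiguity {

  // True when both locations share the same path component.
  static boolean samePath(String left, String right) {
    return URI.create(left).getPath().equals(URI.create(right).getPath());
  }

  // True when both locations share the same authority (the bucket, for S3).
  static boolean sameAuthority(String left, String right) {
    return Objects.equals(
        URI.create(left).getAuthority(), URI.create(right).getAuthority());
  }
}
```

For the two locations above, `samePath` is true and `sameAuthority` is false. The path alone cannot tell us whether the actual file is referenced, so the caller has to say whether the two buckets are equivalent.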
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends
Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are
orphan
+ * @param mode mode for handling files that cannot be determined if they are
orphan
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles prefixMismatchMode(String mode);
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan
files
+ * @param equivalentSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+ /**
+ * Pass a list of authorities to be considered equivalent when finding
orphan files
+ * @param equivalentAuthorities list of equivalent schemes
+ * @return this for method chaining
+ */
+ DeleteOrphanFiles equivalentAuthorities(List<String> equivalentAuthorities);
Review Comment:
Question: would it be better to use a map? We recently added a way to specify
a list of actual files, so it may contain entries for multiple
schemes/authorities.
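One possible shape for the map-based variant, as a sketch only. `LocationNormalizer` is a hypothetical class: each map goes from an alias to its canonical value, and anything not listed is left unchanged, so different buckets stay different unless the caller maps them together. It assumes every location is fully qualified with a scheme and an authority.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical normalizer driven by alias -> canonical maps.
class LocationNormalizer {
  private final Map<String, String> equalSchemes;
  private final Map<String, String> equalAuthorities;

  LocationNormalizer(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
    // Defensive copies so later changes by the caller do not affect this instance.
    this.equalSchemes = new HashMap<>(equalSchemes);
    this.equalAuthorities = new HashMap<>(equalAuthorities);
  }

  // Rewrites the scheme and authority to their canonical values; the path is kept as-is.
  String normalize(String location) {
    URI uri = URI.create(location);
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    if (scheme == null || authority == null) {
      throw new IllegalArgumentException("Location is not fully qualified: " + location);
    }
    String canonicalScheme = equalSchemes.getOrDefault(scheme, scheme);
    String canonicalAuthority = equalAuthorities.getOrDefault(authority, authority);
    return canonicalScheme + "://" + canonicalAuthority + uri.getPath();
  }
}
```

Because one map can hold several alias groups at once, a mixed list of actual files (several schemes, several buckets) can be normalized in a single pass before comparing against the metadata.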
##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result>
implements DeleteOrphanFiles {
private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String
path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
-
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
private String location = null;
+ private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;
+ private List<String> equivalentSchemes;
Review Comment:
We also need to define some default values for equivalent schemes, like s3a, s3, s3n.
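A sketch of how built-in defaults could be combined with user input, assuming a map from alias scheme to canonical scheme. `SchemeDefaults`, `DEFAULT_EQUAL_SCHEMES`, and `withDefaults` are hypothetical names, and choosing `s3` as the canonical form is an assumption made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for default scheme equivalences.
class SchemeDefaults {

  // Assumed defaults: treat s3a and s3n as aliases of s3.
  static final Map<String, String> DEFAULT_EQUAL_SCHEMES;

  static {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("s3a", "s3");
    defaults.put("s3n", "s3");
    DEFAULT_EQUAL_SCHEMES = Collections.unmodifiableMap(defaults);
  }

  // Starts from the defaults and lets user-provided entries add to or override them.
  static Map<String, String> withDefaults(Map<String, String> userProvided) {
    Map<String, String> merged = new HashMap<>(DEFAULT_EQUAL_SCHEMES);
    merged.putAll(userProvided);
    return merged;
  }
}
```

Starting from the defaults and applying user entries on top keeps the common S3 case working without configuration, while still allowing an override.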
##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -273,4 +310,102 @@ private static FlatMapFunction<Iterator<String>, String>
listDirsRecursively(
return files.iterator();
};
}
+
+ private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+ final StructType schema = new StructType(new StructField[]{
Review Comment:
Seems like this can be a static constant?
##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+ static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,
Review Comment:
nit: annotate with `@VisibleForTesting`?
##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+ static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,
Review Comment:
We usually try to avoid using `get` as it has almost no meaning. What about
`findOrphanFiles` and returning a list of strings?