aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r904209393
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -89,4 +120,8 @@ interface Result {
*/
Iterable<String> orphanFileLocations();
}
+
+ enum PrefixMismatchMode {
Review Comment:
I feel like some Javadoc would be helpful. Could you add a sentence or two?
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,36 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are orphan
+ *
+ * @param mode mode for handling files that cannot be determined if they are orphan
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+ }
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan files
+ *
+ * @param equalSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalSchemes(List<String> equalSchemes) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equivalentSchemes");
+ }
+
+ /**
+ * Pass a list of authorities to be considered equivalent when finding orphan files
+ *
+ * @param equalAuthorities list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalAuthorities(List<String> equalAuthorities) {
Review Comment:
I feel it would be safer and easier to accept maps for equal schemes and
authorities. If a table stores data in multiple locations, there will be no way
we can provide equal authorities per location with a common list. I'd probably
opt for using maps in the API.
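
If maps were adopted, one possible shape is an alias-to-canonical mapping that is consulted before comparing prefixes; a minimal sketch (the class and method names here are hypothetical, not part of the PR):

```java
import java.util.Map;

// Hypothetical sketch: with a list such as ["s3", "s3a", "s3n"], every location
// shares a single equivalence class; with a map, each alias points at its own
// canonical scheme, so different locations can carry different mappings.
public class SchemeNormalizer {
  // e.g. {"s3a" -> "s3", "s3n" -> "s3", "wasbs" -> "wasb"}
  private final Map<String, String> equalSchemes;

  public SchemeNormalizer(Map<String, String> equalSchemes) {
    this.equalSchemes = equalSchemes;
  }

  // Returns the canonical scheme; unmapped schemes stay as-is.
  public String canonicalize(String scheme) {
    return equalSchemes.getOrDefault(scheme, scheme);
  }

  public boolean sameScheme(String a, String b) {
    return canonicalize(a).equals(canonicalize(b));
  }
}
```

The same shape would work for authorities, where each location can map to its own canonical value.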
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,36 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are orphan
+ *
+ * @param mode mode for handling files that cannot be determined if they are orphan
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+ }
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan files
+ *
+ * @param equalSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalSchemes(List<String> equalSchemes) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equivalentSchemes");
Review Comment:
nit: `equivalentSchemes` -> `equalSchemes` in the error message
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/FileDescriptor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+public class FileDescriptor {
Review Comment:
What about making this a static class in the action?
Also, can we make it just cover one set of values? We can cast actual and
metadata DFs separately before joining and then use `df.as()` to deconflict
attributes in the join condition.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +240,13 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
- Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
- Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
- Column nameEqual = actualFileName.equalTo(validFileName);
- Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
- Column joinCond = nameEqual.and(actualContains);
- List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
- .as(Encoders.STRING())
- .collectAsList();
+ List<String> orphanFiles =
+ findOrphanFileDF(actualFileDF,
Review Comment:
nit: `findOrphanFileDF` -> `findOrphanFiles` as it no longer returns
`DataFrame`.
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,36 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are orphan
+ *
+ * @param mode mode for handling files that cannot be determined if they are orphan
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+ }
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan files
+ *
+ * @param equalSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalSchemes(List<String> equalSchemes) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equivalentSchemes");
+ }
+
+ /**
+ * Pass a list of authorities to be considered equivalent when finding orphan files
+ *
+ * @param equalAuthorities list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalAuthorities(List<String> equalAuthorities) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement equivalentAuthorities");
Review Comment:
nit: `equivalentAuthorities` -> `equalAuthorities` in the error message
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,36 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are orphan
Review Comment:
Shall we add a few sentences about the possible modes and what the default behavior is if this method is not called?
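
For illustration, the Javadoc on the enum could look something like this; the mode names and the default are assumptions for the sketch, not settled by this PR:

```java
/**
 * Defines the action behavior when the scheme/authority prefix of an actual
 * file cannot be matched against any valid file prefix.
 */
public enum PrefixMismatchMode {
  /** Fail the action on any unresolved prefix mismatch (assumed default). */
  ERROR,
  /** Treat files with mismatched prefixes as non-orphan and leave them. */
  IGNORE,
  /** Treat files with mismatched prefixes as orphan and delete them. */
  DELETE
}
```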
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -126,6 +133,8 @@ public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
this.partitionDiscoveryParallelism =
spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
this.table = table;
this.location = table.location();
+ this.setAccumulator = new SetAccumulator<>();
Review Comment:
What about using `Pair` in the accumulator to capture conflicting pairs rather than a set of strings? I feel that should help us come up with a more descriptive message.
Also, we should init the accumulator somewhere in `execute`, when we are absolutely sure we will need it. I'd make it a local var.
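
As a rough, Spark-free illustration of the pair idea (plain Java; `ConflictCollector` is a hypothetical stand-in for the accumulator's state, not `AccumulatorV2` itself):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: collect conflicting (actual, valid) prefix pairs rather than a flat
// set of strings, so the error message can show both sides of each conflict.
public class ConflictCollector {
  private final Set<Map.Entry<String, String>> conflicts =
      Collections.synchronizedSet(new HashSet<>());

  public void add(String actualPrefix, String validPrefix) {
    conflicts.add(new SimpleImmutableEntry<>(actualPrefix, validPrefix));
  }

  // Builds a message listing both prefixes of every recorded conflict.
  public String describe() {
    StringBuilder sb = new StringBuilder("Unable to determine whether certain files are orphan:");
    for (Map.Entry<String, String> pair : conflicts) {
      sb.append("\n  actual prefix: ").append(pair.getKey())
        .append(", valid prefix: ").append(pair.getValue());
    }
    return sb.toString();
  }
}
```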
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, java.util.Set<T>> {
+
+ private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+ @Override
+ public boolean isZero() {
+ return set.isEmpty();
+ }
+
+ @Override
+ public AccumulatorV2<T, Set<T>> copy() {
+ SetAccumulator<T> objectSetAccumulator = new SetAccumulator<>();
+ objectSetAccumulator.set.addAll(set);
+ return objectSetAccumulator;
+ }
+
+ @Override
+ public void reset() {
+ set.clear();
+ }
+
+ @Override
+ public void add(T v) {
+ set.add(v);
+ }
+
+ @Override
+ public void merge(AccumulatorV2<T, Set<T>> other) {
+ set.addAll(other.value());
+ }
+
+ @Override
+ public Set<T> value() {
+ return Sets.newHashSet(set);
Review Comment:
Why create a copy to return the value? Is it to provide a read-only copy for
safety?
I wouldn't worry too much about this or use `Collections.unmodifiableSet`.
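
For reference, the trade-off between the two options (a copy is a detached snapshot, while `Collections.unmodifiableSet` is an O(1) live view) can be shown in isolation:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Copy vs. unmodifiable view: the copy is a stable snapshot, while the
// unmodifiable view reflects later additions to the backing set but
// rejects mutation through the view itself.
public class SetViews {
  public static <T> Set<T> snapshot(Set<T> set) {
    return new HashSet<>(set);               // detached copy
  }

  public static <T> Set<T> readOnlyView(Set<T> set) {
    return Collections.unmodifiableSet(set); // live, mutation-proof view
  }
}
```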
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,36 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*/
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+ * Pass a mode for handling the files that cannot be determined if they are orphan
+ *
+ * @param mode mode for handling files that cannot be determined if they are orphan
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement prefixMismatchMode");
+ }
+
+ /**
+ * Pass a list of schemes to be considered equivalent when finding orphan files
+ *
+ * @param equalSchemes list of equivalent schemes
+ * @return this for method chaining
+ */
+ default DeleteOrphanFiles equalSchemes(List<String> equalSchemes) {
Review Comment:
We should note if a user provides equal schemes and authorities, we consider
those mappings complete. In other words, if there is no mapping for two
authorities, they are considered different.
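
In other words, the lookup would fall back to "different" rather than "equal" when no mapping exists; a hypothetical sketch of that semantics:

```java
import java.util.Map;

// Sketch: the mapping is treated as complete. Authorities that do not
// normalize to the same canonical value are considered different, never
// "unknown", so an absent pair means a mismatch.
public class AuthorityMatcher {
  // e.g. {"servicename" -> "servicename.domain.com"}
  private final Map<String, String> equalAuthorities;

  public AuthorityMatcher(Map<String, String> equalAuthorities) {
    this.equalAuthorities = equalAuthorities;
  }

  public boolean sameAuthority(String a, String b) {
    String canonicalA = equalAuthorities.getOrDefault(a, a);
    String canonicalB = equalAuthorities.getOrDefault(b, b);
    return canonicalA.equals(canonicalB);
  }
}
```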
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, java.util.Set<T>> {
+
+ private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+ @Override
+ public boolean isZero() {
+ return set.isEmpty();
+ }
+
+ @Override
+ public AccumulatorV2<T, Set<T>> copy() {
+ SetAccumulator<T> objectSetAccumulator = new SetAccumulator<>();
Review Comment:
nit: `objectSetAccumulator` -> `newAccumulator`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]