aokolnychyi commented on code in PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#discussion_r902958198
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -19,346 +19,52 @@
package org.apache.iceberg.spark.actions;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
/**
- * An action that removes orphan metadata, data and delete files by listing a
given location and comparing
- * the actual files in that location with content and metadata files
referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link
FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link
Table#location()} and
- * removes unreachable files that are older than 3 days using {@link
Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to
{@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up
only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link
#compareToFileList(Dataset)} argument. This
- * skips the directory listing - any files in the dataset provided which are
not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link
#olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention
interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link
DeleteOrphanFilesSparkAction} instead.
*/
-public class BaseDeleteOrphanFilesSparkAction
- extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result>
implements DeleteOrphanFiles {
-
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String
path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
-
- private final SerializableConfiguration hadoopConf;
- private final int partitionDiscoveryParallelism;
- private final Table table;
- private final Consumer<String> defaultDelete = new Consumer<String>() {
- @Override
- public void accept(String file) {
- table.io().deleteFile(file);
- }
- };
-
- private String location = null;
- private long olderThanTimestamp = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3);
- private Dataset<Row> compareToFileList;
- private Consumer<String> deleteFunc = defaultDelete;
- private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends
DeleteOrphanFilesSparkAction {
Review Comment:
I initially simply renamed the old actions. It was a much smaller change but
could break folks who did explicit casts in the past as these base action
classes were kind of public. That's why I extended the renamed classes for
compatibility with a deprecation warning.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -19,346 +19,52 @@
package org.apache.iceberg.spark.actions;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
/**
- * An action that removes orphan metadata, data and delete files by listing a
given location and comparing
- * the actual files in that location with content and metadata files
referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link
FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link
Table#location()} and
- * removes unreachable files that are older than 3 days using {@link
Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to
{@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up
only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link
#compareToFileList(Dataset)} argument. This
- * skips the directory listing - any files in the dataset provided which are
not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link
#olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention
interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ * use {@link SparkActions} and {@link
DeleteOrphanFilesSparkAction} instead.
*/
-public class BaseDeleteOrphanFilesSparkAction
- extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result>
implements DeleteOrphanFiles {
-
- private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String
path) -> {
- int lastIndex = path.lastIndexOf(File.separator);
- if (lastIndex == -1) {
- return path;
- } else {
- return path.substring(lastIndex + 1);
- }
- }, DataTypes.StringType);
-
- private final SerializableConfiguration hadoopConf;
- private final int partitionDiscoveryParallelism;
- private final Table table;
- private final Consumer<String> defaultDelete = new Consumer<String>() {
- @Override
- public void accept(String file) {
- table.io().deleteFile(file);
- }
- };
-
- private String location = null;
- private long olderThanTimestamp = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3);
- private Dataset<Row> compareToFileList;
- private Consumer<String> deleteFunc = defaultDelete;
- private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends
DeleteOrphanFilesSparkAction {
Review Comment:
Instead of exposing actual classes, we could add Spark-specific interfaces
and expose them. However, I don't think it is a good idea. Methods we would add
to those interfaces usually reflect internal implementation details that are
not generic. In addition, we probably don’t want to complicate the actions API
by adding another layer of interfaces. Exposing classes seems to be the most
straightforward option but I'd prefer to do a rename in that case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]