This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6b2e21a93ef HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
6b2e21a93ef is described below
commit 6b2e21a93ef3c1776b689a7953fc59dbf52e4be4
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Nov 28 11:47:18 2023 +0530
HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 31 +++
.../hive/actions/HiveIcebergDeleteOrphanFiles.java | 232 +++++++++++++++++++++
.../mr/hive/TestHiveIcebergExpireSnapshots.java | 55 +++++
.../hadoop/hive/ql/parse/AlterClauseParser.g | 2 +
.../apache/hadoop/hive/ql/parse/HiveLexerParent.g | 3 +
.../table/execute/AlterTableExecuteAnalyzer.java | 27 ++-
.../hive/ql/parse/AlterTableExecuteSpec.java | 29 ++-
7 files changed, 377 insertions(+), 2 deletions(-)
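For reference, the statement shape introduced by this change (and exercised in the test below) is the following; the table name and timestamp are only illustrative, and when the OLDER THAN clause is omitted the cutoff defaults to the current time minus three days:

    ALTER TABLE source EXECUTE DELETE ORPHAN-FILES
    ALTER TABLE source EXECUTE DELETE ORPHAN-FILES OLDER THAN ('2023-11-28 00:00:00.000000000')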
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 10a00db534a..c729fcef430 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -158,6 +158,7 @@ import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
@@ -171,6 +172,7 @@ import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
@@ -849,12 +851,41 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
IcebergTableUtil.performMetadataDelete(icebergTable, deleteMetadataSpec.getBranchName(),
deleteMetadataSpec.getSarg());
break;
+ case DELETE_ORPHAN_FILES:
+ int numDeleteThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+ HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+ AlterTableExecuteSpec.DeleteOrphanFilesDesc deleteOrphanFilesSpec =
+ (AlterTableExecuteSpec.DeleteOrphanFilesDesc) executeSpec.getOperationParams();
+ deleteOrphanFiles(icebergTable, deleteOrphanFilesSpec.getTimestampMillis(), numDeleteThreads);
+ break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
}
}
+ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int numThreads) {
+ ExecutorService deleteExecutorService = null;
+ try {
+ if (numThreads > 0) {
+ LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
+ deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
+ }
+
+ HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
+ deleteOrphanFiles.olderThan(timestampMillis);
+ if (deleteExecutorService != null) {
+ deleteOrphanFiles.executeDeleteWith(deleteExecutorService);
+ }
+ DeleteOrphanFiles.Result result = deleteOrphanFiles.execute();
+ LOG.debug("Cleaned files {} for {}", result.orphanFileLocations(), icebergTable);
+ } finally {
+ if (deleteExecutorService != null) {
+ deleteExecutorService.shutdown();
+ }
+ }
+ }
+
private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
int numThreads) {
ExecutorService deleteExecutorService = null;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
new file mode 100644
index 00000000000..3c2e466208f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.actions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+
+public class HiveIcebergDeleteOrphanFiles implements DeleteOrphanFiles {
+
+ public static final String METADATA_FOLDER_NAME = "metadata";
+ public static final String DATA_FOLDER_NAME = "data";
+ private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergDeleteOrphanFiles.class);
+ private String tableLocation;
+ private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+ private Consumer<String> deleteFunc;
+ private ExecutorService deleteExecutorService = MoreExecutors.newDirectExecutorService();
+
+ private final Configuration conf;
+ private final Table table;
+
+ public HiveIcebergDeleteOrphanFiles(Configuration conf, Table table) {
+ this.conf = conf;
+ this.table = table;
+ this.deleteFunc = file -> table.io().deleteFile(file);
+ this.tableLocation = table.location();
+ }
+
+ @Override
+ public HiveIcebergDeleteOrphanFiles location(String location) {
+ this.tableLocation = location;
+ return this;
+ }
+
+ @Override
+ public HiveIcebergDeleteOrphanFiles olderThan(long newOlderThanTimestamp) {
+ this.olderThanTimestamp = newOlderThanTimestamp;
+ return this;
+ }
+
+ // TODO: Implement later, if there is any use case.
+ @Override
+ public HiveIcebergDeleteOrphanFiles deleteWith(Consumer<String> newDeleteFunc) {
+ this.deleteFunc = newDeleteFunc;
+ return this;
+ }
+
+ @Override
+ public HiveIcebergDeleteOrphanFiles executeDeleteWith(ExecutorService executorService) {
+ this.deleteExecutorService = executorService;
+ return this;
+ }
+
+ @Override
+ public Result execute() {
+ LOG.info("Cleaning orphan files for {}", table.name());
+ HiveIcebergDeleteOrphanFilesResult result = new HiveIcebergDeleteOrphanFilesResult();
+ result.addDeletedFiles(cleanContentFiles(olderThanTimestamp));
+ result.addDeletedFiles(cleanMetadata(olderThanTimestamp));
+
+ LOG.debug("Deleting {} files while cleaning orphan files for {}", result.deletedFiles.size(), table.name());
+ Tasks.foreach(result.deletedFiles).executeWith(deleteExecutorService).retry(3)
+ .stopRetryOn(FileNotFoundException.class).suppressFailureWhenFinished().onFailure((file, thrown) ->
+ LOG.warn("Delete failed for file: {}", file, thrown)).run(deleteFunc::accept);
+ return result;
+ }
+
+ private Set<String> cleanContentFiles(long lastTime) {
+ Set<String> validFiles = Sets.union(getAllContentFilePath(), getAllStatisticsFilePath(table));
+ LOG.debug("Valid content files for {} are {}", table.name(), validFiles.size());
+ try {
+ Path dataPath = new Path(tableLocation, DATA_FOLDER_NAME);
+ return getFilesToBeDeleted(lastTime, validFiles, dataPath);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ public Set<String> getAllContentFilePath() {
+ Set<String> validFilesPath = Sets.newHashSet();
+ Table metadataTable = getMetadataTable();
+
+ TableScan tableScan = metadataTable.newScan();
+ CloseableIterable<FileScanTask> manifestFileScanTasks = tableScan.planFiles();
+ CloseableIterable<StructLike> entries = CloseableIterable.concat(entriesOfManifest(manifestFileScanTasks));
+
+ for (StructLike entry : entries) {
+ StructLike fileRecord = entry.get(4, StructLike.class);
+ String filePath = fileRecord.get(1, String.class);
+ validFilesPath.add(getUriPath(filePath));
+ }
+ return validFilesPath;
+ }
+
+ private Iterable<CloseableIterable<StructLike>> entriesOfManifest(
+ CloseableIterable<FileScanTask> fileScanTasks) {
+ return Iterables.transform(
+ fileScanTasks,
+ task -> {
+ assert task != null;
+ return ((DataTask) task).rows();
+ });
+ }
+
+ public static Set<String> getAllStatisticsFilePath(Table table) {
+ return ReachableFileUtil.statisticsFilesLocations(table).stream().map(HiveIcebergDeleteOrphanFiles::getUriPath)
+ .collect(Collectors.toSet());
+ }
+
+ protected Set<String> cleanMetadata(long lastTime) {
+ LOG.info("Start cleaning metadata files for {}", table.name());
+ try {
+ Set<String> validFiles = getValidMetadataFiles(table);
+ LOG.debug("Valid metadata files for {} are {}", table.name(), validFiles);
+ Path metadataLocation = new Path(tableLocation, METADATA_FOLDER_NAME);
+ return getFilesToBeDeleted(lastTime, validFiles, metadataLocation);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ private Set<String> getFilesToBeDeleted(long lastTime, Set<String> validFiles, Path location)
+ throws IOException {
+ Set<String> filesToDelete = Sets.newHashSet();
+ FileSystem fs = location.getFileSystem(conf);
+ RemoteIterator<LocatedFileStatus> metadataLocations = fs.listFiles(location, true);
+ while (metadataLocations.hasNext()) {
+ LocatedFileStatus metadataFile = metadataLocations.next();
+ if (metadataFile.getModificationTime() < lastTime &&
+ !validFiles.contains(getUriPath(metadataFile.getPath().toString()))) {
+ filesToDelete.add(metadataFile.getPath().toString());
+ }
+ }
+ return filesToDelete;
+ }
+
+ private Table getMetadataTable() {
+ return MetadataTableUtils.createMetadataTableInstance(((HasTableOperations) table).operations(), table.name(),
+ table.name() + "#" + ALL_ENTRIES.name(), ALL_ENTRIES);
+ }
+
+ private static Set<String> getValidMetadataFiles(Table icebergTable) {
+ Set<String> validFiles = Sets.newHashSet();
+ Iterable<Snapshot> snapshots = icebergTable.snapshots();
+ for (Snapshot snapshot : snapshots) {
+ String manifestListLocation = snapshot.manifestListLocation();
+ validFiles.add(getUriPath(manifestListLocation));
+
+ List<ManifestFile> manifestFiles = snapshot.allManifests(icebergTable.io());
+ for (ManifestFile manifestFile : manifestFiles) {
+ validFiles.add(getUriPath(manifestFile.path()));
+ }
+ }
+ Stream.of(
+ ReachableFileUtil.metadataFileLocations(icebergTable, false).stream(),
+ ReachableFileUtil.statisticsFilesLocations(icebergTable).stream(),
+ Stream.of(ReachableFileUtil.versionHintLocation(icebergTable)))
+ .reduce(Stream::concat)
+ .orElse(Stream.empty())
+ .map(HiveIcebergDeleteOrphanFiles::getUriPath)
+ .forEach(validFiles::add);
+
+ return validFiles;
+ }
+
+ private static String getUriPath(String path) {
+ return URI.create(path).getPath();
+ }
+
+ static class HiveIcebergDeleteOrphanFilesResult implements Result {
+
+ private final Set<String> deletedFiles = Sets.newHashSet();
+
+ @Override
+ public Iterable<String> orphanFileLocations() {
+ return deletedFiles;
+ }
+
+ public void addDeletedFiles(Set<String> files) {
+ this.deletedFiles.addAll(files);
+ }
+ }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 0d65c6c7366..4a3b951bde4 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -22,10 +22,15 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.List;
import org.apache.commons.collections4.IterableUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -111,4 +116,54 @@ public class TestHiveIcebergExpireSnapshots extends HiveIcebergStorageHandlerWit
table.refresh();
Assert.assertEquals(5, IterableUtils.size(table.snapshots()));
}
+
+ @Test
+ public void testDeleteOrphanFiles() throws IOException, InterruptedException {
+ TableIdentifier identifier = TableIdentifier.of("default", "source");
+ Table table =
+ testTables.createTableWithVersions(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+ Assert.assertEquals(5, table.history().size());
+
+ List<Object[]> rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+ List<Record> originalRecords = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+ Path orphanDataFile = new Path(table.location(), "data/dataFile");
+ Path orphanMetadataFile = new Path(table.location(), "metadata/metafile");
+ FileSystem fs = orphanDataFile.getFileSystem(shell.getHiveConf());
+ fs.create(orphanDataFile).close();
+ fs.create(orphanMetadataFile).close();
+
+ int numDataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size();
+ int numMetadataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size();
+ shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES");
+
+ Assert.assertEquals(numDataFiles,
+ RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+
+ Assert.assertEquals(numMetadataFiles,
+ RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+
+ Assert.assertTrue(fs.exists(orphanDataFile));
+ Assert.assertTrue(fs.exists(orphanMetadataFile));
+
+ long time = System.currentTimeMillis() + 1000;
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
+ String timeStamp = simpleDateFormat.format(new Date(time));
+ shell.executeStatement(
+ "ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES OLDER THAN ('" + timeStamp + "')");
+
+ Assert.assertEquals(numDataFiles - 1,
+ RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+
+ Assert.assertEquals(numMetadataFiles - 1,
+ RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+
+ Assert.assertFalse(fs.exists(orphanDataFile));
+ Assert.assertFalse(fs.exists(orphanMetadataFile));
+ table.refresh();
+
+ rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+ List<Record> records = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+ HiveIcebergTestUtils.validateData(originalRecords, records, 0);
+ }
}
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index a184b41e0f4..3e6105957c0 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -487,6 +487,8 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
+ | KW_EXECUTE KW_DELETE KW_ORPHAN_FILES (KW_OLDER KW_THAN LPAREN (timestamp=StringLiteral) RPAREN)?
+ -> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
;
alterStatementSuffixDropBranch
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index 883b9774ffb..a26d66d214d 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -399,6 +399,9 @@ KW_RETENTION: 'RETENTION';
KW_TAG: 'TAG';
KW_FAST_FORWARD: 'FAST-FORWARD';
KW_CHERRY_PICK: 'CHERRY-PICK';
+KW_ORPHAN_FILES: 'ORPHAN-FILES';
+KW_OLDER: 'OLDER';
+KW_THAN: 'THAN';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index cdd6f035d4d..79448df3b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.CherryPickSpec;
+import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.DeleteOrphanFilesDesc;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
@@ -45,9 +46,11 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.CHERRY_PICK;
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.DELETE_ORPHAN_FILES;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.FAST_FORWARD;
import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
@@ -83,7 +86,6 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
break;
case HiveParser.KW_EXPIRE_SNAPSHOTS:
desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());
-
break;
case HiveParser.KW_SET_CURRENT_SNAPSHOT:
desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -94,6 +96,9 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
case HiveParser.KW_CHERRY_PICK:
desc = getCherryPickDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
break;
+ case HiveParser.KW_ORPHAN_FILES:
+ desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
+ break;
}
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
@@ -178,4 +183,24 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
}
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
+
+ private static AlterTableExecuteDesc getDeleteOrphanFilesDesc(TableName tableName, Map<String, String> partitionSpec,
+ List<Node> children) throws SemanticException {
+
+ long time = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+ if (children.size() == 2) {
+ time = getTimeStampMillis((ASTNode) children.get(1));
+ }
+ AlterTableExecuteSpec spec = new AlterTableExecuteSpec(DELETE_ORPHAN_FILES, new DeleteOrphanFilesDesc(time));
+ return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
+ }
+
+ private static long getTimeStampMillis(ASTNode childNode) {
+ String childNodeText = PlanUtils.stripQuotes(childNode.getText());
+ ZoneId timeZone = SessionState.get() == null ?
+ new HiveConf().getLocalTimeZone() :
+ SessionState.get().getConf().getLocalTimeZone();
+ TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(childNodeText), timeZone);
+ return time.toEpochMilli();
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index 2b7ca285e1c..54c8df3573c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import java.util.Arrays;
@@ -40,7 +42,8 @@ public class AlterTableExecuteSpec<T> {
SET_CURRENT_SNAPSHOT,
FAST_FORWARD,
CHERRY_PICK,
- DELETE_METADATA;
+ DELETE_METADATA,
+ DELETE_ORPHAN_FILES;
}
private final ExecuteOperationType operationType;
@@ -270,4 +273,28 @@ public class AlterTableExecuteSpec<T> {
return sarg;
}
}
+
+ /**
+ * Value object class that stores the delete orphan files operation specific parameters.
+ * <ul>
+ * <li>timestampMillis: the time before which files are considered for deletion</li>
+ * </ul>
+ */
+ public static class DeleteOrphanFilesDesc {
+ private final long timestampMillis;
+
+ public DeleteOrphanFilesDesc(long timestampMillis) {
+ Preconditions.checkArgument(timestampMillis >= 0, "TimeStamp Millis shouldn't be negative");
+ this.timestampMillis = timestampMillis;
+ }
+
+ public long getTimestampMillis() {
+ return timestampMillis;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("timestampMillis", timestampMillis).toString();
+ }
+ }
}