This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 33d7294 Core: Add ReachableFileUtil (#2564)
33d7294 is described below
commit 33d72945aeb3c0d3e17a341079e72a26fbd03ef8
Author: Karuppayya <[email protected]>
AuthorDate: Mon May 17 15:53:39 2021 -0700
Core: Add ReachableFileUtil (#2564)
---
.../java/org/apache/iceberg/ReachableFileUtil.java | 113 +++++++++++++++++
.../org/apache/iceberg/actions/BaseAction.java | 3 +-
.../iceberg/hadoop/HadoopTableOperations.java | 2 +-
.../main/java/org/apache/iceberg/hadoop/Util.java | 3 +
.../apache/iceberg/util/TestReachableFileUtil.java | 135 +++++++++++++++++++++
.../actions/BaseRemoveOrphanFilesSparkAction.java | 6 +-
.../iceberg/spark/actions/BaseSparkAction.java | 49 ++------
7 files changed, 263 insertions(+), 48 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
new file mode 100644
index 0000000..0197b4a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReachableFileUtil {
+
+ // Logger used when a previous metadata file cannot be loaded.
+ private static final Logger LOG =
LoggerFactory.getLogger(ReachableFileUtil.class);
+
+ // Private constructor: the class only exposes static helpers.
+ private ReachableFileUtil() {
+ }
+
+  /**
+   * Resolves where the version hint file of a table is located.
+   *
+   * @param table table whose version hint file location is requested
+   * @return the metadata file location computed for {@link Util#VERSION_HINT_FILENAME}
+   */
+  public static String versionHintLocation(Table table) {
+    HasTableOperations tableWithOps = (HasTableOperations) table;
+    return tableWithOps.operations().metadataFileLocation(Util.VERSION_HINT_FILENAME);
+  }
+
+  /**
+   * Collects the locations of the JSON metadata files reachable from a table.
+   * <p>
+   * The result always contains the current metadata file and the files listed in its
+   * previous-files log.
+   *
+   * @param table table whose JSON metadata file locations are collected
+   * @param recursive when true, older metadata files are also followed through their own
+   *                  previous-files logs; when false, only the current metadata file and
+   *                  the entries of its log are returned
+   * @return locations of JSON metadata files
+   */
+  public static Set<String> metadataFileLocations(Table table, boolean recursive) {
+    TableOperations operations = ((HasTableOperations) table).operations();
+    TableMetadata current = operations.current();
+
+    Set<String> locations = Sets.newHashSet();
+    locations.add(current.metadataFileLocation());
+    metadataFileLocations(current, locations, operations.io(), recursive);
+    return locations;
+  }
+
+  /**
+   * Adds the previous-files log of {@code metadata} to {@code metadataFileLocations} and,
+   * when {@code recursive} is set, repeats the process for the first previous metadata
+   * file that can still be loaded.
+   */
+  private static void metadataFileLocations(TableMetadata metadata, Set<String> metadataFileLocations,
+                                            FileIO io, boolean recursive) {
+    List<MetadataLogEntry> previousEntries = metadata.previousFiles();
+    if (previousEntries.isEmpty()) {
+      return;
+    }
+
+    for (MetadataLogEntry entry : previousEntries) {
+      metadataFileLocations.add(entry.file());
+    }
+
+    if (!recursive) {
+      return;
+    }
+
+    // only one loadable previous metadata file is followed per level
+    TableMetadata reachable = findFirstExistentPreviousMetadata(previousEntries, io);
+    if (reachable != null) {
+      metadataFileLocations(reachable, metadataFileLocations, io, recursive);
+    }
+  }
+
+  /**
+   * Tries the log entries in list order and returns the metadata of the first one that can
+   * be read. Entries that fail to load are logged and skipped; {@code null} is returned when
+   * none of them can be read.
+   */
+  private static TableMetadata findFirstExistentPreviousMetadata(List<MetadataLogEntry> metadataLogEntries,
+                                                                 FileIO io) {
+    for (MetadataLogEntry candidate : metadataLogEntries) {
+      try {
+        return TableMetadataParser.read(io, candidate.file());
+      } catch (Exception e) {
+        LOG.error("Failed to load {}", candidate, e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gathers the manifest list location of every snapshot of a table.
+   * <p>
+   * Snapshots that report no manifest list location are left out of the result.
+   *
+   * @param table table whose snapshots are inspected
+   * @return the manifest list locations, in snapshot iteration order
+   */
+  public static List<String> manifestListLocations(Table table) {
+    List<String> locations = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      String location = snapshot.manifestListLocation();
+      if (location == null) {
+        continue;
+      }
+      locations.add(location);
+    }
+    return locations;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
index 4479bcc..387791f 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
abstract class BaseAction<ThisT, R> implements Action<ThisT, R> {
@@ -76,7 +77,7 @@ abstract class BaseAction<ThisT, R> implements Action<ThisT,
R> {
*/
protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
List<String> otherMetadataFiles = Lists.newArrayList();
- otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
+
otherMetadataFiles.add(ops.metadataFileLocation(Util.VERSION_HINT_FILENAME));
TableMetadata metadata = ops.current();
otherMetadataFiles.add(metadata.metadataFileLocation());
diff --git
a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index dcbd57f..496f95f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -285,7 +285,7 @@ public class HadoopTableOperations implements
TableOperations {
@VisibleForTesting
Path versionHintFile() {
- return metadataPath("version-hint.text");
+ return metadataPath(Util.VERSION_HINT_FILENAME);
}
private void writeVersionHint(int versionToWrite) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
index bdb334b..7fc55d2 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -38,6 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Util {
+
+ public static final String VERSION_HINT_FILENAME = "version-hint.text";
+
private static final Logger LOG = LoggerFactory.getLogger(Util.class);
private Util() {
diff --git
a/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java
b/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java
new file mode 100644
index 0000000..257d25e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.File;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestReachableFileUtil {
+ // Shared fixture: tables are created through HadoopTables with a default Configuration.
+ private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
+ // Two optional columns: c1 (integer) and c2 (string).
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get())
+ );
+
+ // Identity partitioning on column c1.
+ private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+ // Data file descriptors appended by the tests; the two differ only in their path.
+ private static final DataFile FILE_A = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ private static final DataFile FILE_B = DataFiles.builder(SPEC)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ // JUnit rule supplying the temporary folder that holds the table.
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ // Table under test, created in setupTableLocation().
+ private Table table;
+
+  /** Creates the table under test inside a new temporary folder. */
+  @Before
+  public void setupTableLocation() throws Exception {
+    String tableLocation = temp.newFolder().toURI().toString();
+    this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+  }
+
+  /** Two append commits must yield two manifest list locations. */
+  @Test
+  public void testManifestListLocations() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    List<String> manifestListPaths = ReachableFileUtil.manifestListLocations(table);
+    // expected value goes first so a failure reports "expected 2 but was N"
+    Assert.assertEquals(2, manifestListPaths.size());
+  }
+
+  /**
+   * Checks the recursive and non-recursive lookups on a table that keeps a single
+   * previous metadata version per metadata file.
+   */
+  @Test
+  public void testMetadataFileLocations() {
+    table.updateProperties()
+        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    // expected value goes first so failure messages are reported the right way round
+    Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
+    Assert.assertEquals(4, metadataFileLocations.size());
+
+    metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, false);
+    Assert.assertEquals(2, metadataFileLocations.size());
+  }
+
+  /**
+   * Checks that the recursive lookup stops at a previous metadata file that has been
+   * deleted, while still reporting that file's location.
+   */
+  @Test
+  public void testMetadataFileLocationsWithMissingFiles() {
+    table.updateProperties()
+        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    // remember the metadata file that is current before the second append
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String location = operations.current().metadataFileLocation();
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    // delete v3.metadata.json making v2.metadata.json and v1.metadata.json inaccessible
+    table.io().deleteFile(location);
+
+    // expected value goes first so a failure reports "expected 2 but was N"
+    Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
+    Assert.assertEquals(2, metadataFileLocations.size());
+  }
+}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
index 8454e0d..844dfd0 100644
---
a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
+++
b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
@@ -30,9 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.BaseRemoveOrphanFilesActionResult;
import org.apache.iceberg.actions.RemoveOrphanFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -91,7 +89,6 @@ public class BaseRemoveOrphanFilesSparkAction
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
- private final TableOperations ops;
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3);
@@ -108,7 +105,6 @@ public class BaseRemoveOrphanFilesSparkAction
this.hadoopConf = new
SerializableConfiguration(spark.sessionState().newHadoopConf());
this.partitionDiscoveryParallelism =
spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
this.table = table;
- this.ops = ((HasTableOperations) table).operations();
this.location = table.location();
ValidationException.check(
@@ -156,7 +152,7 @@ public class BaseRemoveOrphanFilesSparkAction
private RemoveOrphanFiles.Result doExecute() {
Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
- Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table, ops);
+ Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index e760a6d..fe7d980 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -27,11 +27,10 @@ import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.actions.ManifestFileBean;
import org.apache.iceberg.common.DynMethods;
@@ -109,40 +108,6 @@ abstract class BaseSparkAction<ThisT, R> implements
Action<ThisT, R> {
return new JobGroupInfo(groupId + "-" + JOB_COUNTER.incrementAndGet(),
desc, false);
}
- /**
- * Returns all the path locations of all Manifest Lists for a given list of
snapshots
- * @param snapshots snapshots
- * @return the paths of the Manifest Lists
- */
- private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
- List<String> manifestLists = Lists.newArrayList();
- for (Snapshot snapshot : snapshots) {
- String manifestListLocation = snapshot.manifestListLocation();
- if (manifestListLocation != null) {
- manifestLists.add(manifestListLocation);
- }
- }
- return manifestLists;
- }
-
- /**
- * Returns all Metadata file paths which may not be in the current metadata.
Specifically
- * this includes "version-hint" files as well as entries in
metadata.previousFiles.
- * @param ops TableOperations for the table we will be getting paths from
- * @return a list of paths to metadata files
- */
- private List<String> getOtherMetadataFilePaths(TableOperations ops) {
- List<String> otherMetadataFiles = Lists.newArrayList();
- otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
-
- TableMetadata metadata = ops.current();
- otherMetadataFiles.add(metadata.metadataFileLocation());
- for (TableMetadata.MetadataLogEntry previousMetadataFile :
metadata.previousFiles()) {
- otherMetadataFiles.add(previousMetadataFile.file());
- }
- return otherMetadataFiles;
- }
-
protected Table newStaticTable(TableMetadata metadata, FileIO io) {
String metadataFileLocation = metadata.metadataFileLocation();
StaticTableOperations ops = new
StaticTableOperations(metadataFileLocation, io);
@@ -167,19 +132,21 @@ abstract class BaseSparkAction<ThisT, R> implements
Action<ThisT, R> {
}
protected Dataset<Row> buildManifestListDF(Table table) {
- List<String> manifestLists = getManifestListPaths(table.snapshots());
+ List<String> manifestLists =
ReachableFileUtil.manifestListLocations(table);
return spark.createDataset(manifestLists,
Encoders.STRING()).toDF("file_path");
}
- protected Dataset<Row> buildOtherMetadataFileDF(TableOperations ops) {
- List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
+ protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
+ List<String> otherMetadataFiles = Lists.newArrayList();
+ otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table,
false));
+ otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
return spark.createDataset(otherMetadataFiles,
Encoders.STRING()).toDF("file_path");
}
- protected Dataset<Row> buildValidMetadataFileDF(Table table, TableOperations
ops) {
+ protected Dataset<Row> buildValidMetadataFileDF(Table table) {
Dataset<Row> manifestDF = buildManifestFileDF(table);
Dataset<Row> manifestListDF = buildManifestListDF(table);
- Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(ops);
+ Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);
return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
}