This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 33d7294  Core: Add ReachableFileUtil (#2564)
33d7294 is described below

commit 33d72945aeb3c0d3e17a341079e72a26fbd03ef8
Author: Karuppayya <[email protected]>
AuthorDate: Mon May 17 15:53:39 2021 -0700

    Core: Add ReachableFileUtil (#2564)
---
 .../java/org/apache/iceberg/ReachableFileUtil.java | 113 +++++++++++++++++
 .../org/apache/iceberg/actions/BaseAction.java     |   3 +-
 .../iceberg/hadoop/HadoopTableOperations.java      |   2 +-
 .../main/java/org/apache/iceberg/hadoop/Util.java  |   3 +
 .../apache/iceberg/util/TestReachableFileUtil.java | 135 +++++++++++++++++++++
 .../actions/BaseRemoveOrphanFilesSparkAction.java  |   6 +-
 .../iceberg/spark/actions/BaseSparkAction.java     |  49 ++------
 7 files changed, 263 insertions(+), 48 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
new file mode 100644
index 0000000..0197b4a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReachableFileUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReachableFileUtil.class);
+
+  private ReachableFileUtil() {
+  }
+
+  /**
+   * Returns the location of the version hint file.
+   *
+   * @param table table for which the version hint file's path is retrieved
+   * @return the location of the version hint file
+   */
+  public static String versionHintLocation(Table table) {
+    TableOperations ops = ((HasTableOperations) table).operations();
+    return ops.metadataFileLocation(Util.VERSION_HINT_FILENAME);
+  }
+
+  /**
+   * Returns locations of JSON metadata files in a table.
+   *
+   * @param table     Table to get JSON metadata files from
+   * @param recursive When true, recursively retrieves all the reachable JSON metadata files.
+   *                  When false, gets the JSON metadata files only from the current metadata.
+   * @return locations of JSON metadata files
+   */
+  public static Set<String> metadataFileLocations(Table table, boolean recursive) {
+    Set<String> metadataFileLocations = Sets.newHashSet();
+    TableOperations ops = ((HasTableOperations) table).operations();
+    TableMetadata tableMetadata = ops.current();
+    metadataFileLocations.add(tableMetadata.metadataFileLocation());
+    metadataFileLocations(tableMetadata, metadataFileLocations, ops.io(), recursive);
+    return metadataFileLocations;
+  }
+
+  private static void metadataFileLocations(TableMetadata metadata, Set<String> metadataFileLocations,
+                                            FileIO io, boolean recursive) {
+    List<MetadataLogEntry> metadataLogEntries = metadata.previousFiles();
+    if (metadataLogEntries.size() > 0) {
+      for (MetadataLogEntry metadataLogEntry : metadataLogEntries) {
+        metadataFileLocations.add(metadataLogEntry.file());
+      }
+      if (recursive) {
+        TableMetadata previousMetadata = findFirstExistentPreviousMetadata(metadataLogEntries, io);
+        if (previousMetadata != null) {
+          metadataFileLocations(previousMetadata, metadataFileLocations, io, recursive);
+        }
+      }
+    }
+  }
+
+  private static TableMetadata findFirstExistentPreviousMetadata(List<MetadataLogEntry> metadataLogEntries, FileIO io) {
+    TableMetadata metadata = null;
+    for (MetadataLogEntry metadataLogEntry : metadataLogEntries) {
+      try {
+        metadata = TableMetadataParser.read(io, metadataLogEntry.file());
+        break;
+      } catch (Exception e) {
+        LOG.error("Failed to load {}", metadataLogEntry, e);
+      }
+    }
+    return metadata;
+  }
+
+  /**
+   * Returns locations of manifest lists in a table.
+   *
+   * @param table table for which manifest list locations are fetched
+   * @return locations of manifest lists
+   */
+  public static List<String> manifestListLocations(Table table) {
+    Iterable<Snapshot> snapshots = table.snapshots();
+    List<String> manifestListLocations = Lists.newArrayList();
+    for (Snapshot snapshot : snapshots) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      if (manifestListLocation != null) {
+        manifestListLocations.add(manifestListLocation);
+      }
+    }
+    return manifestListLocations;
+  }
+}
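
(A minimal usage sketch of the new utility, assuming a Hadoop-backed table; the
table path, class name, and variable names below are illustrative and not part
of this commit.)

    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.ReachableFileUtil;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class ReachableFileUtilExample {
      public static void main(String[] args) {
        // Hypothetical location of an existing Hadoop table.
        Table table = new HadoopTables(new Configuration()).load("file:/tmp/warehouse/db/tbl");

        // Location of the version-hint file (metadata dir + "version-hint.text").
        String versionHint = ReachableFileUtil.versionHintLocation(table);

        // All reachable *.metadata.json files; recursive=true also walks the
        // previousFiles logs of older metadata versions, not just the current one.
        Set<String> metadataFiles = ReachableFileUtil.metadataFileLocations(table, true);

        // One manifest-list location per snapshot that has one.
        List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);

        System.out.printf("hint=%s metadata=%d manifestLists=%d%n",
            versionHint, metadataFiles.size(), manifestLists.size());
      }
    }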
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
index 4479bcc..387791f 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 abstract class BaseAction<ThisT, R> implements Action<ThisT, R> {
@@ -76,7 +77,7 @@ abstract class BaseAction<ThisT, R> implements Action<ThisT, R> {
    */
   protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
     List<String> otherMetadataFiles = Lists.newArrayList();
-    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
+    otherMetadataFiles.add(ops.metadataFileLocation(Util.VERSION_HINT_FILENAME));
 
     TableMetadata metadata = ops.current();
     otherMetadataFiles.add(metadata.metadataFileLocation());
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index dcbd57f..496f95f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -285,7 +285,7 @@ public class HadoopTableOperations implements TableOperations {
 
   @VisibleForTesting
   Path versionHintFile() {
-    return metadataPath("version-hint.text");
+    return metadataPath(Util.VERSION_HINT_FILENAME);
   }
 
   private void writeVersionHint(int versionToWrite) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/Util.java b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
index bdb334b..7fc55d2 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/Util.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/Util.java
@@ -38,6 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Util {
+
+  public static final String VERSION_HINT_FILENAME = "version-hint.text";
+
   private static final Logger LOG = LoggerFactory.getLogger(Util.class);
 
   private Util() {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java b/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java
new file mode 100644
index 0000000..257d25e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestReachableFileUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.io.File;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestReachableFileUtil {
+  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "c1", Types.IntegerType.get()),
+      optional(2, "c2", Types.StringType.get())
+  );
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  private static final DataFile FILE_A = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-a.parquet")
+      .withFileSizeInBytes(10)
+      .withRecordCount(1)
+      .build();
+  private static final DataFile FILE_B = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-b.parquet")
+      .withFileSizeInBytes(10)
+      .withRecordCount(1)
+      .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private Table table;
+
+  @Before
+  public void setupTableLocation() throws Exception {
+    File tableDir = temp.newFolder();
+    String tableLocation = tableDir.toURI().toString();
+    this.table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+  }
+
+  @Test
+  public void testManifestListLocations() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    List<String> manifestListPaths = ReachableFileUtil.manifestListLocations(table);
+    Assert.assertEquals(manifestListPaths.size(), 2);
+  }
+
+  @Test
+  public void testMetadataFileLocations() {
+    table.updateProperties()
+        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
+    Assert.assertEquals(metadataFileLocations.size(), 4);
+
+    metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, false);
+    Assert.assertEquals(metadataFileLocations.size(), 2);
+  }
+
+  @Test
+  public void testMetadataFileLocationsWithMissingFiles() {
+    table.updateProperties()
+        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1")
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    TableOperations operations = ((HasTableOperations) table).operations();
+    String location = operations.current().metadataFileLocation();
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    // delete v3.metadata.json making v2.metadata.json and v1.metadata.json inaccessible
+    table.io().deleteFile(location);
+
+    Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
+    Assert.assertEquals(metadataFileLocations.size(), 2);
+  }
+}
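
(A quick trace of why the expected counts in these tests hold, assuming HadoopTables'
vN.metadata.json naming: table creation writes v1, the property update writes v2, and
the two appends write v3 and v4. With METADATA_PREVIOUS_VERSIONS_MAX=1, v4's
previousFiles log keeps only v3, so the non-recursive call returns {v4, v3} = 2
locations, while the recursive call parses v3 to discover v2 and then v2 to discover
v1, for 4 in total. In the missing-file test, v3's location is still collected from
v4's log, but v3 can no longer be parsed to reach v2 and v1, so the result stays at
{v4, v3} = 2.)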
diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
index 8454e0d..844dfd0 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRemoveOrphanFilesSparkAction.java
@@ -30,9 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.BaseRemoveOrphanFilesActionResult;
 import org.apache.iceberg.actions.RemoveOrphanFiles;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -91,7 +89,6 @@ public class BaseRemoveOrphanFilesSparkAction
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
-  private final TableOperations ops;
 
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
@@ -108,7 +105,6 @@ public class BaseRemoveOrphanFilesSparkAction
+    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
+    this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
     this.table = table;
-    this.ops = ((HasTableOperations) table).operations();
     this.location = table.location();
 
     ValidationException.check(
@@ -156,7 +152,7 @@ public class BaseRemoveOrphanFilesSparkAction
 
   private RemoveOrphanFiles.Result doExecute() {
     Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table, ops);
+    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
     Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = buildActualFileDF();
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index e760a6d..fe7d980 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -27,11 +27,10 @@ import java.util.function.Supplier;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.actions.Action;
 import org.apache.iceberg.actions.ManifestFileBean;
 import org.apache.iceberg.common.DynMethods;
@@ -109,40 +108,6 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
     return new JobGroupInfo(groupId + "-" + JOB_COUNTER.incrementAndGet(), desc, false);
   }
 
-  /**
-   * Returns all the path locations of all Manifest Lists for a given list of snapshots
-   * @param snapshots snapshots
-   * @return the paths of the Manifest Lists
-   */
-  private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
-    List<String> manifestLists = Lists.newArrayList();
-    for (Snapshot snapshot : snapshots) {
-      String manifestListLocation = snapshot.manifestListLocation();
-      if (manifestListLocation != null) {
-        manifestLists.add(manifestListLocation);
-      }
-    }
-    return manifestLists;
-  }
-
-  /**
-   * Returns all Metadata file paths which may not be in the current metadata. Specifically
-   * this includes "version-hint" files as well as entries in metadata.previousFiles.
-   * @param ops TableOperations for the table we will be getting paths from
-   * @return a list of paths to metadata files
-   */
-  private List<String> getOtherMetadataFilePaths(TableOperations ops) {
-    List<String> otherMetadataFiles = Lists.newArrayList();
-    otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
-
-    TableMetadata metadata = ops.current();
-    otherMetadataFiles.add(metadata.metadataFileLocation());
-    for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
-      otherMetadataFiles.add(previousMetadataFile.file());
-    }
-    return otherMetadataFiles;
-  }
-
   protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     String metadataFileLocation = metadata.metadataFileLocation();
+    StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io);
@@ -167,19 +132,21 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
   }
 
   protected Dataset<Row> buildManifestListDF(Table table) {
-    List<String> manifestLists = getManifestListPaths(table.snapshots());
+    List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
     return spark.createDataset(manifestLists, Encoders.STRING()).toDF("file_path");
   }
 
-  protected Dataset<Row> buildOtherMetadataFileDF(TableOperations ops) {
-    List<String> otherMetadataFiles = getOtherMetadataFilePaths(ops);
+  protected Dataset<Row> buildOtherMetadataFileDF(Table table) {
+    List<String> otherMetadataFiles = Lists.newArrayList();
+    otherMetadataFiles.addAll(ReachableFileUtil.metadataFileLocations(table, false));
+    otherMetadataFiles.add(ReachableFileUtil.versionHintLocation(table));
     return spark.createDataset(otherMetadataFiles, Encoders.STRING()).toDF("file_path");
   }
 
-  protected Dataset<Row> buildValidMetadataFileDF(Table table, TableOperations ops) {
+  protected Dataset<Row> buildValidMetadataFileDF(Table table) {
     Dataset<Row> manifestDF = buildManifestFileDF(table);
     Dataset<Row> manifestListDF = buildManifestListDF(table);
-    Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(ops);
+    Dataset<Row> otherMetadataFileDF = buildOtherMetadataFileDF(table);
 
     return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
   }
