This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new dd4004b  Add FindFiles helper API (#377)
dd4004b is described below

commit dd4004b0c8e6df7174a1f083ff8240d1b6ad7e3f
Author: Ryan Blue <rdb...@users.noreply.github.com>
AuthorDate: Thu Aug 15 09:43:09 2019 -0700

    Add FindFiles helper API (#377)
---
 .../main/java/org/apache/iceberg/FindFiles.java    | 205 +++++++++++++++++++++
 .../java/org/apache/iceberg/ManifestGroup.java     |   4 +-
 .../java/org/apache/iceberg/TableTestBase.java     |   8 +-
 .../java/org/apache/iceberg/TestFindFiles.java     | 163 ++++++++++++++++
 4 files changed, 374 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java 
b/core/src/main/java/org/apache/iceberg/FindFiles.java
new file mode 100644
index 0000000..79459b5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+
+public class FindFiles {
+  private FindFiles() {
+  }
+
+  private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+  public static Builder in(Table table) {
+    return new Builder(table);
+  }
+
+  public static class Builder {
+    private final Table table;
+    private final TableOperations ops;
+    private boolean caseSensitive = true;
+    private Long snapshotId = null;
+    private Expression rowFilter = Expressions.alwaysTrue();
+    private Expression fileFilter = Expressions.alwaysTrue();
+    private Expression partitionFilter = Expressions.alwaysTrue();
+
+    public Builder(Table table) {
+      this.table = table;
+      this.ops = ((HasTableOperations) table).operations();
+    }
+
+    public Builder caseInsensitive() {
+      this.caseSensitive = false;
+      return this;
+    }
+
+    public Builder caseSensitive(boolean findCaseSensitive) {
+      this.caseSensitive = findCaseSensitive;
+      return this;
+    }
+
+    /**
+     * Base results on the given snapshot.
+     *
+     * @param findSnapshotId a snapshot ID
+     * @return this for method chaining
+     */
+    public Builder inSnapshot(long findSnapshotId) {
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", 
findSnapshotId);
+      Preconditions.checkArgument(table.snapshot(findSnapshotId) != null,
+          "Cannot find snapshot for id=%s", findSnapshotId);
+      this.snapshotId = findSnapshotId;
+      return this;
+    }
+
+    /**
+     * Base results on files in the snapshot that was current as of a 
timestamp.
+     *
+     * @param timestampMillis a timestamp in milliseconds
+     * @return this for method chaining
+     */
+    public Builder asOfTime(long timestampMillis) {
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", 
snapshotId);
+
+      Long lastSnapshotId = null;
+      for (HistoryEntry logEntry : ops.current().snapshotLog()) {
+        if (logEntry.timestampMillis() <= timestampMillis) {
+          lastSnapshotId = logEntry.snapshotId();
+        } else {
+          // the last snapshot ID was the last one older than the timestamp
+          break;
+        }
+      }
+
+      // the snapshot ID could be null if no entries were older than the 
requested time. in that
+      // case, there is no valid snapshot to read.
+      Preconditions.checkArgument(lastSnapshotId != null,
+          "Cannot find a snapshot older than %s",
+          
DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis),
 ZoneId.systemDefault())));
+      return inSnapshot(lastSnapshotId);
+    }
+
+    /**
+     * Filter results using a record filter. Files that may contain at least 
one matching record
+     * will be returned by {@link #collect()}.
+     *
+     * @param expr a record filter
+     * @return this for method chaining
+     */
+    public Builder withRecordsMatching(Expression expr) {
+      this.rowFilter = Expressions.and(rowFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results using a metadata filter for the data in a {@link 
DataFile}.
+     *
+     * @param expr a filter for {@link DataFile} metadata columns
+     * @return this for method chaining
+     */
+    public Builder withMetadataMatching(Expression expr) {
+      this.fileFilter = Expressions.and(fileFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partition a StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartition(PartitionSpec spec, StructLike partition) {
+      return inPartitions(spec, partition);
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions one or more StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartitions(PartitionSpec spec, StructLike... partitions) {
+      return inPartitions(spec, Arrays.asList(partitions));
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions a list of StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartitions(PartitionSpec spec, List<StructLike> 
partitions) {
+      
Preconditions.checkArgument(spec.equals(ops.current().spec(spec.specId())),
+          "Partition spec does not belong to table: %s", table);
+
+      Expression partitionSetFilter = Expressions.alwaysFalse();
+      for (StructLike partitionData : partitions) {
+        Expression partFilter = Expressions.alwaysTrue();
+        for (int i = 0; i < spec.fields().size(); i += 1) {
+          PartitionField field = spec.fields().get(i);
+          partFilter = Expressions.and(
+              partFilter,
+              Expressions.equal(field.name(), partitionData.get(i, 
Object.class)));
+        }
+        partitionSetFilter = Expressions.or(partitionSetFilter, partFilter);
+      }
+
+      if (partitionFilter != Expressions.alwaysTrue()) {
+        this.partitionFilter = Expressions.or(partitionFilter, 
partitionSetFilter);
+      } else {
+        this.partitionFilter = partitionSetFilter;
+      }
+
+      return this;
+    }
+
+    /**
+     * @return all files in the table that match all of the filters
+     */
+    public CloseableIterable<DataFile> collect() {
+      Snapshot snapshot = snapshotId != null ?
+          ops.current().snapshot(snapshotId) : ops.current().currentSnapshot();
+
+      CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, 
snapshot.manifests())
+          .filterData(rowFilter)
+          .filterFiles(fileFilter)
+          .filterPartitions(partitionFilter)
+          .ignoreDeleted()
+          .caseSensitive(caseSensitive)
+          .entries();
+
+      return CloseableIterable.transform(entries, entry -> 
entry.file().copyWithoutStats());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java 
b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 01fdf8b..0b14fa4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -96,7 +96,7 @@ class ManifestGroup {
 
   public ManifestGroup filterPartitions(Expression expr) {
     return new ManifestGroup(
-        ops, manifests, dataFilter, fileFilter, Expressions.and(fileFilter, 
expr),
+        ops, manifests, dataFilter, fileFilter, 
Expressions.and(partitionFilter, expr),
         ignoreDeleted, ignoreExisting, columns, caseSensitive);
   }
 
@@ -139,7 +139,7 @@ class ManifestGroup {
    * @return a CloseableIterable of manifest entries.
    */
   public CloseableIterable<ManifestEntry> entries() {
-    Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), 
fileFilter);
+    Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), 
fileFilter, caseSensitive);
 
     Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
         manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java 
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c55c07d..a86481a 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -54,25 +54,25 @@ public class TableTestBase {
       .withPath("/path/to/data-a.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=0") // easy way to set partition data 
for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_B = DataFiles.builder(SPEC)
       .withPath("/path/to/data-b.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=1") // easy way to set partition data 
for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_C = DataFiles.builder(SPEC)
       .withPath("/path/to/data-c.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=2") // easy way to set partition data 
for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_D = DataFiles.builder(SPEC)
       .withPath("/path/to/data-d.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=3") // easy way to set partition data 
for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
 
   @Rule
diff --git a/core/src/test/java/org/apache/iceberg/TestFindFiles.java 
b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
new file mode 100644
index 0000000..98d4482
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expressions;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link FindFiles} using FILE_A to FILE_D from {@link TableTestBase}, which are in
+ * data_bucket partitions 0 to 3.
+ */
+public class TestFindFiles extends TableTestBase {
+  @Test
+  public void testBasicBehavior() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testWithMetadataMatching() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  @Test
+  public void testInPartition() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartition(table.spec(), StaticDataTask.Row.of(1))
+        .inPartition(table.spec(), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testInPartitions() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartitions(table.spec(), StaticDataTask.Row.of(1), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testAsOfTimestamp() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    // use the commit time of the last snapshot that should be visible instead of the wall clock, and
+    // make sure later commits get strictly newer timestamps; otherwise a commit in the same
+    // millisecond would also satisfy the as-of check and make this test flaky
+    long timestamp = table.currentSnapshot().timestampMillis();
+    spinUntilAfter(timestamp);
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).asOfTime(timestamp).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testSnapshotId() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).inSnapshot(snapshotId).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testCaseSensitivity() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .caseInsensitive()
+        .withMetadataMatching(Expressions.startsWith("FILE_PATH", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  /**
+   * Busy-waits until the wall clock is strictly after the given time (normally well under a millisecond).
+   */
+  private static long spinUntilAfter(long timestampMillis) {
+    long now = System.currentTimeMillis();
+    while (now <= timestampMillis) {
+      now = System.currentTimeMillis();
+    }
+    return now;
+  }
+
+  private Set<String> pathSet(DataFile... files) {
+    return Sets.newHashSet(Iterables.transform(Arrays.asList(files), file -> file.path().toString()));
+  }
+
+  private Set<String> pathSet(Iterable<DataFile> files) {
+    return Sets.newHashSet(Iterables.transform(files, file -> file.path().toString()));
+  }
+}

Reply via email to