This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
     new dd4004b  Add FindFiles helper API (#377)
dd4004b is described below

commit dd4004b0c8e6df7174a1f083ff8240d1b6ad7e3f
Author: Ryan Blue <rdb...@users.noreply.github.com>
AuthorDate: Thu Aug 15 09:43:09 2019 -0700

    Add FindFiles helper API (#377)
---
 .../main/java/org/apache/iceberg/FindFiles.java    | 233 +++++++++++++++++++++
 .../java/org/apache/iceberg/ManifestGroup.java     |   4 +-
 .../java/org/apache/iceberg/TableTestBase.java     |   8 +-
 .../java/org/apache/iceberg/TestFindFiles.java     | 170 +++++++++++++++
 4 files changed, 409 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
new file mode 100644
index 0000000..79459b5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * Helper for finding the data files in a table that match a set of filters.
+ */
+public class FindFiles {
+  private FindFiles() {
+  }
+
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+  /**
+   * Start a search for data files in the given table.
+   *
+   * @param table a table that implements {@link HasTableOperations}
+   * @return a builder used to configure and run the search
+   */
+  public static Builder in(Table table) {
+    return new Builder(table);
+  }
+
+  public static class Builder {
+    private final Table table;
+    private final TableOperations ops;
+    private boolean caseSensitive = true;
+    private Long snapshotId = null;
+    private Expression rowFilter = Expressions.alwaysTrue();
+    private Expression fileFilter = Expressions.alwaysTrue();
+    private Expression partitionFilter = Expressions.alwaysTrue();
+
+    public Builder(Table table) {
+      this.table = table;
+      this.ops = ((HasTableOperations) table).operations();
+    }
+
+    /**
+     * Match column names in filter expressions without regard to case.
+     *
+     * @return this for method chaining
+     */
+    public Builder caseInsensitive() {
+      this.caseSensitive = false;
+      return this;
+    }
+
+    /**
+     * Set whether column names in filter expressions are matched case sensitively.
+     *
+     * @param findCaseSensitive true to match column names case sensitively (the default)
+     * @return this for method chaining
+     */
+    public Builder caseSensitive(boolean findCaseSensitive) {
+      this.caseSensitive = findCaseSensitive;
+      return this;
+    }
+
+    /**
+     * Base results on the given snapshot.
+     *
+     * @param findSnapshotId a snapshot ID
+     * @return this for method chaining
+     * @throws IllegalArgumentException if a snapshot was already set or the ID is not in the table
+     */
+    public Builder inSnapshot(long findSnapshotId) {
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", snapshotId);
+      Preconditions.checkArgument(table.snapshot(findSnapshotId) != null,
+          "Cannot find snapshot for id=%s", findSnapshotId);
+      this.snapshotId = findSnapshotId;
+      return this;
+    }
+
+    /**
+     * Base results on files in the snapshot that was current as of a timestamp.
+     *
+     * @param timestampMillis a timestamp in milliseconds
+     * @return this for method chaining
+     * @throws IllegalArgumentException if a snapshot was already set or none is old enough
+     */
+    public Builder asOfTime(long timestampMillis) {
+      Preconditions.checkArgument(this.snapshotId == null,
+          "Cannot set snapshot multiple times, already set to id=%s", snapshotId);
+
+      Long lastSnapshotId = null;
+      for (HistoryEntry logEntry : ops.current().snapshotLog()) {
+        if (logEntry.timestampMillis() <= timestampMillis) {
+          lastSnapshotId = logEntry.snapshotId();
+        } else {
+          // the last snapshot ID was the last one older than the timestamp
+          break;
+        }
+      }
+
+      // the snapshot ID could be null if no entries were older than the requested time. in that
+      // case, there is no valid snapshot to read.
+      Preconditions.checkArgument(lastSnapshotId != null,
+          "Cannot find a snapshot older than %s",
+          DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault())));
+      return inSnapshot(lastSnapshotId);
+    }
+
+    /**
+     * Filter results using a record filter. Files that may contain at least one matching record
+     * will be returned by {@link #collect()}.
+     *
+     * @param expr a record filter
+     * @return this for method chaining
+     */
+    public Builder withRecordsMatching(Expression expr) {
+      this.rowFilter = Expressions.and(rowFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results using a metadata filter for the data in a {@link DataFile}.
+     *
+     * @param expr a filter for {@link DataFile} metadata columns
+     * @return this for method chaining
+     */
+    public Builder withMetadataMatching(Expression expr) {
+      this.fileFilter = Expressions.and(fileFilter, expr);
+      return this;
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partition a StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartition(PartitionSpec spec, StructLike partition) {
+      return inPartitions(spec, partition);
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions one or more StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartitions(PartitionSpec spec, StructLike... partitions) {
+      return inPartitions(spec, Arrays.asList(partitions));
+    }
+
+    /**
+     * Filter results to files in any one of the given partitions.
+     *
+     * @param spec a spec for the partitions
+     * @param partitions a list of StructLike that stores a partition tuple
+     * @return this for method chaining
+     */
+    public Builder inPartitions(PartitionSpec spec, List<StructLike> partitions) {
+      Preconditions.checkArgument(spec.equals(ops.current().spec(spec.specId())),
+          "Partition spec does not belong to table: %s", table);
+
+      Expression partitionSetFilter = Expressions.alwaysFalse();
+      for (StructLike partitionData : partitions) {
+        Expression partFilter = Expressions.alwaysTrue();
+        for (int i = 0; i < spec.fields().size(); i += 1) {
+          PartitionField field = spec.fields().get(i);
+          partFilter = Expressions.and(
+              partFilter,
+              Expressions.equal(field.name(), partitionData.get(i, Object.class)));
+        }
+        partitionSetFilter = Expressions.or(partitionSetFilter, partFilter);
+      }
+
+      if (partitionFilter != Expressions.alwaysTrue()) {
+        this.partitionFilter = Expressions.or(partitionFilter, partitionSetFilter);
+      } else {
+        this.partitionFilter = partitionSetFilter;
+      }
+
+      return this;
+    }
+
+    /**
+     * @return all files in the table that match all of the filters; empty if the table has no
+     *         current snapshot
+     */
+    public CloseableIterable<DataFile> collect() {
+      Snapshot snapshot = snapshotId != null ?
+          ops.current().snapshot(snapshotId) : ops.current().currentSnapshot();
+
+      // currentSnapshot() is null when the table has no snapshots yet, so there are no files
+      if (snapshot == null) {
+        return CloseableIterable.empty();
+      }
+
+      CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, snapshot.manifests())
+          .filterData(rowFilter)
+          .filterFiles(fileFilter)
+          .filterPartitions(partitionFilter)
+          .ignoreDeleted()
+          .caseSensitive(caseSensitive)
+          .entries();
+
+      return CloseableIterable.transform(entries, entry -> entry.file().copyWithoutStats());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 01fdf8b..0b14fa4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -96,7 +96,7 @@ class ManifestGroup {
 
   public ManifestGroup filterPartitions(Expression expr) {
     return new ManifestGroup(
-        ops, manifests, dataFilter, fileFilter, Expressions.and(fileFilter, expr),
+        ops, manifests, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
         ignoreDeleted, ignoreExisting, columns, caseSensitive);
   }
 
@@ -139,7 +139,7 @@ class ManifestGroup {
    * @return a CloseableIterable of manifest entries.
    */
   public CloseableIterable<ManifestEntry> entries() {
-    Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
+    Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);
     Iterable<ManifestFile> matchingManifests = Iterables.filter(manifests,
         manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
 
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c55c07d..a86481a 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -54,25 +54,25 @@ public class TableTestBase {
       .withPath("/path/to/data-a.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=0") // easy way to set partition data for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_B = DataFiles.builder(SPEC)
       .withPath("/path/to/data-b.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=1") // easy way to set partition data for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_C = DataFiles.builder(SPEC)
       .withPath("/path/to/data-c.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=2") // easy way to set partition data for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
   static final DataFile FILE_D = DataFiles.builder(SPEC)
       .withPath("/path/to/data-d.parquet")
       .withFileSizeInBytes(0)
       .withPartitionPath("data_bucket=3") // easy way to set partition data for now
-      .withRecordCount(0)
+      .withRecordCount(1)
       .build();
 
   @Rule
diff --git a/core/src/test/java/org/apache/iceberg/TestFindFiles.java b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
new file mode 100644
index 0000000..98d4482
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFindFiles.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import java.util.Set;
+import org.apache.iceberg.expressions.Expressions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFindFiles extends TableTestBase {
+  @Test
+  public void testBasicBehavior() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testWithMetadataMatching() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .withMetadataMatching(Expressions.startsWith("file_path", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  @Test
+  public void testInPartition() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartition(table.spec(), StaticDataTask.Row.of(1))
+        .inPartition(table.spec(), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testInPartitions() {
+    table.newAppend()
+        .appendFile(FILE_A) // bucket 0
+        .appendFile(FILE_B) // bucket 1
+        .appendFile(FILE_C) // bucket 2
+        .appendFile(FILE_D) // bucket 3
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .inPartitions(table.spec(), StaticDataTask.Row.of(1), StaticDataTask.Row.of(2))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testAsOfTimestamp() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    long timestamp = System.currentTimeMillis();
+
+    // asOfTime selects snapshots with timestamp <= the requested time, so wait for the clock to
+    // move past the timestamp before committing again; otherwise a commit that lands in the same
+    // millisecond could be included and make this test flaky
+    while (System.currentTimeMillis() <= timestamp) {
+      Thread.yield();
+    }
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).asOfTime(timestamp).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B), pathSet(files));
+  }
+
+  @Test
+  public void testSnapshotId() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newAppend()
+        .appendFile(FILE_C)
+        .commit();
+
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    table.newAppend()
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table).inSnapshot(snapshotId).collect();
+
+    Assert.assertEquals(pathSet(FILE_A, FILE_B, FILE_C), pathSet(files));
+  }
+
+  @Test
+  public void testCaseSensitivity() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .commit();
+
+    Iterable<DataFile> files = FindFiles.in(table)
+        .caseInsensitive()
+        .withMetadataMatching(Expressions.startsWith("FILE_PATH", "/path/to/data-a"))
+        .collect();
+
+    Assert.assertEquals(pathSet(FILE_A), pathSet(files));
+  }
+
+  private Set<String> pathSet(DataFile... files) {
+    return Sets.newHashSet(Iterables.transform(Arrays.asList(files), file -> file.path().toString()));
+  }
+
+  private Set<String> pathSet(Iterable<DataFile> files) {
+    return Sets.newHashSet(Iterables.transform(files, file -> file.path().toString()));
+  }
+}