Repository: parquet-mr
Updated Branches: refs/heads/master 5294c64b3 -> 5a45ae3b1
PARQUET-241: Fix ParquetInputFormat.getFooters() order ParquetInputFormat.getFooters() should return in the same order as what listStatus() returns Author: Mingyu Kim <[email protected]> Closes #164 from mingyukim/parquet-241 and squashes the following commits: 86fe900 [Mingyu Kim] Address PR comments b0181e2 [Mingyu Kim] PARQUET-241: ParquetInputFormat.getFooters() should return in the same order as what listStatus() returns Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/5a45ae3b Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/5a45ae3b Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/5a45ae3b Branch: refs/heads/master Commit: 5a45ae3b1deb5117cb9e9a13141eeab1e9ad3d71 Parents: 5294c64 Author: Mingyu Kim <[email protected]> Authored: Thu Oct 29 15:42:43 2015 -0700 Committer: Ryan Blue <[email protected]> Committed: Thu Oct 29 15:42:43 2015 -0700 ---------------------------------------------------------------------- .../parquet/hadoop/ParquetInputFormat.java | 43 ++++++++----- .../apache/parquet/hadoop/TestInputFormat.java | 65 +++++++++++++++++++- 2 files changed, 92 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5a45ae3b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 4848f22..e3536d7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -29,8 +29,10 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import 
java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configurable; @@ -389,7 +391,9 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> { return Collections.emptyList(); } Configuration config = ContextUtil.getConfiguration(jobContext); - List<Footer> footers = new ArrayList<Footer>(statuses.size()); + // Use LinkedHashMap to preserve the insertion order and ultimately to return the list of + // footers in the same order as the list of file statuses returned from listStatus() + Map<FileStatusWrapper, Footer> footersMap = new LinkedHashMap<FileStatusWrapper, Footer>(); Set<FileStatus> missingStatuses = new HashSet<FileStatus>(); Map<Path, FileStatusWrapper> missingStatusesMap = new HashMap<Path, FileStatusWrapper>(missingStatuses.size()); @@ -407,33 +411,42 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> { + " found for '" + status.getPath() + "'"); } if (cacheEntry != null) { - footers.add(cacheEntry.getFooter()); + footersMap.put(statusWrapper, cacheEntry.getFooter()); } else { + footersMap.put(statusWrapper, null); missingStatuses.add(status); missingStatusesMap.put(status.getPath(), statusWrapper); } } if (Log.DEBUG) { - LOG.debug("found " + footers.size() + " footers in cache and adding up " + LOG.debug("found " + footersMap.size() + " footers in cache and adding up " + "to " + missingStatuses.size() + " missing footers to the cache"); } - - if (missingStatuses.isEmpty()) { - return footers; + if (!missingStatuses.isEmpty()) { + List<Footer> newFooters = getFooters(config, missingStatuses); + for (Footer newFooter : newFooters) { + // Use the original file status objects to make sure we store a + // conservative (older) modification time (i.e. 
in case the files and + // footers were modified and it's not clear which version of the footers + // we have) + FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile()); + footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter)); + } } - List<Footer> newFooters = getFooters(config, missingStatuses); - for (Footer newFooter : newFooters) { - // Use the original file status objects to make sure we store a - // conservative (older) modification time (i.e. in case the files and - // footers were modified and it's not clear which version of the footers - // we have) - FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile()); - footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter)); + List<Footer> footers = new ArrayList<Footer>(statuses.size()); + for (Entry<FileStatusWrapper, Footer> footerEntry : footersMap.entrySet()) { + Footer footer = footerEntry.getValue(); + + if (footer == null) { + // Footer was originally missing, so get it from the cache again + footers.add(footersCache.getCurrentValue(footerEntry.getKey()).getFooter()); + } else { + footers.add(footer); + } } - footers.addAll(newFooters); return footers; } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/5a45ae3b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java index 6d89ef2..9fe3008 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static 
org.apache.parquet.column.Encoding.BIT_PACKED; +import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.filter2.predicate.FilterApi.and; import static org.apache.parquet.filter2.predicate.FilterApi.eq; import static org.apache.parquet.filter2.predicate.FilterApi.intColumn; @@ -49,11 +51,13 @@ import org.apache.hadoop.mapreduce.Job; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnReader; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.IntStatistics; -import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.filter.RecordFilter; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; @@ -63,6 +67,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -71,6 +76,8 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import com.google.common.io.Files; + public class TestInputFormat { List<BlockMetaData> blocks; @@ -378,6 +385,62 @@ public class TestInputFormat { shouldSplitStartBe(splits, 0, 50); } + @Test + public void testGetFootersReturnsInPredictableOrder() throws IOException { + File tempDir = Files.createTempDir(); + 
tempDir.deleteOnExit(); + int numFiles = 10; // create a nontrivial number of files so that it actually tests getFooters() returns files in the correct order + + String url = ""; + for (int i = 0; i < numFiles; i++) { + File file = new File(tempDir, String.format("part-%05d.parquet", i)); + createParquetFile(file); + if (i > 0) { + url += ","; + } + url += "file:" + file.getAbsolutePath(); + } + + Job job = new Job(); + FileInputFormat.setInputPaths(job, url); + List<Footer> footers = new ParquetInputFormat<Object>().getFooters(job); + for (int i = 0; i < numFiles; i++) { + Footer footer = footers.get(i); + File file = new File(tempDir, String.format("part-%05d.parquet", i)); + assertEquals("file:" + file.getAbsolutePath(), footer.getFile().toString()); + } + } + + private void createParquetFile(File file) throws IOException { + Path path = new Path(file.toURI()); + Configuration configuration = new Configuration(); + + MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;}}"); + String[] columnPath = {"a", "b"}; + ColumnDescriptor c1 = schema.getColumnDescription(columnPath); + + byte[] bytes1 = { 0, 1, 2, 3}; + byte[] bytes2 = { 2, 3, 4, 5}; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + BinaryStatistics stats = new BinaryStatistics(); + + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + w.startBlock(3); + w.startColumn(c1, 5, codec); + w.writeDataPage(2, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.startBlock(4); + w.startColumn(c1, 7, codec); + w.writeDataPage(7, 4, BytesInput.from(bytes2), stats, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap<String, String>()); + } + private File getTempFile() throws IOException { File tempFile = File.createTempFile("footer_", 
".txt"); tempFile.deleteOnExit();
