Repository: parquet-cpp Updated Branches: refs/heads/master 1c673ed5b -> 378f335c1
PARQUET-807: Allow user to retain ownership of parquet::FileMetaData. Also implements PARQUET-808: opening file with existing metadata object. This allows a user to create a reader only for the purposes of obtaining the metadata. Do you all think it's worth having a convenience method for reading the metadata out of a file? Author: Wes McKinney <[email protected]> Closes #213 from wesm/PARQUET-807 and squashes the following commits: c1b5c7c [Wes McKinney] Use ReadMetaData function in test d382cca [Wes McKinney] Add note about ARROW-455 05ecd37 [Wes McKinney] Implement/test opening with provided metadata. Do not close Arrow output files automatically d790bb5 [Wes McKinney] Tweak 0dd4184 [Wes McKinney] Add ReadMetaData convenience method 97527ba [Wes McKinney] Change FileMetaData in ParquetFileReader to a shared_ptr so that ownership can be transferred away Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/378f335c Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/378f335c Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/378f335c Branch: refs/heads/master Commit: 378f335c178d318b8c0f787eea8fb5ee406441c2 Parents: 1c673ed Author: Wes McKinney <[email protected]> Authored: Thu Jan 5 12:19:31 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Thu Jan 5 12:19:31 2017 -0500 ---------------------------------------------------------------------- examples/reader-writer.cc | 2 +- src/parquet/arrow/arrow-reader-writer-test.cc | 13 ++++++-- src/parquet/arrow/schema.cc | 6 ---- src/parquet/file/metadata.cc | 5 +-- src/parquet/file/metadata.h | 2 +- src/parquet/file/reader-internal.cc | 15 ++++++--- src/parquet/file/reader-internal.h | 18 +++++----- src/parquet/file/reader.cc | 28 +++++++++++----- src/parquet/file/reader.h | 25 ++++++++------ src/parquet/reader-test.cc | 39 ++++++++++++++++++++-- src/parquet/util/memory.cc | 4 +-- src/parquet/util/memory.h | 2 ++ 12 files changed, 108 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/examples/reader-writer.cc ---------------------------------------------------------------------- diff --git a/examples/reader-writer.cc b/examples/reader-writer.cc index 0289eed..59ee63b 100644 --- a/examples/reader-writer.cc +++ b/examples/reader-writer.cc @@ -226,7 +226,7 @@ int main(int argc, char** argv) { std::unique_ptr<parquet::ParquetFileReader> parquet_reader = parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false); // Get the File MetaData - const parquet::FileMetaData* file_metadata = parquet_reader->metadata(); + std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); // Get the number of RowGroups int num_row_groups = file_metadata->num_row_groups(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 07ddd91..a329480 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -353,9 +353,16 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = std::make_shared<InMemoryOutputStream>(); auto buffer = std::make_shared<::arrow::PoolBuffer>(); - auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties())); + + { + // BufferOutputStream closed on gc + auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), arrow_sink_, + 512, default_writer_properties())); + + // XXX: Remove this after ARROW-455 completed + ASSERT_OK(arrow_sink_->Close()); + } auto pbuffer = std::make_shared<Buffer>(buffer->data(), buffer->size()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 8b2a2ab..b086b9e 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -364,12 +364,6 @@ Status FieldToNode(const std::shared_ptr<Field>& field, type = ParquetType::INT64; logical_type = LogicalType::TIMESTAMP_MILLIS; } break; - case ArrowType::TIMESTAMP_DOUBLE: - type = ParquetType::INT64; - // This is specified as seconds since the UNIX epoch - // TODO: Converted type in Parquet? - // logical_type = LogicalType::TIMESTAMP_MILLIS; - break; case ArrowType::TIME: type = ParquetType::INT64; logical_type = LogicalType::TIME_MILLIS; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 692a0f5..2e22649 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -323,9 +323,10 @@ class FileMetaData::FileMetaDataImpl { FileMetaData::Version writer_version_; }; -std::unique_ptr<FileMetaData> FileMetaData::Make( +std::shared_ptr<FileMetaData> FileMetaData::Make( const uint8_t* metadata, uint32_t* metadata_len) { - return std::unique_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len)); + // This FileMetaData ctor is private, not compatible with std::make_shared + return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len)); } FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index ef19c98..43419d2 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -120,7 +120,7 @@ class PARQUET_EXPORT FileMetaData { }; // API convenience to get a MetaData accessor - static std::unique_ptr<FileMetaData> Make( + static std::shared_ptr<FileMetaData> Make( const uint8_t* serialized_metadata, uint32_t* metadata_len); ~FileMetaData(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 2c3ebb3..9608f58 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -206,15 +206,20 @@ static constexpr uint32_t FOOTER_SIZE = 8; static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open( - std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) { + std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props, + const std::shared_ptr<FileMetaData>& metadata) { std::unique_ptr<ParquetFileReader::Contents> result( new SerializedFile(std::move(source), props)); // Access private methods here, but otherwise unavailable SerializedFile* file = static_cast<SerializedFile*>(result.get()); - // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor - file->ParseMetaData(); + if (metadata == nullptr) { + // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor + file->ParseMetaData(); + } else { + file->file_metadata_ = metadata; + } return result; } @@ -233,8 +238,8 @@ std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) { return std::make_shared<RowGroupReader>(std::move(contents)); } -const FileMetaData* SerializedFile::metadata() const { - return file_metadata_.get(); +std::shared_ptr<FileMetaData> SerializedFile::metadata() const { + return file_metadata_; } SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source, http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index aa9b75e..7b21c2e 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -98,16 +98,16 @@ class SerializedRowGroup : public RowGroupReader::Contents { class SerializedFile : public ParquetFileReader::Contents { public: - // Open the valid and validate the header, footer, and parse the Thrift metadata - // - // This class does _not_ take ownership of the data source. You must manage its - // lifetime separately + // Open the file. If no metadata is passed, it is parsed from the footer of + // the file static std::unique_ptr<ParquetFileReader::Contents> Open( std::unique_ptr<RandomAccessSource> source, - const ReaderProperties& props = default_reader_properties()); - virtual void Close(); - virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i); - virtual const FileMetaData* metadata() const; + const ReaderProperties& props = default_reader_properties(), + const std::shared_ptr<FileMetaData>& metadata = nullptr); + + void Close() override; + std::shared_ptr<RowGroupReader> GetRowGroup(int i) override; + std::shared_ptr<FileMetaData> metadata() const override; virtual ~SerializedFile(); private: @@ -116,7 +116,7 @@ class SerializedFile : public ParquetFileReader::Contents { std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props); std::unique_ptr<RandomAccessSource> source_; - std::unique_ptr<FileMetaData> file_metadata_; + std::shared_ptr<FileMetaData> file_metadata_; ReaderProperties properties_; void ParseMetaData(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 52fe57a..d5b2dce 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -72,21 +72,23 @@ ParquetFileReader::~ParquetFileReader() { std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( const std::shared_ptr<::arrow::io::ReadableFileInterface>& source, - const ReaderProperties& props) { + const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) { std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source)); - return Open(std::move(io_wrapper), props); + return Open(std::move(io_wrapper), props, metadata); } std::unique_ptr<ParquetFileReader> ParquetFileReader::Open( - std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) { - auto contents = SerializedFile::Open(std::move(source), props); + std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props, + const std::shared_ptr<FileMetaData>& metadata) { + auto contents = SerializedFile::Open(std::move(source), props, metadata); std::unique_ptr<ParquetFileReader> result(new ParquetFileReader()); result->Open(std::move(contents)); return result; } -std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile( - const std::string& path, bool memory_map, const ReaderProperties& props) { +std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path, + bool memory_map, const ReaderProperties& props, + const std::shared_ptr<FileMetaData>& metadata) { std::shared_ptr<::arrow::io::ReadableFileInterface> source; if (memory_map) { std::shared_ptr<::arrow::io::ReadableFile> handle; @@ -100,7 +102,7 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile( source = handle; } - return Open(source, props); + return Open(source, props, metadata); } void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) { @@ -111,7 +113,7 @@ void ParquetFileReader::Close() { if (contents_) { contents_->Close(); } } -const FileMetaData* ParquetFileReader::metadata() const { +std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const { return contents_->metadata(); } @@ -130,7 +132,7 @@ std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) { void ParquetFileReader::DebugPrint( std::ostream& stream, std::list<int> selected_columns, bool print_values) { - const FileMetaData* file_metadata = metadata(); + const FileMetaData* file_metadata = metadata().get(); stream << "File statistics:\n"; stream << "Version: " << file_metadata->version() << "\n"; @@ -236,4 +238,12 @@ void ParquetFileReader::DebugPrint( } } +// ---------------------------------------------------------------------- +// File metadata helpers + +std::shared_ptr<FileMetaData> ReadMetaData( + const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) { + return ParquetFileReader::Open(source)->metadata(); +} + } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/file/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index 1c24506..723595a 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -73,7 +73,7 @@ class PARQUET_EXPORT ParquetFileReader { // Perform any cleanup associated with the file contents virtual void Close() = 0; virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0; - virtual const FileMetaData* metadata() const = 0; + virtual std::shared_ptr<FileMetaData> metadata() const = 0; }; ParquetFileReader(); @@ -86,20 +86,21 @@ class PARQUET_EXPORT ParquetFileReader { // subclass of RandomAccessSource that wraps the shared resource static std::unique_ptr<ParquetFileReader> Open( std::unique_ptr<RandomAccessSource> source, - const ReaderProperties& props = default_reader_properties()); + const ReaderProperties& props = default_reader_properties(), + const std::shared_ptr<FileMetaData>& metadata = nullptr); // Create a file reader instance from an Arrow file object. Thread-safety is // the responsibility of the file implementation static std::unique_ptr<ParquetFileReader> Open( const std::shared_ptr<::arrow::io::ReadableFileInterface>& source, - const ReaderProperties& props = default_reader_properties()); + const ReaderProperties& props = default_reader_properties(), + const std::shared_ptr<FileMetaData>& metadata = nullptr); - // API Convenience to open a serialized Parquet file on disk, using built-in IO - // interface implementations that were created for testing, and may not be robust for - // all use cases. + // API Convenience to open a serialized Parquet file on disk, using Arrow IO + // interfaces. static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path, - bool memory_map = true, - const ReaderProperties& props = default_reader_properties()); + bool memory_map = true, const ReaderProperties& props = default_reader_properties(), + const std::shared_ptr<FileMetaData>& metadata = nullptr); void Open(std::unique_ptr<Contents> contents); void Close(); @@ -107,8 +108,8 @@ class PARQUET_EXPORT ParquetFileReader { // The RowGroupReader is owned by the FileReader std::shared_ptr<RowGroupReader> RowGroup(int i); - // Returns the file metadata - const FileMetaData* metadata() const; + // Returns the file metadata. Only one instance is ever created + std::shared_ptr<FileMetaData> metadata() const; void DebugPrint( std::ostream& stream, std::list<int> selected_columns, bool print_values = true); @@ -118,6 +119,10 @@ class PARQUET_EXPORT ParquetFileReader { std::unique_ptr<Contents> contents_; }; +// Read only Parquet file metadata +std::shared_ptr<FileMetaData> PARQUET_EXPORT ReadMetaData( + const std::shared_ptr<::arrow::io::ReadableFileInterface>& source); + } // namespace parquet #endif // PARQUET_FILE_READER_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index e3be9b0..b0b8851 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -182,15 +182,48 @@ class TestLocalFile : public ::testing::Test { std::shared_ptr<::arrow::io::ReadableFile> handle; }; +class HelperFileClosed : public ArrowInputFile { + public: + explicit HelperFileClosed( + const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, bool* close_called) + : ArrowInputFile(file), close_called_(close_called) {} + + void Close() override { *close_called_ = true; } + + private: + bool* close_called_; +}; + TEST_F(TestLocalFile, FileClosedOnDestruction) { + bool close_called = false; { auto contents = SerializedFile::Open( - std::unique_ptr<RandomAccessSource>(new ArrowInputFile(handle))); + std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called))); std::unique_ptr<ParquetFileReader> result(new ParquetFileReader()); result->Open(std::move(contents)); } - ASSERT_EQ(-1, fcntl(fileno, F_GETFD)); - ASSERT_EQ(EBADF, errno); + ASSERT_TRUE(close_called); +} + +TEST_F(TestLocalFile, OpenWithMetadata) { + // PARQUET-808 + std::stringstream ss; + std::shared_ptr<FileMetaData> metadata = ReadMetaData(handle); + + auto reader = ParquetFileReader::Open(handle, default_reader_properties(), metadata); + + // Compare pointers + ASSERT_EQ(metadata.get(), reader->metadata().get()); + + std::list<int> columns; + reader->DebugPrint(ss, columns, true); + + // Make sure OpenFile passes on the external metadata, too + auto reader2 = ParquetFileReader::OpenFile( + alltypes_plain(), false, default_reader_properties(), metadata); + + // Compare pointers + ASSERT_EQ(metadata.get(), reader2->metadata().get()); } TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc index 9ad0336..b490c2e 100644 --- a/src/parquet/util/memory.cc +++ b/src/parquet/util/memory.cc @@ -347,9 +347,9 @@ bool ChunkedAllocator::CheckIntegrity(bool current_chunk_empty) { // ---------------------------------------------------------------------- // Arrow IO wrappers -// Close the output stream void ArrowFileMethods::Close() { - PARQUET_THROW_NOT_OK(file_interface()->Close()); + // Closing the file is the responsibility of the owner of the handle + return; } // Return the current position in the output stream relative to the start http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/378f335c/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index 1ffca35..b915450 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -286,7 +286,9 @@ class PARQUET_EXPORT OutputStream : virtual public FileInterface { class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface { public: + // No-op. Closing the file is the responsibility of the owner of the handle void Close() override; + int64_t Tell() override; protected:
