[ 
https://issues.apache.org/jira/browse/PARQUET-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288101#comment-16288101
 ] 

ASF GitHub Bot commented on PARQUET-859:
----------------------------------------

wesm closed pull request #424: PARQUET-859: Flatten parquet/file directory, 
consolidate file reader, file writer code
URL: https://github.com/apache/parquet-cpp/pull/424
 
 
   

This PR was merged from a forked repository. Because GitHub does not
display the original diff of such a pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8a3558c3..47746312 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -375,6 +375,7 @@ enable_testing()
 # Dependencies
 ############################################################
 
+include_directories(${CMAKE_CURRENT_BINARY_DIR}/src)
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}/src
 )
@@ -661,7 +662,7 @@ if (NOT MSVC)
 endif()
 
 # List of thrift output targets
-set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/parquet)
+set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/src/parquet)
 set(THRIFT_OUTPUT_FILES "${THRIFT_OUTPUT_DIR}/parquet_types.cpp")
 set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} 
"${THRIFT_OUTPUT_DIR}/parquet_types.h")
 set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} 
"${THRIFT_OUTPUT_DIR}/parquet_constants.cpp")
@@ -683,30 +684,23 @@ add_custom_command(
 # Library config
 
 set(LIBPARQUET_SRCS
-  src/parquet/exception.cc
-  src/parquet/types.cc
-
   src/parquet/arrow/reader.cc
   src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
-
   src/parquet/column_reader.cc
   src/parquet/column_scanner.cc
   src/parquet/column_writer.cc
-
-  src/parquet/file/metadata.cc
-  src/parquet/file/printer.cc
-  src/parquet/file/reader.cc
-  src/parquet/file/reader-internal.cc
-  src/parquet/file/writer.cc
-  src/parquet/file/writer-internal.cc
-
-  src/parquet/schema.cc
-  src/parquet/statistics.cc
-
+  src/parquet/exception.cc
+  src/parquet/file_reader.cc
+  src/parquet/file_writer.cc
+  src/parquet/metadata.cc
   src/parquet/parquet_constants.cpp
   src/parquet/parquet_types.cpp
+  src/parquet/printer.cc
+  src/parquet/schema.cc
+  src/parquet/statistics.cc
+  src/parquet/types.cc
   src/parquet/util/comparison.cc
   src/parquet/util/memory.cc
 )
@@ -785,7 +779,6 @@ endif()
 add_subdirectory(src/parquet)
 add_subdirectory(src/parquet/api)
 add_subdirectory(src/parquet/arrow)
-add_subdirectory(src/parquet/file)
 add_subdirectory(src/parquet/util)
 
 if (NOT MSVC)
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index a2e283e2..bc16d8bd 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -23,6 +23,10 @@ install(FILES
   column_writer.h
   encoding.h
   exception.h
+  file_reader.h
+  file_writer.h
+  metadata.h
+  printer.h
   properties.h
   schema.h
   statistics.h
@@ -49,9 +53,12 @@ install(FILES
 ADD_PARQUET_TEST(column_reader-test)
 ADD_PARQUET_TEST(column_scanner-test)
 ADD_PARQUET_TEST(column_writer-test)
+ADD_PARQUET_TEST(file-deserialize-test)
+ADD_PARQUET_TEST(file-serialize-test)
 ADD_PARQUET_TEST(properties-test)
 ADD_PARQUET_TEST(statistics-test)
 ADD_PARQUET_TEST(encoding-test)
+ADD_PARQUET_TEST(metadata-test)
 ADD_PARQUET_TEST(public-api-test)
 ADD_PARQUET_TEST(types-test)
 ADD_PARQUET_TEST(reader-test)
diff --git a/src/parquet/api/reader.h b/src/parquet/api/reader.h
index ba9717ae..505654f8 100644
--- a/src/parquet/api/reader.h
+++ b/src/parquet/api/reader.h
@@ -22,11 +22,9 @@
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader.h"
-
-// Metadata reader API
-#include "parquet/file/metadata.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/printer.h"
 
 // Schemas
 #include "parquet/api/schema.h"
diff --git a/src/parquet/api/writer.h b/src/parquet/api/writer.h
index cc3ae2a8..3b4e42f7 100644
--- a/src/parquet/api/writer.h
+++ b/src/parquet/api/writer.h
@@ -18,15 +18,10 @@
 #ifndef PARQUET_API_WRITER_H
 #define PARQUET_API_WRITER_H
 
-// Column reader API
+#include "parquet/api/io.h"
+#include "parquet/api/schema.h"
 #include "parquet/column_writer.h"
 #include "parquet/exception.h"
-#include "parquet/file/writer.h"
-
-// Schemas
-#include "parquet/api/schema.h"
-
-// IO
-#include "parquet/api/io.h"
+#include "parquet/file_writer.h"
 
 #endif  // PARQUET_API_WRITER_H
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc 
b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index edeef1ed..15d2cf72 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -23,8 +23,8 @@
 #include "parquet/arrow/writer.h"
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/util/memory.h"
 
 #include "arrow/api.h"
@@ -142,7 +142,6 @@ std::shared_ptr<::arrow::Table> 
TableFromVector<BooleanType>(const std::vector<b
 
 template <bool nullable, typename ParquetType>
 static void BM_WriteColumn(::benchmark::State& state) {
-  format::ColumnChunk thrift_metadata;
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, 
nullable);
 
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc 
b/src/parquet/arrow/arrow-reader-writer-test.cc
index a8d38241..c10b1644 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -34,7 +34,7 @@
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
 
-#include "parquet/file/writer.h"
+#include "parquet/file_writer.h"
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
diff --git a/src/parquet/arrow/record_reader.h 
b/src/parquet/arrow/record_reader.h
index 8d55f9d8..9ca8b68c 100644
--- a/src/parquet/arrow/record_reader.h
+++ b/src/parquet/arrow/record_reader.h
@@ -30,7 +30,7 @@
 #include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
 
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
 #include "parquet/schema.h"
 #include "parquet/util/visibility.h"
 
diff --git a/src/parquet/column-io-benchmark.cc 
b/src/parquet/column-io-benchmark.cc
index ec7b52ef..7c8d093e 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -19,8 +19,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/parquet_types.h"
 #include "parquet/util/memory.h"
 
 namespace parquet {
@@ -33,8 +33,8 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, 
OutputStream* dst,
                                          ColumnChunkMetaDataBuilder* metadata,
                                          ColumnDescriptor* schema,
                                          const WriterProperties* properties) {
-  std::unique_ptr<SerializedPageWriter> pager(
-      new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
+  std::unique_ptr<PageWriter> pager =
+      PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
   return std::unique_ptr<Int64Writer>(
       new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, 
properties));
 }
@@ -110,8 +110,8 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, 
Repetition::REPEATED, Compression::ZSTD)
 std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
                                          int64_t num_values, ColumnDescriptor* 
schema) {
   std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
-  std::unique_ptr<SerializedPageReader> page_reader(
-      new SerializedPageReader(std::move(source), num_values, 
Compression::UNCOMPRESSED));
+  std::unique_ptr<PageReader> page_reader =
+      PageReader::Open(std::move(source), num_values, 
Compression::UNCOMPRESSED);
   return std::unique_ptr<Int64Reader>(new Int64Reader(schema, 
std::move(page_reader)));
 }
 
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
index 85e3bb54..c34eee77 100644
--- a/src/parquet/column_page.h
+++ b/src/parquet/column_page.h
@@ -168,35 +168,6 @@ class DictionaryPage : public Page {
   bool is_sorted_;
 };
 
-// Abstract page iterator interface. This way, we can feed column pages to the
-// ColumnReader through whatever mechanism we choose
-class PageReader {
- public:
-  virtual ~PageReader() {}
-
-  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
-  // containing new Page otherwise
-  virtual std::shared_ptr<Page> NextPage() = 0;
-};
-
-class PageWriter {
- public:
-  virtual ~PageWriter() {}
-
-  // The Column Writer decides if dictionary encoding is used if set and
-  // if the dictionary encoding has fallen back to default encoding on 
reaching dictionary
-  // page limit
-  virtual void Close(bool has_dictionary, bool fallback) = 0;
-
-  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
-
-  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
-
-  virtual bool has_compressor() = 0;
-
-  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* 
dest_buffer) = 0;
-};
-
 }  // namespace parquet
 
 #endif  // PARQUET_COLUMN_PAGE_H
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index d23e7385..91557af7 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -24,11 +24,14 @@
 #include <arrow/buffer.h>
 #include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
+#include <arrow/util/compression.h>
 #include <arrow/util/rle-encoding.h>
 
 #include "parquet/column_page.h"
 #include "parquet/encoding-internal.h"
+#include "parquet/parquet_types.h"
 #include "parquet/properties.h"
+#include "parquet/thrift.h"
 
 using arrow::MemoryPool;
 
@@ -89,6 +92,177 @@ ReaderProperties default_reader_properties() {
   return default_reader_properties;
 }
 
+// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storing in a Parquet file
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+  SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t 
total_num_rows,
+                       Compression::type codec, ::arrow::MemoryPool* pool)
+      : stream_(std::move(stream)),
+        decompression_buffer_(AllocateBuffer(pool, 0)),
+        seen_num_rows_(0),
+        total_num_rows_(total_num_rows) {
+    max_page_header_size_ = kDefaultMaxPageHeaderSize;
+    decompressor_ = GetCodecFromArrow(codec);
+  }
+
+  virtual ~SerializedPageReader() {}
+
+  // Implement the PageReader interface
+  std::shared_ptr<Page> NextPage() override;
+
+  void set_max_page_header_size(uint32_t size) override { 
max_page_header_size_ = size; }
+
+ private:
+  std::unique_ptr<InputStream> stream_;
+
+  format::PageHeader current_page_header_;
+  std::shared_ptr<Page> current_page_;
+
+  // Compression codec to use.
+  std::unique_ptr<::arrow::Codec> decompressor_;
+  std::shared_ptr<PoolBuffer> decompression_buffer_;
+
+  // Maximum allowed page header size
+  uint32_t max_page_header_size_;
+
+  // Number of rows read in data pages so far
+  int64_t seen_num_rows_;
+
+  // Number of rows in all the data pages
+  int64_t total_num_rows_;
+};
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+  // Loop here because there may be unhandled page types that we skip until
+  // finding a page that we do know what to do with
+  while (seen_num_rows_ < total_num_rows_) {
+    int64_t bytes_read = 0;
+    int64_t bytes_available = 0;
+    uint32_t header_size = 0;
+    const uint8_t* buffer;
+    uint32_t allowed_page_size = kDefaultPageHeaderSize;
+
+    // Page headers can be very large because of page statistics
+    // We try to deserialize a larger buffer progressively
+    // until a maximum allowed header limit
+    while (true) {
+      buffer = stream_->Peek(allowed_page_size, &bytes_available);
+      if (bytes_available == 0) {
+        return std::shared_ptr<Page>(nullptr);
+      }
+
+      // This gets used, then set by DeserializeThriftMsg
+      header_size = static_cast<uint32_t>(bytes_available);
+      try {
+        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+        break;
+      } catch (std::exception& e) {
+        // Failed to deserialize. Double the allowed page header size and try 
again
+        std::stringstream ss;
+        ss << e.what();
+        allowed_page_size *= 2;
+        if (allowed_page_size > max_page_header_size_) {
+          ss << "Deserializing page header failed.\n";
+          throw ParquetException(ss.str());
+        }
+      }
+    }
+    // Advance the stream offset
+    stream_->Advance(header_size);
+
+    int compressed_len = current_page_header_.compressed_page_size;
+    int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+    // Read the compressed data page.
+    buffer = stream_->Read(compressed_len, &bytes_read);
+    if (bytes_read != compressed_len) {
+      ParquetException::EofException();
+    }
+
+    // Uncompress it if we need to
+    if (decompressor_ != nullptr) {
+      // Grow the uncompressed buffer if we need to.
+      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
+        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, 
false));
+      }
+      PARQUET_THROW_NOT_OK(
+          decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+                                    decompression_buffer_->mutable_data()));
+      buffer = decompression_buffer_->data();
+    }
+
+    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
+
+    if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
+      const format::DictionaryPageHeader& dict_header =
+          current_page_header_.dictionary_page_header;
+
+      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : 
false;
+
+      return std::make_shared<DictionaryPage>(page_buffer, 
dict_header.num_values,
+                                              FromThrift(dict_header.encoding),
+                                              is_sorted);
+    } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = 
current_page_header_.data_page_header;
+
+      EncodedStatistics page_statistics;
+      if (header.__isset.statistics) {
+        const format::Statistics& stats = header.statistics;
+        if (stats.__isset.max) {
+          page_statistics.set_max(stats.max);
+        }
+        if (stats.__isset.min) {
+          page_statistics.set_min(stats.min);
+        }
+        if (stats.__isset.null_count) {
+          page_statistics.set_null_count(stats.null_count);
+        }
+        if (stats.__isset.distinct_count) {
+          page_statistics.set_distinct_count(stats.distinct_count);
+        }
+      }
+
+      seen_num_rows_ += header.num_values;
+
+      return std::make_shared<DataPage>(
+          page_buffer, header.num_values, FromThrift(header.encoding),
+          FromThrift(header.definition_level_encoding),
+          FromThrift(header.repetition_level_encoding), page_statistics);
+    } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = 
current_page_header_.data_page_header_v2;
+      bool is_compressed = header.__isset.is_compressed ? header.is_compressed 
: false;
+
+      seen_num_rows_ += header.num_values;
+
+      return std::make_shared<DataPageV2>(
+          page_buffer, header.num_values, header.num_nulls, header.num_rows,
+          FromThrift(header.encoding), header.definition_levels_byte_length,
+          header.repetition_levels_byte_length, is_compressed);
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  return std::shared_ptr<Page>(nullptr);
+}
+
+std::unique_ptr<PageReader> PageReader::Open(std::unique_ptr<InputStream> 
stream,
+                                             int64_t total_num_rows,
+                                             Compression::type codec,
+                                             ::arrow::MemoryPool* pool) {
+  return std::unique_ptr<PageReader>(
+      new SerializedPageReader(std::move(stream), total_num_rows, codec, 
pool));
+}
+
+// ----------------------------------------------------------------------
+
 ColumnReader::ColumnReader(const ColumnDescriptor* descr,
                            std::unique_ptr<PageReader> pager, MemoryPool* pool)
     : descr_(descr),
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index dcf41e83..6158cb3b 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -49,6 +49,12 @@ class RleDecoder;
 
 namespace parquet {
 
+// 16 MB is the default maximum page header size
+static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
+
+// 16 KB is the default expected page header size
+static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+
 namespace BitUtil = ::arrow::BitUtil;
 
 class PARQUET_EXPORT LevelDecoder {
@@ -72,6 +78,24 @@ class PARQUET_EXPORT LevelDecoder {
   std::unique_ptr<::arrow::BitReader> bit_packed_decoder_;
 };
 
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PARQUET_EXPORT PageReader {
+ public:
+  virtual ~PageReader() = default;
+
+  static std::unique_ptr<PageReader> Open(
+      std::unique_ptr<InputStream> stream, int64_t total_num_rows,
+      Compression::type codec,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+  // containing new Page otherwise
+  virtual std::shared_ptr<Page> NextPage() = 0;
+
+  virtual void set_max_page_header_size(uint32_t size) = 0;
+};
+
 class PARQUET_EXPORT ColumnReader {
  public:
   ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
diff --git a/src/parquet/column_writer-test.cc 
b/src/parquet/column_writer-test.cc
index 681f022b..b4e3232f 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -19,8 +19,7 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/parquet_types.h"
 #include "parquet/test-specialization.h"
 #include "parquet/test-util.h"
 #include "parquet/types.h"
@@ -63,8 +62,8 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
                    Compression::type compression = Compression::UNCOMPRESSED) {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new 
InMemoryInputStream(buffer));
-    std::unique_ptr<SerializedPageReader> page_reader(
-        new SerializedPageReader(std::move(source), num_rows, compression));
+    std::unique_ptr<PageReader> page_reader =
+        PageReader::Open(std::move(source), num_rows, compression);
     reader_.reset(new TypedColumnReader<TestType>(this->descr_, 
std::move(page_reader)));
   }
 
@@ -74,8 +73,8 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
     sink_.reset(new InMemoryOutputStream());
     metadata_ = ColumnChunkMetaDataBuilder::Make(
         writer_properties_, this->descr_, 
reinterpret_cast<uint8_t*>(&thrift_metadata_));
-    std::unique_ptr<SerializedPageWriter> pager(
-        new SerializedPageWriter(sink_.get(), column_properties.codec, 
metadata_.get()));
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_.get(), column_properties.codec, 
metadata_.get());
     WriterProperties::Builder wp_builder;
     if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
         column_properties.encoding == Encoding::RLE_DICTIONARY) {
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 4aadf2b4..bdaa9f69 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -17,12 +17,17 @@
 
 #include "parquet/column_writer.h"
 
+#include <cstdint>
+#include <memory>
+
 #include "arrow/util/bit-util.h"
+#include "arrow/util/compression.h"
 #include "arrow/util/rle-encoding.h"
 
 #include "parquet/encoding-internal.h"
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
+#include "parquet/thrift.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
@@ -103,6 +108,169 @@ int LevelEncoder::Encode(int batch_size, const int16_t* 
levels) {
   return num_encoded;
 }
 
+// ----------------------------------------------------------------------
+// PageWriter implementation
+
+static format::Statistics ToThrift(const EncodedStatistics& 
row_group_statistics) {
+  format::Statistics statistics;
+  if (row_group_statistics.has_min) 
statistics.__set_min(row_group_statistics.min());
+  if (row_group_statistics.has_max) 
statistics.__set_max(row_group_statistics.max());
+  if (row_group_statistics.has_null_count)
+    statistics.__set_null_count(row_group_statistics.null_count);
+  if (row_group_statistics.has_distinct_count)
+    statistics.__set_distinct_count(row_group_statistics.distinct_count);
+  return statistics;
+}
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageWriter : public PageWriter {
+ public:
+  SerializedPageWriter(OutputStream* sink, Compression::type codec,
+                       ColumnChunkMetaDataBuilder* metadata,
+                       ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool())
+      : sink_(sink),
+        metadata_(metadata),
+        pool_(pool),
+        num_values_(0),
+        dictionary_page_offset_(0),
+        data_page_offset_(0),
+        total_uncompressed_size_(0),
+        total_compressed_size_(0) {
+    compressor_ = GetCodecFromArrow(codec);
+  }
+
+  virtual ~SerializedPageWriter() = default;
+
+  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+    int64_t uncompressed_size = page.size();
+    std::shared_ptr<Buffer> compressed_data = nullptr;
+    if (has_compressor()) {
+      auto buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(pool_, uncompressed_size));
+      Compress(*(page.buffer().get()), buffer.get());
+      compressed_data = std::static_pointer_cast<Buffer>(buffer);
+    } else {
+      compressed_data = page.buffer();
+    }
+
+    format::DictionaryPageHeader dict_page_header;
+    dict_page_header.__set_num_values(page.num_values());
+    dict_page_header.__set_encoding(ToThrift(page.encoding()));
+    dict_page_header.__set_is_sorted(page.is_sorted());
+
+    format::PageHeader page_header;
+    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
+    
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+    
page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_dictionary_page_header(dict_page_header);
+    // TODO(PARQUET-594) crc checksum
+
+    int64_t start_pos = sink_->Tell();
+    if (dictionary_page_offset_ == 0) {
+      dictionary_page_offset_ = start_pos;
+    }
+    int64_t header_size =
+        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+    sink_->Write(compressed_data->data(), compressed_data->size());
+
+    total_uncompressed_size_ += uncompressed_size + header_size;
+    total_compressed_size_ += compressed_data->size() + header_size;
+
+    return sink_->Tell() - start_pos;
+  }
+
+  void Close(bool has_dictionary, bool fallback) override {
+    // index_page_offset = 0 since they are not supported
+    metadata_->Finish(num_values_, dictionary_page_offset_, 0, 
data_page_offset_,
+                      total_compressed_size_, total_uncompressed_size_, 
has_dictionary,
+                      fallback);
+
+    // Write metadata at end of column chunk
+    metadata_->WriteTo(sink_);
+  }
+
+  /**
+   * Compress a buffer.
+   */
+  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) 
override {
+    DCHECK(compressor_ != nullptr);
+
+    // Compress the data
+    int64_t max_compressed_size =
+        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
+
+    // Use Arrow::Buffer::shrink_to_fit = false
+    // underlying buffer only keeps growing. Resize to a smaller size does not 
reallocate.
+    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
+
+    int64_t compressed_size;
+    PARQUET_THROW_NOT_OK(
+        compressor_->Compress(src_buffer.size(), src_buffer.data(), 
max_compressed_size,
+                              dest_buffer->mutable_data(), &compressed_size));
+    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
+  }
+
+  int64_t WriteDataPage(const CompressedDataPage& page) override {
+    int64_t uncompressed_size = page.uncompressed_size();
+    std::shared_ptr<Buffer> compressed_data = page.buffer();
+
+    format::DataPageHeader data_page_header;
+    data_page_header.__set_num_values(page.num_values());
+    data_page_header.__set_encoding(ToThrift(page.encoding()));
+    data_page_header.__set_definition_level_encoding(
+        ToThrift(page.definition_level_encoding()));
+    data_page_header.__set_repetition_level_encoding(
+        ToThrift(page.repetition_level_encoding()));
+    data_page_header.__set_statistics(ToThrift(page.statistics()));
+
+    format::PageHeader page_header;
+    page_header.__set_type(format::PageType::DATA_PAGE);
+    
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+    
page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_data_page_header(data_page_header);
+    // TODO(PARQUET-594) crc checksum
+
+    int64_t start_pos = sink_->Tell();
+    if (data_page_offset_ == 0) {
+      data_page_offset_ = start_pos;
+    }
+
+    int64_t header_size =
+        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+    sink_->Write(compressed_data->data(), compressed_data->size());
+
+    total_uncompressed_size_ += uncompressed_size + header_size;
+    total_compressed_size_ += compressed_data->size() + header_size;
+    num_values_ += page.num_values();
+
+    return sink_->Tell() - start_pos;
+  }
+
+  bool has_compressor() override { return (compressor_ != nullptr); }
+
+ private:
+  OutputStream* sink_;
+  ColumnChunkMetaDataBuilder* metadata_;
+  ::arrow::MemoryPool* pool_;
+  int64_t num_values_;
+  int64_t dictionary_page_offset_;
+  int64_t data_page_offset_;
+  int64_t total_uncompressed_size_;
+  int64_t total_compressed_size_;
+
+  // Compression codec to use.
+  std::unique_ptr<::arrow::Codec> compressor_;
+};
+
+std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, 
Compression::type codec,
+                                             ColumnChunkMetaDataBuilder* 
metadata,
+                                             ::arrow::MemoryPool* pool) {
+  return std::unique_ptr<PageWriter>(
+      new SerializedPageWriter(sink, codec, metadata, pool));
+}
+
 // ----------------------------------------------------------------------
 // ColumnWriter
 
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 04087472..f1c13a05 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -22,7 +22,7 @@
 
 #include "parquet/column_page.h"
 #include "parquet/encoding.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
@@ -69,6 +69,28 @@ class PARQUET_EXPORT LevelEncoder {
   std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_;
 };
 
+class PageWriter {
+ public:
+  virtual ~PageWriter() {}
+
+  static std::unique_ptr<PageWriter> Open(
+      OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* 
metadata,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // The Column Writer decides if dictionary encoding is used if set and
+  // if the dictionary encoding has fallen back to default encoding on 
reaching dictionary
+  // page limit
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
+
+  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
+
+  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+  virtual bool has_compressor() = 0;
+
+  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* 
dest_buffer) = 0;
+};
+
 static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
diff --git a/src/parquet/encoding-benchmark.cc 
b/src/parquet/encoding-benchmark.cc
index 97eeefa5..72c41e58 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -18,7 +18,6 @@
 #include "benchmark/benchmark.h"
 
 #include "parquet/encoding-internal.h"
-#include "parquet/file/reader-internal.h"
 #include "parquet/util/memory.h"
 
 using arrow::default_memory_pool;
@@ -26,7 +25,6 @@ using arrow::MemoryPool;
 
 namespace parquet {
 
-using format::ColumnChunk;
 using schema::PrimitiveNode;
 
 namespace benchmark {
diff --git a/src/parquet/file/file-deserialize-test.cc 
b/src/parquet/file-deserialize-test.cc
similarity index 96%
rename from src/parquet/file/file-deserialize-test.cc
rename to src/parquet/file-deserialize-test.cc
index 0cab75f1..5e173755 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file-deserialize-test.cc
@@ -26,9 +26,9 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
 #include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
+#include "parquet/file_reader.h"
 #include "parquet/parquet_types.h"
 #include "parquet/thrift.h"
 #include "parquet/types.h"
@@ -73,7 +73,7 @@ class TestPageSerde : public ::testing::Test {
     EndStream();
     std::unique_ptr<InputStream> stream;
     stream.reset(new InMemoryInputStream(out_buffer_));
-    page_reader_.reset(new SerializedPageReader(std::move(stream), num_rows, 
codec));
+    page_reader_ = PageReader::Open(std::move(stream), num_rows, codec);
   }
 
   void WriteDataPageHeader(int max_serialized_len = 1024, int32_t 
uncompressed_size = 0,
@@ -99,7 +99,7 @@ class TestPageSerde : public ::testing::Test {
   std::unique_ptr<InMemoryOutputStream> out_stream_;
   std::shared_ptr<Buffer> out_buffer_;
 
-  std::unique_ptr<SerializedPageReader> page_reader_;
+  std::unique_ptr<PageReader> page_reader_;
   format::PageHeader page_header_;
   format::DataPageHeader data_page_header_;
 };
@@ -149,7 +149,7 @@ TEST_F(TestPageSerde, TestLargePageHeaders) {
 
   // check header size is between 256 KB to 16 MB
   ASSERT_LE(stats_size, out_stream_->Tell());
-  ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
+  ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell());
 
   InitSerializedPageReader(num_rows);
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
@@ -249,7 +249,7 @@ class TestParquetFileReader : public ::testing::Test {
     auto reader = std::make_shared<BufferReader>(buffer);
     auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
 
-    ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(wrapper))),
+    
ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))),
                  ParquetException);
   }
 
diff --git a/src/parquet/file/file-serialize-test.cc 
b/src/parquet/file-serialize-test.cc
similarity index 99%
rename from src/parquet/file/file-serialize-test.cc
rename to src/parquet/file-serialize-test.cc
index 4d94d2eb..b4df77ef 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -19,8 +19,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/test-specialization.h"
 #include "parquet/test-util.h"
 #include "parquet/types.h"
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
deleted file mode 100644
index 82e7c808..00000000
--- a/src/parquet/file/CMakeLists.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-install(FILES
-  metadata.h
-  printer.h
-  reader.h
-  writer.h
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/file")
-
-ADD_PARQUET_TEST(file-deserialize-test)
-ADD_PARQUET_TEST(file-metadata-test)
-ADD_PARQUET_TEST(file-serialize-test)
diff --git a/src/parquet/file/reader-internal.cc 
b/src/parquet/file/reader-internal.cc
deleted file mode 100644
index bc14ec91..00000000
--- a/src/parquet/file/reader-internal.cc
+++ /dev/null
@@ -1,309 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/reader-internal.h"
-
-#include <string.h>
-#include <algorithm>
-#include <exception>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_page.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// SerializedPageReader deserializes Thrift metadata and pages that have been
-// assembled in a serialized stream for storing in a Parquet files
-
-SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-                                           int64_t total_num_rows,
-                                           Compression::type codec, 
MemoryPool* pool)
-    : stream_(std::move(stream)),
-      decompression_buffer_(AllocateBuffer(pool, 0)),
-      seen_num_rows_(0),
-      total_num_rows_(total_num_rows) {
-  max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
-  decompressor_ = GetCodecFromArrow(codec);
-}
-
-std::shared_ptr<Page> SerializedPageReader::NextPage() {
-  // Loop here because there may be unhandled page types that we skip until
-  // finding a page that we do know what to do with
-  while (seen_num_rows_ < total_num_rows_) {
-    int64_t bytes_read = 0;
-    int64_t bytes_available = 0;
-    uint32_t header_size = 0;
-    const uint8_t* buffer;
-    uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
-
-    // Page headers can be very large because of page statistics
-    // We try to deserialize a larger buffer progressively
-    // until a maximum allowed header limit
-    while (true) {
-      buffer = stream_->Peek(allowed_page_size, &bytes_available);
-      if (bytes_available == 0) {
-        return std::shared_ptr<Page>(nullptr);
-      }
-
-      // This gets used, then set by DeserializeThriftMsg
-      header_size = static_cast<uint32_t>(bytes_available);
-      try {
-        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
-        break;
-      } catch (std::exception& e) {
-        // Failed to deserialize. Double the allowed page header size and try 
again
-        std::stringstream ss;
-        ss << e.what();
-        allowed_page_size *= 2;
-        if (allowed_page_size > max_page_header_size_) {
-          ss << "Deserializing page header failed.\n";
-          throw ParquetException(ss.str());
-        }
-      }
-    }
-    // Advance the stream offset
-    stream_->Advance(header_size);
-
-    int compressed_len = current_page_header_.compressed_page_size;
-    int uncompressed_len = current_page_header_.uncompressed_page_size;
-
-    // Read the compressed data page.
-    buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) {
-      ParquetException::EofException();
-    }
-
-    // Uncompress it if we need to
-    if (decompressor_ != nullptr) {
-      // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
-        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, 
false));
-      }
-      PARQUET_THROW_NOT_OK(
-          decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
-                                    decompression_buffer_->mutable_data()));
-      buffer = decompression_buffer_->data();
-    }
-
-    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
-
-    if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
-      const format::DictionaryPageHeader& dict_header =
-          current_page_header_.dictionary_page_header;
-
-      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : 
false;
-
-      return std::make_shared<DictionaryPage>(page_buffer, 
dict_header.num_values,
-                                              FromThrift(dict_header.encoding),
-                                              is_sorted);
-    } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
-      const format::DataPageHeader& header = 
current_page_header_.data_page_header;
-
-      EncodedStatistics page_statistics;
-      if (header.__isset.statistics) {
-        const format::Statistics& stats = header.statistics;
-        if (stats.__isset.max) {
-          page_statistics.set_max(stats.max);
-        }
-        if (stats.__isset.min) {
-          page_statistics.set_min(stats.min);
-        }
-        if (stats.__isset.null_count) {
-          page_statistics.set_null_count(stats.null_count);
-        }
-        if (stats.__isset.distinct_count) {
-          page_statistics.set_distinct_count(stats.distinct_count);
-        }
-      }
-
-      seen_num_rows_ += header.num_values;
-
-      return std::make_shared<DataPage>(
-          page_buffer, header.num_values, FromThrift(header.encoding),
-          FromThrift(header.definition_level_encoding),
-          FromThrift(header.repetition_level_encoding), page_statistics);
-    } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
-      const format::DataPageHeaderV2& header = 
current_page_header_.data_page_header_v2;
-      bool is_compressed = header.__isset.is_compressed ? header.is_compressed 
: false;
-
-      seen_num_rows_ += header.num_values;
-
-      return std::make_shared<DataPageV2>(
-          page_buffer, header.num_values, header.num_nulls, header.num_rows,
-          FromThrift(header.encoding), header.definition_levels_byte_length,
-          header.repetition_levels_byte_length, is_compressed);
-    } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
-    }
-  }
-  return std::shared_ptr<Page>(nullptr);
-}
-
-SerializedRowGroup::SerializedRowGroup(RandomAccessSource* source,
-                                       FileMetaData* file_metadata, int 
row_group_number,
-                                       const ReaderProperties& props)
-    : source_(source), file_metadata_(file_metadata), properties_(props) {
-  row_group_metadata_ = file_metadata->RowGroup(row_group_number);
-}
-const RowGroupMetaData* SerializedRowGroup::metadata() const {
-  return row_group_metadata_.get();
-}
-
-const ReaderProperties* SerializedRowGroup::properties() const { return 
&properties_; }
-
-// For PARQUET-816
-static constexpr int64_t kMaxDictHeaderSize = 100;
-
-std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
-  // Read column chunk from the file
-  auto col = row_group_metadata_->ColumnChunk(i);
-
-  int64_t col_start = col->data_page_offset();
-  if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) 
{
-    col_start = col->dictionary_page_offset();
-  }
-
-  int64_t col_length = col->total_compressed_size();
-  std::unique_ptr<InputStream> stream;
-
-  // PARQUET-816 workaround for old files created by older parquet-mr
-  const ApplicationVersion& version = file_metadata_->writer_version();
-  if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
-    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't 
include the
-    // dictionary page header size in total_compressed_size and 
total_uncompressed_size
-    // (see IMPALA-694). We add padding to compensate.
-    int64_t bytes_remaining = source_->Size() - (col_start + col_length);
-    int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
-    col_length += padding;
-  }
-
-  stream = properties_.GetStream(source_, col_start, col_length);
-
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), col->num_values(), 
col->compression(),
-                               properties_.memory_pool()));
-}
-
-// ----------------------------------------------------------------------
-// SerializedFile: Parquet on-disk layout
-
-// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
-static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
-static constexpr uint32_t FOOTER_SIZE = 8;
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
-    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
-    const std::shared_ptr<FileMetaData>& metadata) {
-  std::unique_ptr<ParquetFileReader::Contents> result(
-      new SerializedFile(std::move(source), props));
-
-  // Access private methods here, but otherwise unavailable
-  SerializedFile* file = static_cast<SerializedFile*>(result.get());
-
-  if (metadata == nullptr) {
-    // Validates magic bytes, parses metadata, and initializes the 
SchemaDescriptor
-    file->ParseMetaData();
-  } else {
-    file->file_metadata_ = metadata;
-  }
-
-  return result;
-}
-
-void SerializedFile::Close() { source_->Close(); }
-
-SerializedFile::~SerializedFile() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
-  std::unique_ptr<SerializedRowGroup> contents(
-      new SerializedRowGroup(source_.get(), file_metadata_.get(), i, 
properties_));
-  return std::make_shared<RowGroupReader>(std::move(contents));
-}
-
-std::shared_ptr<FileMetaData> SerializedFile::metadata() const { return 
file_metadata_; }
-
-SerializedFile::SerializedFile(
-    std::unique_ptr<RandomAccessSource> source,
-    const ReaderProperties& props = default_reader_properties())
-    : source_(std::move(source)), properties_(props) {}
-
-void SerializedFile::ParseMetaData() {
-  int64_t file_size = source_->Size();
-
-  if (file_size < FOOTER_SIZE) {
-    throw ParquetException("Corrupted file, smaller than file footer");
-  }
-
-  uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
-  int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
-  int64_t bytes_read =
-      source_->ReadAt(file_size - footer_read_size, footer_read_size, 
footer_buffer);
-
-  // Check if all bytes are read. Check if last 4 bytes read have the magic 
bits
-  if (bytes_read != footer_read_size ||
-      memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
-    throw ParquetException("Invalid parquet file. Corrupt footer.");
-  }
-
-  uint32_t metadata_len =
-      *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - 
FOOTER_SIZE);
-  int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
-  if (FOOTER_SIZE + metadata_len > file_size) {
-    throw ParquetException(
-        "Invalid parquet file. File is less than "
-        "file metadata size.");
-  }
-
-  std::shared_ptr<PoolBuffer> metadata_buffer =
-      AllocateBuffer(properties_.memory_pool(), metadata_len);
-
-  // Check if the footer_buffer contains the entire metadata
-  if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
-    memcpy(metadata_buffer->mutable_data(),
-           footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), 
metadata_len);
-  } else {
-    bytes_read =
-        source_->ReadAt(metadata_start, metadata_len, 
metadata_buffer->mutable_data());
-    if (bytes_read != metadata_len) {
-      throw ParquetException("Invalid parquet file. Could not read metadata 
bytes.");
-    }
-  }
-
-  file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/reader-internal.h 
b/src/parquet/file/reader-internal.h
deleted file mode 100644
index 282c5344..00000000
--- a/src/parquet/file/reader-internal.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_READER_INTERNAL_H
-#define PARQUET_FILE_READER_INTERNAL_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/reader.h"
-#include "parquet/parquet_types.h"
-#include "parquet/properties.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// 16 MB is the default maximum page header size
-static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
-
-// 16 KB is the default expected page header size
-static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class PARQUET_EXPORT SerializedPageReader : public PageReader {
- public:
-  SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows,
-                       Compression::type codec,
-                       ::arrow::MemoryPool* pool = 
::arrow::default_memory_pool());
-
-  virtual ~SerializedPageReader() {}
-
-  // Implement the PageReader interface
-  virtual std::shared_ptr<Page> NextPage();
-
-  void set_max_page_header_size(uint32_t size) { max_page_header_size_ = size; 
}
-
- private:
-  std::unique_ptr<InputStream> stream_;
-
-  format::PageHeader current_page_header_;
-  std::shared_ptr<Page> current_page_;
-
-  // Compression codec to use.
-  std::unique_ptr<::arrow::Codec> decompressor_;
-  std::shared_ptr<PoolBuffer> decompression_buffer_;
-
-  // Maximum allowed page size
-  uint32_t max_page_header_size_;
-
-  // Number of rows read in data pages so far
-  int64_t seen_num_rows_;
-
-  // Number of rows in all the data pages
-  int64_t total_num_rows_;
-};
-
-// RowGroupReader::Contents implementation for the Parquet file specification
-class PARQUET_EXPORT SerializedRowGroup : public RowGroupReader::Contents {
- public:
-  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
-                     int row_group_number, const ReaderProperties& props);
-
-  virtual const RowGroupMetaData* metadata() const;
-
-  virtual const ReaderProperties* properties() const;
-
-  virtual std::unique_ptr<PageReader> GetColumnPageReader(int i);
-
- private:
-  RandomAccessSource* source_;
-  FileMetaData* file_metadata_;
-  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
-  ReaderProperties properties_;
-};
-
-// An implementation of ParquetFileReader::Contents that deals with the Parquet
-// file structure, Thrift deserialization, and other internal matters
-
-class PARQUET_EXPORT SerializedFile : public ParquetFileReader::Contents {
- public:
-  // Open the file. If no metadata is passed, it is parsed from the footer of
-  // the file
-  static std::unique_ptr<ParquetFileReader::Contents> Open(
-      std::unique_ptr<RandomAccessSource> source,
-      const ReaderProperties& props = default_reader_properties(),
-      const std::shared_ptr<FileMetaData>& metadata = nullptr);
-
-  void Close() override;
-  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
-  std::shared_ptr<FileMetaData> metadata() const override;
-  virtual ~SerializedFile();
-
- private:
-  // This class takes ownership of the provided data source
-  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
-                          const ReaderProperties& props);
-
-  std::unique_ptr<RandomAccessSource> source_;
-  std::shared_ptr<FileMetaData> file_metadata_;
-  ReaderProperties properties_;
-
-  void ParseMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_READER_INTERNAL_H
diff --git a/src/parquet/file/writer-internal.cc 
b/src/parquet/file/writer-internal.cc
deleted file mode 100644
index 712289b4..00000000
--- a/src/parquet/file/writer-internal.cc
+++ /dev/null
@@ -1,327 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer-internal.h"
-
-#include <cstdint>
-#include <memory>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_writer.h"
-#include "parquet/schema-internal.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-using parquet::schema::GroupNode;
-using parquet::schema::SchemaFlattener;
-
-namespace parquet {
-
-// FIXME: copied from reader-internal.cc
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-// ----------------------------------------------------------------------
-// SerializedPageWriter
-
-SerializedPageWriter::SerializedPageWriter(OutputStream* sink, 
Compression::type codec,
-                                           ColumnChunkMetaDataBuilder* 
metadata,
-                                           MemoryPool* pool)
-    : sink_(sink),
-      metadata_(metadata),
-      pool_(pool),
-      num_values_(0),
-      dictionary_page_offset_(0),
-      data_page_offset_(0),
-      total_uncompressed_size_(0),
-      total_compressed_size_(0) {
-  compressor_ = GetCodecFromArrow(codec);
-}
-
-static format::Statistics ToThrift(const EncodedStatistics& 
row_group_statistics) {
-  format::Statistics statistics;
-  if (row_group_statistics.has_min) 
statistics.__set_min(row_group_statistics.min());
-  if (row_group_statistics.has_max) 
statistics.__set_max(row_group_statistics.max());
-  if (row_group_statistics.has_null_count)
-    statistics.__set_null_count(row_group_statistics.null_count);
-  if (row_group_statistics.has_distinct_count)
-    statistics.__set_distinct_count(row_group_statistics.distinct_count);
-  return statistics;
-}
-
-void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
-  // index_page_offset = 0 since they are not supported
-  metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-                    total_compressed_size_, total_uncompressed_size_, 
has_dictionary,
-                    fallback);
-
-  // Write metadata at end of column chunk
-  metadata_->WriteTo(sink_);
-}
-
-void SerializedPageWriter::Compress(const Buffer& src_buffer,
-                                    ResizableBuffer* dest_buffer) {
-  DCHECK(compressor_ != nullptr);
-
-  // Compress the data
-  int64_t max_compressed_size =
-      compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
-
-  // Use Arrow::Buffer::shrink_to_fit = false
-  // underlying buffer only keeps growing. Resize to a smaller size does not 
reallocate.
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
-
-  int64_t compressed_size;
-  PARQUET_THROW_NOT_OK(
-      compressor_->Compress(src_buffer.size(), src_buffer.data(), 
max_compressed_size,
-                            dest_buffer->mutable_data(), &compressed_size));
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
-}
-
-int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
-  int64_t uncompressed_size = page.uncompressed_size();
-  std::shared_ptr<Buffer> compressed_data = page.buffer();
-
-  format::DataPageHeader data_page_header;
-  data_page_header.__set_num_values(page.num_values());
-  data_page_header.__set_encoding(ToThrift(page.encoding()));
-  data_page_header.__set_definition_level_encoding(
-      ToThrift(page.definition_level_encoding()));
-  data_page_header.__set_repetition_level_encoding(
-      ToThrift(page.repetition_level_encoding()));
-  data_page_header.__set_statistics(ToThrift(page.statistics()));
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DATA_PAGE);
-  
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  
page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_data_page_header(data_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (data_page_offset_ == 0) {
-    data_page_offset_ = start_pos;
-  }
-
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-  num_values_ += page.num_values();
-
-  return sink_->Tell() - start_pos;
-}
-
-int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
-  int64_t uncompressed_size = page.size();
-  std::shared_ptr<Buffer> compressed_data = nullptr;
-  if (has_compressor()) {
-    auto buffer = std::static_pointer_cast<ResizableBuffer>(
-        AllocateBuffer(pool_, uncompressed_size));
-    Compress(*(page.buffer().get()), buffer.get());
-    compressed_data = std::static_pointer_cast<Buffer>(buffer);
-  } else {
-    compressed_data = page.buffer();
-  }
-
-  format::DictionaryPageHeader dict_page_header;
-  dict_page_header.__set_num_values(page.num_values());
-  dict_page_header.__set_encoding(ToThrift(page.encoding()));
-  dict_page_header.__set_is_sorted(page.is_sorted());
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DICTIONARY_PAGE);
-  
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  
page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_dictionary_page_header(dict_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (dictionary_page_offset_ == 0) {
-    dictionary_page_offset_ = start_pos;
-  }
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-
-  return sink_->Tell() - start_pos;
-}
-
-// ----------------------------------------------------------------------
-// RowGroupSerializer
-
-int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); 
}
-
-int64_t RowGroupSerializer::num_rows() const {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-  return num_rows_ < 0 ? 0 : num_rows_;
-}
-
-void RowGroupSerializer::CheckRowsWritten() const {
-  int64_t current_rows = current_column_writer_->rows_written();
-  if (num_rows_ < 0) {
-    num_rows_ = current_rows;
-    metadata_->set_num_rows(current_rows);
-  } else if (num_rows_ != current_rows) {
-    std::stringstream ss;
-    ss << "Column " << current_column_index_ << " had " << current_rows
-       << " while previous column had " << num_rows_;
-    throw ParquetException(ss.str());
-  }
-}
-
-ColumnWriter* RowGroupSerializer::NextColumn() {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-
-  // Throws an error if more columns are being written
-  auto col_meta = metadata_->NextColumnChunk();
-
-  if (current_column_writer_) {
-    total_bytes_written_ += current_column_writer_->Close();
-  }
-
-  ++current_column_index_;
-
-  const ColumnDescriptor* column_descr = col_meta->descr();
-  std::unique_ptr<PageWriter> pager(
-      new SerializedPageWriter(sink_, 
properties_->compression(column_descr->path()),
-                               col_meta, properties_->memory_pool()));
-  current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), 
properties_);
-  return current_column_writer_.get();
-}
-
-int RowGroupSerializer::current_column() const { return 
metadata_->current_column(); }
-
-void RowGroupSerializer::Close() {
-  if (!closed_) {
-    closed_ = true;
-
-    if (current_column_writer_) {
-      CheckRowsWritten();
-      total_bytes_written_ += current_column_writer_->Close();
-      current_column_writer_.reset();
-    }
-
-    // Ensures all columns have been written
-    metadata_->Finish(total_bytes_written_);
-  }
-}
-
-// ----------------------------------------------------------------------
-// FileSerializer
-
-std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
-    const std::shared_ptr<OutputStream>& sink, const 
std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  std::unique_ptr<ParquetFileWriter::Contents> result(
-      new FileSerializer(sink, schema, properties, key_value_metadata));
-
-  return result;
-}
-
-void FileSerializer::Close() {
-  if (is_open_) {
-    if (row_group_writer_) {
-      num_rows_ += row_group_writer_->num_rows();
-      row_group_writer_->Close();
-    }
-    row_group_writer_.reset();
-
-    // Write magic bytes and metadata
-    WriteMetaData();
-
-    sink_->Close();
-    is_open_ = false;
-  }
-}
-
-int FileSerializer::num_columns() const { return schema_.num_columns(); }
-
-int FileSerializer::num_row_groups() const { return num_row_groups_; }
-
-int64_t FileSerializer::num_rows() const { return num_rows_; }
-
-const std::shared_ptr<WriterProperties>& FileSerializer::properties() const {
-  return properties_;
-}
-
-RowGroupWriter* FileSerializer::AppendRowGroup() {
-  if (row_group_writer_) {
-    row_group_writer_->Close();
-  }
-  num_row_groups_++;
-  auto rg_metadata = metadata_->AppendRowGroup();
-  std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
-  row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
-  return row_group_writer_.get();
-}
-
-FileSerializer::~FileSerializer() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-void FileSerializer::WriteMetaData() {
-  // Write MetaData
-  uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
-
-  // Get a FileMetaData
-  auto metadata = metadata_->Finish();
-  metadata->WriteTo(sink_.get());
-  metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
-
-  // Write Footer
-  sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-FileSerializer::FileSerializer(
-    const std::shared_ptr<OutputStream>& sink, const 
std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
-    : ParquetFileWriter::Contents(schema, key_value_metadata),
-      sink_(sink),
-      is_open_(true),
-      properties_(properties),
-      num_row_groups_(0),
-      num_rows_(0),
-      metadata_(FileMetaDataBuilder::Make(&schema_, properties, 
key_value_metadata)) {
-  StartFile();
-}
-
-void FileSerializer::StartFile() {
-  // Parquet files always start with PAR1
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/writer-internal.h 
b/src/parquet/file/writer-internal.h
deleted file mode 100644
index 3cd73fe3..00000000
--- a/src/parquet/file/writer-internal.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_WRITER_INTERNAL_H
-#define PARQUET_FILE_WRITER_INTERNAL_H
-
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/writer.h"
-#include "parquet/parquet_types.h"
-#include "parquet/util/memory.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class SerializedPageWriter : public PageWriter {
- public:
-  SerializedPageWriter(OutputStream* sink, Compression::type codec,
-                       ColumnChunkMetaDataBuilder* metadata,
-                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  virtual ~SerializedPageWriter() {}
-
-  int64_t WriteDataPage(const CompressedDataPage& page) override;
-
-  int64_t WriteDictionaryPage(const DictionaryPage& page) override;
-
-  /**
-   * Compress a buffer.
-   */
-  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override;
-
-  bool has_compressor() override { return (compressor_ != nullptr); }
-
-  void Close(bool has_dictionary, bool fallback) override;
-
- private:
-  OutputStream* sink_;
-  ColumnChunkMetaDataBuilder* metadata_;
-  ::arrow::MemoryPool* pool_;
-  int64_t num_values_;
-  int64_t dictionary_page_offset_;
-  int64_t data_page_offset_;
-  int64_t total_uncompressed_size_;
-  int64_t total_compressed_size_;
-
-  // Compression codec to use.
-  std::unique_ptr<::arrow::Codec> compressor_;
-};
-
-// RowGroupWriter::Contents implementation for the Parquet file specification
-class RowGroupSerializer : public RowGroupWriter::Contents {
- public:
-  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
-                     const WriterProperties* properties)
-      : sink_(sink),
-        metadata_(metadata),
-        properties_(properties),
-        total_bytes_written_(0),
-        closed_(false),
-        current_column_index_(0),
-        num_rows_(-1) {}
-
-  int num_columns() const override;
-  int64_t num_rows() const override;
-
-  ColumnWriter* NextColumn() override;
-  int current_column() const override;
-  void Close() override;
-
- private:
-  OutputStream* sink_;
-  mutable RowGroupMetaDataBuilder* metadata_;
-  const WriterProperties* properties_;
-  int64_t total_bytes_written_;
-  bool closed_;
-  int current_column_index_;
-  mutable int64_t num_rows_;
-
-  void CheckRowsWritten() const;
-
-  std::shared_ptr<ColumnWriter> current_column_writer_;
-};
-
-// An implementation of ParquetFileWriter::Contents that deals with the Parquet
-// file structure, Thrift serialization, and other internal matters
-
-class FileSerializer : public ParquetFileWriter::Contents {
- public:
-  static std::unique_ptr<ParquetFileWriter::Contents> Open(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);
-
-  void Close() override;
-
-  RowGroupWriter* AppendRowGroup() override;
-
-  const std::shared_ptr<WriterProperties>& properties() const override;
-
-  int num_columns() const override;
-  int num_row_groups() const override;
-  int64_t num_rows() const override;
-
-  virtual ~FileSerializer();
-
- private:
-  explicit FileSerializer(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties,
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
-
-  std::shared_ptr<OutputStream> sink_;
-  bool is_open_;
-  const std::shared_ptr<WriterProperties> properties_;
-  int num_row_groups_;
-  int64_t num_rows_;
-  std::unique_ptr<FileMetaDataBuilder> metadata_;
-  std::unique_ptr<RowGroupWriter> row_group_writer_;
-
-  void StartFile();
-  void WriteMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_WRITER_INTERNAL_H
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
deleted file mode 100644
index a91553ba..00000000
--- a/src/parquet/file/writer.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer.h"
-
-#include "parquet/file/writer-internal.h"
-#include "parquet/util/memory.h"
-
-using parquet::schema::GroupNode;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupWriter public API
-
-RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
-    : contents_(std::move(contents)) {}
-
-void RowGroupWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-  }
-}
-
-ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
-
-int RowGroupWriter::current_column() { return contents_->current_column(); }
-
-int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileWriter public API
-
-ParquetFileWriter::ParquetFileWriter() {}
-
-ParquetFileWriter::~ParquetFileWriter() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<::arrow::io::OutputStream>& sink,
-    const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
-              key_value_metadata);
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<OutputStream>& sink,
-    const std::shared_ptr<schema::GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
-  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
-  result->Open(std::move(contents));
-  return result;
-}
-
-const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
-
-const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
-  return contents_->schema()->Column(i);
-}
-
-int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
-
-int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
-
-const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
-    const {
-  return contents_->key_value_metadata();
-}
-
-void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
-  contents_ = std::move(contents);
-}
-
-void ParquetFileWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-    contents_.reset();
-  }
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
-  return contents_->AppendRowGroup();
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
-  return AppendRowGroup();
-}
-
-const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
-  return contents_->properties();
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/reader.cc b/src/parquet/file_reader.cc
similarity index 50%
rename from src/parquet/file/reader.cc
rename to src/parquet/file_reader.cc
index 9b9bde9f..72c71c66 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file_reader.cc
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 
+#include <algorithm>
+#include <cstdint>
 #include <cstdio>
 #include <memory>
 #include <sstream>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "arrow/io/file.h"
 
@@ -29,15 +32,31 @@
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
+#include "parquet/metadata.h"
+#include "parquet/parquet_types.h"
+#include "parquet/properties.h"
 #include "parquet/types.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
 using std::string;
 
+namespace arrow {
+
+class Codec;
+
+}  // namespace arrow
+
 namespace parquet {
 
+// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
+static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
+static constexpr uint32_t FOOTER_SIZE = 8;
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// For PARQUET-816
+static constexpr int64_t kMaxDictHeaderSize = 100;
+
 // ----------------------------------------------------------------------
 // RowGroupReader public API
 
@@ -66,10 +85,145 @@ std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
 // Returns the rowgroup metadata
 const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
 
+// RowGroupReader::Contents implementation for the Parquet file specification
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
+                     int row_group_number, const ReaderProperties& props)
+      : source_(source), file_metadata_(file_metadata), properties_(props) {
+    row_group_metadata_ = file_metadata->RowGroup(row_group_number);
+  }
+
+  const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
+
+  const ReaderProperties* properties() const override { return &properties_; }
+
+  std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
+    // Read column chunk from the file
+    auto col = row_group_metadata_->ColumnChunk(i);
+
+    int64_t col_start = col->data_page_offset();
+    if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
+      col_start = col->dictionary_page_offset();
+    }
+
+    int64_t col_length = col->total_compressed_size();
+    std::unique_ptr<InputStream> stream;
+
+    // PARQUET-816 workaround for old files created by older parquet-mr
+    const ApplicationVersion& version = file_metadata_->writer_version();
+    if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
+      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+      // dictionary page header size in total_compressed_size and total_uncompressed_size
+      // (see IMPALA-694). We add padding to compensate.
+      int64_t bytes_remaining = source_->Size() - (col_start + col_length);
+      int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
+      col_length += padding;
+    }
+
+    stream = properties_.GetStream(source_, col_start, col_length);
+
+    return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+                            properties_.memory_pool());
+  }
+
+ private:
+  RandomAccessSource* source_;
+  FileMetaData* file_metadata_;
+  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
+  ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// SerializedFile: An implementation of ParquetFileReader::Contents that deals
+// with the Parquet file structure, Thrift deserialization, and other internal
+// matters
+
+// This class takes ownership of the provided data source
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+  SerializedFile(std::unique_ptr<RandomAccessSource> source,
+                 const ReaderProperties& props = default_reader_properties())
+      : source_(std::move(source)), properties_(props) {}
+
+  ~SerializedFile() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+  void Close() override { source_->Close(); }
+
+  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
+    std::unique_ptr<SerializedRowGroup> contents(
+        new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
+    return std::make_shared<RowGroupReader>(std::move(contents));
+  }
+
+  std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
+
+  void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
+    file_metadata_ = metadata;
+  }
+
+  void ParseMetaData() {
+    int64_t file_size = source_->Size();
+
+    if (file_size < FOOTER_SIZE) {
+      throw ParquetException("Corrupted file, smaller than file footer");
+    }
+
+    uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
+    int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
+    int64_t bytes_read =
+        source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
+
+    // Check if all bytes are read. Check if last 4 bytes read have the magic bits
+    if (bytes_read != footer_read_size ||
+        memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
+      throw ParquetException("Invalid parquet file. Corrupt footer.");
+    }
+
+    uint32_t metadata_len =
+        *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+    int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+    if (FOOTER_SIZE + metadata_len > file_size) {
+      throw ParquetException(
+          "Invalid parquet file. File is less than "
+          "file metadata size.");
+    }
+
+    std::shared_ptr<PoolBuffer> metadata_buffer =
+        AllocateBuffer(properties_.memory_pool(), metadata_len);
+
+    // Check if the footer_buffer contains the entire metadata
+    if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
+      memcpy(metadata_buffer->mutable_data(),
+             footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
+             metadata_len);
+    } else {
+      bytes_read =
+          source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
+      if (bytes_read != metadata_len) {
+        throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+      }
+    }
+
+    file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessSource> source_;
+  std::shared_ptr<FileMetaData> file_metadata_;
+  ReaderProperties properties_;
+};
+
 // ----------------------------------------------------------------------
 // ParquetFileReader public API
 
 ParquetFileReader::ParquetFileReader() {}
+
 ParquetFileReader::~ParquetFileReader() {
   try {
     Close();
@@ -77,6 +231,27 @@ ParquetFileReader::~ParquetFileReader() {
   }
 }
 
+// Open the file. If no metadata is passed, it is parsed from the footer of
+// the file
+std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+    const std::shared_ptr<FileMetaData>& metadata) {
+  std::unique_ptr<ParquetFileReader::Contents> result(
+      new SerializedFile(std::move(source), props));
+
+  // Access private methods here, but otherwise unavailable
+  SerializedFile* file = static_cast<SerializedFile*>(result.get());
+
+  if (metadata == nullptr) {
+    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+    file->ParseMetaData();
+  } else {
+    file->set_metadata(metadata);
+  }
+
+  return result;
+}
+
 std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
     const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
     const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
diff --git a/src/parquet/file/reader.h b/src/parquet/file_reader.h
similarity index 94%
rename from src/parquet/file/reader.h
rename to src/parquet/file_reader.h
index 7558394a..f751e9b3 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file_reader.h
@@ -25,8 +25,8 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
+#include "parquet/column_reader.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
@@ -71,6 +71,11 @@ class PARQUET_EXPORT ParquetFileReader {
   // easily create test fixtures
   // An implementation of the Contents class is defined in the .cc file
   struct Contents {
+    static std::unique_ptr<Contents> Open(
+        std::unique_ptr<RandomAccessSource> source,
+        const ReaderProperties& props = default_reader_properties(),
+        const std::shared_ptr<FileMetaData>& metadata = nullptr);
+
     virtual ~Contents() {}
     // Perform any cleanup associated with the file contents
     virtual void Close() = 0;
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
new file mode 100644
index 00000000..87ee4f60
--- /dev/null
+++ b/src/parquet/file_writer.cc
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_writer.h"
+
+#include "parquet/column_writer.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
+#include "parquet/thrift.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+using parquet::schema::GroupNode;
+using parquet::schema::SchemaFlattener;
+
+namespace parquet {
+
+// FIXME: copied from reader-internal.cc
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// ----------------------------------------------------------------------
+// RowGroupWriter public API
+
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+    : contents_(std::move(contents)) {}
+
+void RowGroupWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+
+int RowGroupWriter::current_column() { return contents_->current_column(); }
+
+int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+
+// ----------------------------------------------------------------------
+// RowGroupSerializer
+
+// RowGroupWriter::Contents implementation for the Parquet file specification
+class RowGroupSerializer : public RowGroupWriter::Contents {
+ public:
+  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
+                     const WriterProperties* properties)
+      : sink_(sink),
+        metadata_(metadata),
+        properties_(properties),
+        total_bytes_written_(0),
+        closed_(false),
+        current_column_index_(0),
+        num_rows_(-1) {}
+
+  int num_columns() const override { return metadata_->num_columns(); }
+
+  int64_t num_rows() const override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+    return num_rows_ < 0 ? 0 : num_rows_;
+  }
+
+  ColumnWriter* NextColumn() override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+
+    // Throws an error if more columns are being written
+    auto col_meta = metadata_->NextColumnChunk();
+
+    if (current_column_writer_) {
+      total_bytes_written_ += current_column_writer_->Close();
+    }
+
+    ++current_column_index_;
+
+    const ColumnDescriptor* column_descr = col_meta->descr();
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
+                         properties_->memory_pool());
+    current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+    return current_column_writer_.get();
+  }
+
+  int current_column() const override { return metadata_->current_column(); }
+
+  void Close() override {
+    if (!closed_) {
+      closed_ = true;
+
+      if (current_column_writer_) {
+        CheckRowsWritten();
+        total_bytes_written_ += current_column_writer_->Close();
+        current_column_writer_.reset();
+      }
+
+      // Ensures all columns have been written
+      metadata_->Finish(total_bytes_written_);
+    }
+  }
+
+ private:
+  OutputStream* sink_;
+  mutable RowGroupMetaDataBuilder* metadata_;
+  const WriterProperties* properties_;
+  int64_t total_bytes_written_;
+  bool closed_;
+  int current_column_index_;
+  mutable int64_t num_rows_;
+
+  void CheckRowsWritten() const {
+    int64_t current_rows = current_column_writer_->rows_written();
+    if (num_rows_ < 0) {
+      num_rows_ = current_rows;
+      metadata_->set_num_rows(current_rows);
+    } else if (num_rows_ != current_rows) {
+      std::stringstream ss;
+      ss << "Column " << current_column_index_ << " had " << current_rows
+         << " while previous column had " << num_rows_;
+      throw ParquetException(ss.str());
+    }
+  }
+
+  std::shared_ptr<ColumnWriter> current_column_writer_;
+};
+
+// ----------------------------------------------------------------------
+// FileSerializer
+
+// An implementation of ParquetFileWriter::Contents that deals with the Parquet
+// file structure, Thrift serialization, and other internal matters
+
+class FileSerializer : public ParquetFileWriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileWriter::Contents> Open(
+      const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<WriterProperties>& properties,
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+    std::unique_ptr<ParquetFileWriter::Contents> result(
+        new FileSerializer(sink, schema, properties, key_value_metadata));
+
+    return result;
+  }
+
+  void Close() override {
+    if (is_open_) {
+      if (row_group_writer_) {
+        num_rows_ += row_group_writer_->num_rows();
+        row_group_writer_->Close();
+      }
+      row_group_writer_.reset();
+
+      // Write magic bytes and metadata
+      WriteMetaData();
+
+      sink_->Close();
+      is_open_ = false;
+    }
+  }
+
+  int num_columns() const override { return schema_.num_columns(); }
+
+  int num_row_groups() const override { return num_row_groups_; }
+
+  int64_t num_rows() const override { return num_rows_; }
+
+  const std::shared_ptr<WriterProperties>& properties() const override {
+    return properties_;
+  }
+
+  RowGroupWriter* AppendRowGroup() override {
+    if (row_group_writer_) {
+      row_group_writer_->Close();
+    }
+    num_row_groups_++;
+    auto rg_metadata = metadata_->AppendRowGroup();
+    std::unique_ptr<RowGroupWriter::Contents> contents(
+        new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+    row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
+    return row_group_writer_.get();
+  }
+
+  ~FileSerializer() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+ private:
+  FileSerializer(const std::shared_ptr<OutputStream>& sink,
+                 const std::shared_ptr<GroupNode>& schema,
+                 const std::shared_ptr<WriterProperties>& properties,
+                 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
+      : ParquetFileWriter::Contents(schema, key_value_metadata),
+        sink_(sink),
+        is_open_(true),
+        properties_(properties),
+        num_row_groups_(0),
+        num_rows_(0),
+        metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
+    StartFile();
+  }
+
+  std::shared_ptr<OutputStream> sink_;
+  bool is_open_;
+  const std::shared_ptr<WriterProperties> properties_;
+  int num_row_groups_;
+  int64_t num_rows_;
+  std::unique_ptr<FileMetaDataBuilder> metadata_;
+  std::unique_ptr<RowGroupWriter> row_group_writer_;
+
+  void StartFile() {
+    // Parquet files always start with PAR1
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+
+  void WriteMetaData() {
+    // Write MetaData
+    uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
+
+    // Get a FileMetaData
+    auto metadata = metadata_->Finish();
+    metadata->WriteTo(sink_.get());
+    metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
+
+    // Write Footer
+    sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileWriter public API
+
+ParquetFileWriter::ParquetFileWriter() {}
+
+ParquetFileWriter::~ParquetFileWriter() {
+  try {
+    Close();
+  } catch (...) {
+  }
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
+              key_value_metadata);
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<schema::GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
+  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
+  result->Open(std::move(contents));
+  return result;
+}
+
+const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+  return contents_->schema()->Column(i);
+}
+
+int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
+
+int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
+
+const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
+    const {
+  return contents_->key_value_metadata();
+}
+
+void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
+  contents_ = std::move(contents);
+}
+
+void ParquetFileWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+  return contents_->AppendRowGroup();
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
+  return AppendRowGroup();
+}
+
+const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
+  return contents_->properties();
+}
+
+}  // namespace parquet
diff --git a/src/parquet/file/writer.h b/src/parquet/file_writer.h
similarity index 99%
rename from src/parquet/file/writer.h
rename to src/parquet/file_writer.h
index 844aa161..f1652611 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file_writer.h
@@ -21,7 +21,7 @@
 #include <cstdint>
 #include <memory>
 
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/util/memory.h"
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/metadata-test.cc
similarity index 99%
rename from src/parquet/file/file-metadata-test.cc
rename to src/parquet/metadata-test.cc
index 49cba3a4..b20293bb 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/metadata-test.cc
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "parquet/metadata.h"
 #include <gtest/gtest.h>
-#include "parquet/file/metadata.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
diff --git a/src/parquet/file/metadata.cc b/src/parquet/metadata.cc
similarity index 99%
rename from src/parquet/file/metadata.cc
rename to src/parquet/metadata.cc
index 9f1cdd7c..1c7db865 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -20,7 +20,7 @@
 #include <vector>
 
 #include "parquet/exception.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 #include "parquet/thrift.h"
diff --git a/src/parquet/file/metadata.h b/src/parquet/metadata.h
similarity index 100%
rename from src/parquet/file/metadata.h
rename to src/parquet/metadata.h
diff --git a/src/parquet/file/printer.cc b/src/parquet/printer.cc
similarity index 99%
rename from src/parquet/file/printer.cc
rename to src/parquet/printer.cc
index 727552d4..88b55289 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/printer.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/file/printer.h"
+#include "parquet/printer.h"
 
 #include <string>
 #include <vector>
diff --git a/src/parquet/file/printer.h b/src/parquet/printer.h
similarity index 97%
rename from src/parquet/file/printer.h
rename to src/parquet/printer.h
index a18af4a7..3b828829 100644
--- a/src/parquet/file/printer.h
+++ b/src/parquet/printer.h
@@ -25,7 +25,7 @@
 #include <string>
 #include <vector>
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 
 namespace parquet {
 
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index 4a063c1e..c740b595 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -19,7 +19,7 @@
 
 #include <string>
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 #include "parquet/properties.h"
 
 namespace parquet {
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index cefa452d..c536fdcb 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -27,9 +27,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/printer.h"
 #include "parquet/util/memory.h"
 
 using std::string;
@@ -204,7 +203,7 @@ class HelperFileClosed : public ArrowInputFile {
 TEST_F(TestLocalFile, FileClosedOnDestruction) {
   bool close_called = false;
   {
-    auto contents = SerializedFile::Open(
+    auto contents = ParquetFileReader::Contents::Open(
         std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
     std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
     result->Open(std::move(contents));
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index fc905b51..e5992c6e 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -26,8 +26,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/test-specialization.h"
@@ -194,8 +194,9 @@ bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& v
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
-    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
   return values;
 }
 
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 0698652f..ac6d0a12 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -79,8 +79,7 @@ class MockPageReader : public PageReader {
   explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
       : pages_(pages), page_index_(0) {}
 
-  // Implement the PageReader interface
-  virtual shared_ptr<Page> NextPage() {
+  shared_ptr<Page> NextPage() override {
     if (page_index_ == static_cast<int>(pages_.size())) {
       // EOS to consumer
       return shared_ptr<Page>(nullptr);
@@ -88,6 +87,9 @@ class MockPageReader : public PageReader {
     return pages_[page_index_++];
   }
 
+  // No-op
+  void set_max_page_header_size(uint32_t size) override {}
+
  private:
   vector<shared_ptr<Page>> pages_;
   int page_index_;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> [C++] Flatten parquet/file directory
> ------------------------------------
>
>                 Key: PARQUET-859
>                 URL: https://issues.apache.org/jira/browse/PARQUET-859
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-cpp
>            Reporter: Wes McKinney
>            Assignee: Wes McKinney
>             Fix For: cpp-1.4.0
>
>
> With appropriate file renaming. Related to PARQUET-858



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to