This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f6623929a8 [GLUTEN-9468][VL] Remove parquet-arrow reader dependency in benchmarks (#9469)
f6623929a8 is described below

commit f6623929a815dbea5bcdef5b945f1c4217e71f0a
Author: Rong Ma <[email protected]>
AuthorDate: Wed Apr 30 17:03:59 2025 +0100

    [GLUTEN-9468][VL] Remove parquet-arrow reader dependency in benchmarks (#9469)
---
 cpp/velox/benchmarks/CMakeLists.txt                |   4 -
 cpp/velox/benchmarks/ColumnarToRowBenchmark.cc     | 282 ------------
 cpp/velox/benchmarks/CompressionBenchmark.cc       | 472 ---------------------
 cpp/velox/benchmarks/ParquetWriteBenchmark.cc      | 244 ++---------
 cpp/velox/operators/reader/FileReaderIterator.cc   |   6 -
 cpp/velox/operators/reader/FileReaderIterator.h    |   8 +-
 .../operators/reader/ParquetReaderIterator.cc      | 116 +++--
 cpp/velox/operators/reader/ParquetReaderIterator.h |  28 +-
 8 files changed, 126 insertions(+), 1034 deletions(-)

diff --git a/cpp/velox/benchmarks/CMakeLists.txt 
b/cpp/velox/benchmarks/CMakeLists.txt
index aeb43518e8..20c8bf27d1 100644
--- a/cpp/velox/benchmarks/CMakeLists.txt
+++ b/cpp/velox/benchmarks/CMakeLists.txt
@@ -32,10 +32,6 @@ endfunction()
 # Generic benchmark
 add_velox_benchmark(generic_benchmark GenericBenchmark.cc)
 
-add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc)
-
 add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)
 
 add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)
-
-add_velox_benchmark(compression_benchmark CompressionBenchmark.cc)
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc 
b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
deleted file mode 100644
index e32e6d7513..0000000000
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/c/abi.h>
-#include <arrow/c/bridge.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/type.h>
-#include <arrow/util/io_util.h>
-#include <benchmark/benchmark.h>
-#include <gtest/gtest.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-
-#include <chrono>
-
-#include "benchmarks/common/BenchmarkUtils.h"
-#include "memory/ArrowMemoryPool.h"
-#include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
-#include "operators/serializer/VeloxColumnarToRowConverter.h"
-#include "utils/Macros.h"
-#include "utils/TestUtils.h"
-#include "utils/VeloxArrowUtils.h"
-#include "velox/vector/arrow/Bridge.h"
-
-using namespace facebook;
-using namespace arrow;
-namespace gluten {
-
-const int kBatchBufferSize = 4096;
-
-class GoogleBenchmarkColumnarToRow {
- public:
-  GoogleBenchmarkColumnarToRow(std::string fileName) {
-    getRecordBatchReader(fileName);
-  }
-
-  void getRecordBatchReader(const std::string& inputFile) {
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-
-    std::shared_ptr<arrow::fs::FileSystem> fs;
-    std::string fileName;
-    ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile, 
&fileName))
-
-    ARROW_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
-    properties_.set_batch_size(kBatchBufferSize);
-    properties_.set_pre_buffer(false);
-    properties_.set_use_threads(false);
-
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    ASSERT_NOT_OK(parquetReader->GetSchema(&schema_));
-
-    auto numRowgroups = parquetReader->num_row_groups();
-
-    for (int i = 0; i < numRowgroups; ++i) {
-      rowGroupIndices_.push_back(i);
-    }
-
-    auto numColumns = schema_->num_fields();
-    for (int i = 0; i < numColumns; ++i) {
-      columnIndices_.push_back(i);
-    }
-  }
-
-  virtual void operator()(benchmark::State& state) {}
-
- protected:
-  velox::VectorPtr recordBatch2RowVector(const arrow::RecordBatch& rb) {
-    ArrowArray arrowArray;
-    ArrowSchema arrowSchema;
-    ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
-    return velox::importFromArrowAsOwner(arrowSchema, arrowArray, 
gluten::defaultLeafVeloxMemoryPool().get());
-  }
-
- protected:
-  std::string fileName_;
-  std::shared_ptr<arrow::io::RandomAccessFile> file_;
-  std::vector<int> rowGroupIndices_;
-  std::vector<int> columnIndices_;
-  std::shared_ptr<arrow::Schema> schema_;
-  parquet::ArrowReaderProperties properties_;
-};
-class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public 
GoogleBenchmarkColumnarToRow {
- public:
-  GoogleBenchmarkColumnarToRowCacheScanBenchmark(std::string filename) : 
GoogleBenchmarkColumnarToRow(filename) {}
-  void operator()(benchmark::State& state) {
-    if (state.range(0) == 0xffffffff) {
-      setCpu(state.thread_index());
-    } else {
-      setCpu(state.range(0));
-    }
-
-    std::shared_ptr<arrow::RecordBatch> recordBatch;
-    int64_t elapseRead = 0;
-    int64_t numBatches = 0;
-    int64_t numRows = 0;
-    int64_t initTime = 0;
-    int64_t writeTime = 0;
-
-    std::vector<int> localColumnIndices = columnIndices_;
-
-    std::shared_ptr<arrow::Schema> localSchema;
-    localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
-    if (state.thread_index() == 0)
-      LOG(INFO) << localSchema->ToString();
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        ::arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    std::vector<velox::VectorPtr> vectors;
-    ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
localColumnIndices, &recordBatchReader));
-    do {
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-
-      if (recordBatch) {
-        vectors.push_back(recordBatch2RowVector(*recordBatch));
-        numBatches += 1;
-        numRows += recordBatch->num_rows();
-      }
-    } while (recordBatch);
-
-    LOG(WARNING) << " parquet parse done elapsed time = " << elapseRead / 
1000000 << " rows = " << numRows;
-
-    // reuse the columnarToRowConverter for batches caused system % increase a 
lot
-    auto ctxPool = defaultLeafVeloxMemoryPool();
-    for (auto _ : state) {
-      for (const auto& vector : vectors) {
-        auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
-        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
-        auto cb = std::make_shared<VeloxColumnarBatch>(row);
-        TIME_NANO_START(writeTime);
-        columnarToRowConverter->convert(cb);
-        TIME_NANO_END(writeTime);
-      }
-    }
-
-    state.counters["rowgroups"] =
-        benchmark::Counter(rowGroupIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["columns"] =
-        benchmark::Counter(columnIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["batches"] =
-        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["num_rows"] =
-        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["batch_buffer_size"] =
-        benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-
-    state.counters["parquet_parse"] =
-        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["init_time"] =
-        benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["write_time"] =
-        benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-  }
-};
-
-class GoogleBenchmarkColumnarToRowIterateScanBenchmark : public 
GoogleBenchmarkColumnarToRow {
- public:
-  GoogleBenchmarkColumnarToRowIterateScanBenchmark(std::string filename) : 
GoogleBenchmarkColumnarToRow(filename) {}
-  void operator()(benchmark::State& state) {
-    setCpu(state.thread_index());
-
-    int64_t elapseRead = 0;
-    int64_t numBatches = 0;
-    int64_t numRows = 0;
-    int64_t initTime = 0;
-    int64_t writeTime = 0;
-
-    std::shared_ptr<arrow::RecordBatch> recordBatch;
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    auto ctxPool = defaultLeafVeloxMemoryPool();
-    for (auto _ : state) {
-      ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
columnIndices_, &recordBatchReader));
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-      while (recordBatch) {
-        numBatches += 1;
-        numRows += recordBatch->num_rows();
-        auto vector = recordBatch2RowVector(*recordBatch);
-        auto columnarToRowConverter = 
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
-        auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
-        auto cb = std::make_shared<VeloxColumnarBatch>(row);
-        TIME_NANO_START(writeTime);
-        columnarToRowConverter->convert(cb);
-        TIME_NANO_END(writeTime);
-        TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-      }
-    }
-
-    state.counters["rowgroups"] =
-        benchmark::Counter(rowGroupIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["columns"] =
-        benchmark::Counter(columnIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["batches"] =
-        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["num_rows"] =
-        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["batch_buffer_size"] =
-        benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-
-    state.counters["parquet_parse"] =
-        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["init_time"] =
-        benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["write_time"] =
-        benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-  }
-};
-
-} // namespace gluten
-
-// usage
-// ./columnar_to_row_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet
-int main(int argc, char** argv) {
-  uint32_t iterations = 1;
-  uint32_t threads = 1;
-  std::string datafile;
-  uint32_t cpu = 0xffffffff;
-
-  for (int i = 0; i < argc; i++) {
-    if (strcmp(argv[i], "--iterations") == 0) {
-      iterations = atol(argv[i + 1]);
-    } else if (strcmp(argv[i], "--threads") == 0) {
-      threads = atol(argv[i + 1]);
-    } else if (strcmp(argv[i], "--file") == 0) {
-      datafile = argv[i + 1];
-    } else if (strcmp(argv[i], "--cpu") == 0) {
-      cpu = atol(argv[i + 1]);
-    }
-  }
-  LOG(INFO) << "iterations = " << iterations;
-  LOG(INFO) << "threads = " << threads;
-  LOG(INFO) << "datafile = " << datafile;
-  LOG(INFO) << "cpu = " << cpu;
-
-  gluten::initVeloxBackend();
-  memory::MemoryManager::testingSetInstance({});
-
-  gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);
-
-  benchmark::RegisterBenchmark("GoogleBenchmarkColumnarToRow::CacheScan", bck)
-      ->Args({
-          cpu,
-      })
-      ->Iterations(iterations)
-      ->Threads(threads)
-      ->ReportAggregatesOnly(false)
-      ->MeasureProcessCPUTime()
-      ->Unit(benchmark::kSecond);
-
-  benchmark::Initialize(&argc, argv);
-  benchmark::RunSpecifiedBenchmarks();
-  benchmark::Shutdown();
-}
diff --git a/cpp/velox/benchmarks/CompressionBenchmark.cc 
b/cpp/velox/benchmarks/CompressionBenchmark.cc
deleted file mode 100644
index 8e65899b75..0000000000
--- a/cpp/velox/benchmarks/CompressionBenchmark.cc
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/extension_type.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/ipc/options.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/type.h>
-#include <arrow/type_fwd.h>
-#include <benchmark/benchmark.h>
-#include <execinfo.h>
-#include <glog/logging.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-#include <sched.h>
-
-#include <chrono>
-#include <iostream>
-#include <utility>
-
-#include "common/BenchmarkUtils.h"
-#include "shuffle/ShuffleWriter.h"
-#include "utils/Compression.h"
-#include "utils/Macros.h"
-
-void printTrace(void) {
-  char** strings;
-  size_t i, size;
-  enum Constexpr { kMaxSize = 1024 };
-  void* array[kMaxSize];
-  size = backtrace(array, kMaxSize);
-  strings = backtrace_symbols(array, size);
-  for (i = 0; i < size; i++) {
-    if (strings[i] != nullptr) {
-      printf("    %s\n", strings[i]);
-    }
-  }
-  puts("");
-  if (strings != nullptr) {
-    free(strings);
-  }
-}
-
-using arrow::RecordBatchReader;
-using arrow::Status;
-using gluten::GlutenException;
-using gluten::ShuffleWriterOptions;
-
-namespace gluten {
-
-#define ALIGNMENT (2 * 1024 * 1024)
-
-const int32_t kQatGzip = 0;
-const int32_t kQatZstd = 1;
-const int32_t kQplGzip = 2;
-const int32_t kLZ4 = 3;
-const int32_t kZstd = 4;
-
-class BenchmarkCompression {
- public:
-  explicit BenchmarkCompression(const std::string& fileName, uint32_t 
compressBufferSize) {
-    getRecordBatchReader(fileName, compressBufferSize);
-  }
-
-  virtual std::string name() const = 0;
-
-  void getRecordBatchReader(const std::string& inputFile, uint32_t 
compressBufferSize) {
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-
-    std::shared_ptr<arrow::fs::FileSystem> fs;
-    std::string fileName;
-    GLUTEN_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile, 
&fileName))
-
-    GLUTEN_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
-    properties_.set_batch_size(compressBufferSize);
-    properties_.set_pre_buffer(false);
-    properties_.set_use_threads(false);
-
-    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    GLUTEN_THROW_NOT_OK(parquetReader->GetSchema(&schema_));
-
-    auto numRowgroups = parquetReader->num_row_groups();
-
-    for (int i = 0; i < numRowgroups; ++i) {
-      rowGroupIndices_.push_back(i);
-    }
-
-    auto numColumns = schema_->num_fields();
-    for (int i = 0; i < numColumns; ++i) {
-      columnIndices_.push_back(i);
-    }
-  }
-
-  void operator()(benchmark::State& state) {
-    setCpu(state.range(2) + state.thread_index());
-    auto ipcWriteOptions = arrow::ipc::IpcWriteOptions::Defaults();
-    ipcWriteOptions.use_threads = false;
-    auto compressBufferSize = static_cast<uint32_t>(state.range(1));
-    auto compressionType = state.range(0);
-    switch (compressionType) {
-      case gluten::kLZ4: {
-        ipcWriteOptions.codec = 
createArrowIpcCodec(arrow::Compression::LZ4_FRAME, CodecBackend::NONE);
-        break;
-      }
-      case gluten::kZstd: {
-        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, 
CodecBackend::NONE);
-        break;
-      }
-#ifdef GLUTEN_ENABLE_QAT
-      case gluten::kQatGzip: {
-        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::GZIP, 
CodecBackend::QAT);
-        break;
-      }
-      case gluten::kQatZstd: {
-        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, 
CodecBackend::QAT);
-        LOG(INFO) << "load qatzstd";
-        break;
-      }
-#endif
-#ifdef GLUTEN_ENABLE_IAA
-      case gluten::kQplGzip: {
-        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, 
CodecBackend::IAA);
-        break;
-      }
-#endif
-      default:
-        throw GlutenException("Codec not supported. Only support LZ4 or 
QATGzip");
-    }
-    ipcWriteOptions.memory_pool = arrow::default_memory_pool();
-
-    int64_t elapseRead = 0;
-    int64_t numBatches = 0;
-    int64_t numRows = 0;
-    int64_t compressTime = 0;
-    int64_t decompressTime = 0;
-    int64_t uncompressedSize = 0;
-    int64_t compressedSize = 0;
-
-    auto startTime = std::chrono::steady_clock::now();
-
-    doCompress(
-        elapseRead,
-        numBatches,
-        numRows,
-        compressTime,
-        decompressTime,
-        uncompressedSize,
-        compressedSize,
-        ipcWriteOptions,
-        state);
-    auto endTime = std::chrono::steady_clock::now();
-    auto totalTime = (endTime - startTime).count();
-    LOG(INFO) << "Thread " << state.thread_index() << " took " << (1.0 * 
totalTime / 1e9) << "s";
-
-    state.counters["rowgroups"] =
-        benchmark::Counter(rowGroupIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-
-    state.counters["columns"] =
-        benchmark::Counter(columnIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-
-    state.counters["batches"] =
-        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["num_rows"] =
-        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["batch_buffer_size"] =
-        benchmark::Counter(compressBufferSize, 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
-
-    state.counters["parquet_parse"] =
-        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["compress_time"] =
-        benchmark::Counter(compressTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["decompress_time"] =
-        benchmark::Counter(decompressTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["total_time"] =
-        benchmark::Counter(totalTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    state.counters["uncompressed_size"] =
-        benchmark::Counter(uncompressedSize, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-
-    state.counters["compressed_size"] =
-        benchmark::Counter(compressedSize, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-
-    auto compressionRatio = 1.0 * compressedSize / uncompressedSize;
-    state.counters["compression_ratio"] =
-        benchmark::Counter(compressionRatio, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-
-    // compress_time is in nanosecond, zoom out to second.
-    auto throughput = 1.0 * uncompressedSize / compressTime * 1e9 * 8;
-    state.counters["throughput_total"] =
-        benchmark::Counter(throughput, benchmark::Counter::kDefaults, 
benchmark::Counter::OneK::kIs1024);
-
-    state.counters["throughput_per_thread"] =
-        benchmark::Counter(throughput, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-  }
-
- protected:
-  virtual void doCompress(
-      int64_t& elapseRead,
-      int64_t& numBatches,
-      int64_t& numRows,
-      int64_t& compressTime,
-      int64_t& decompressTime,
-      int64_t& uncompressedSize,
-      int64_t& compressedSize,
-      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
-      benchmark::State& state) {}
-
-  void decompress(
-      const arrow::ipc::IpcWriteOptions& ipcWriteOptions,
-      const std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>& payloads,
-      const std::vector<std::vector<int64_t>>& uncompressedBufferSize,
-      int64_t& decompressTime) {
-    TIME_NANO_START(decompressTime);
-    auto codec = ipcWriteOptions.codec;
-    for (auto i = 0; i < payloads.size(); ++i) {
-      auto& buffers = payloads[i]->body_buffers;
-      for (auto j = 0; j < buffers.size(); ++j) {
-        auto outputSize = uncompressedBufferSize[i][j];
-        if (buffers[j] && outputSize >= 0) {
-          GLUTEN_ASSIGN_OR_THROW(auto out, 
arrow::AllocateResizableBuffer(outputSize, ipcWriteOptions.memory_pool));
-          // Exclude the first 8-byte buffer metadata.
-          GLUTEN_ASSIGN_OR_THROW(
-              auto len,
-              codec->Decompress(buffers[j]->size() - 8, buffers[j]->data() + 
8, outputSize, out->mutable_data()));
-          static_cast<void>(len);
-        }
-      }
-    }
-    TIME_NANO_END(decompressTime);
-  }
-
- protected:
-  std::shared_ptr<arrow::io::RandomAccessFile> file_;
-  std::vector<int> rowGroupIndices_;
-  std::vector<int> columnIndices_;
-  std::shared_ptr<arrow::Schema> schema_;
-  parquet::ArrowReaderProperties properties_;
-};
-
-class BenchmarkCompressionCacheScanBenchmark final : public 
BenchmarkCompression {
- public:
-  explicit BenchmarkCompressionCacheScanBenchmark(const std::string& filename, 
uint32_t compressBufferSize)
-      : BenchmarkCompression(filename, compressBufferSize) {}
-
-  std::string name() const override {
-    return "CacheScan";
-  }
-
- protected:
-  void doCompress(
-      int64_t& elapseRead,
-      int64_t& numBatches,
-      int64_t& numRows,
-      int64_t& compressTime,
-      int64_t& decompressTime,
-      int64_t& uncompressedSize,
-      int64_t& compressedSize,
-      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
-      benchmark::State& state) override {
-    std::shared_ptr<arrow::RecordBatch> recordBatch;
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-    GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
columnIndices_, &recordBatchReader));
-    do {
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-
-      if (recordBatch) {
-        batches.push_back(recordBatch);
-        numBatches += 1;
-        numRows += recordBatch->num_rows();
-      }
-    } while (recordBatch);
-
-    LOG(INFO) << "parquet parse done elapsed time " << elapseRead / 1e6 << " 
ms ";
-    LOG(INFO) << "batches = " << numBatches << " rows = " << numRows;
-
-    std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> 
payloads(batches.size());
-    std::vector<std::vector<int64_t>> uncompressedBufferSize(batches.size());
-
-    for (auto _ : state) {
-      auto it = batches.begin();
-      auto pos = 0;
-      while (it != batches.end()) {
-        recordBatch = *it++;
-        for (auto i = 0; i < recordBatch->num_columns(); ++i) {
-          recordBatch->column(i)->data()->buffers[0] = nullptr;
-          for (auto& buffer : recordBatch->column(i)->data()->buffers) {
-            if (buffer) {
-              uncompressedBufferSize[pos].push_back(buffer->size());
-            } else {
-              uncompressedBufferSize[pos].push_back(-1);
-            }
-          }
-        }
-        auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-
-        TIME_NANO_OR_THROW(
-            compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch, 
ipcWriteOptions, payload.get()));
-        uncompressedSize += payload->raw_body_length;
-        compressedSize += payload->body_length;
-        TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-        payloads[pos] = std::move(payload);
-        pos++;
-      }
-
-      decompress(ipcWriteOptions, payloads, uncompressedBufferSize, 
decompressTime);
-    }
-  }
-};
-
-class BenchmarkCompressionIterateScanBenchmark final : public 
BenchmarkCompression {
- public:
-  explicit BenchmarkCompressionIterateScanBenchmark(const std::string& 
filename, uint32_t compressBufferSize)
-      : BenchmarkCompression(filename, compressBufferSize) {}
-
-  std::string name() const override {
-    return "IterateScan";
-  }
-
- protected:
-  void doCompress(
-      int64_t& elapseRead,
-      int64_t& numBatches,
-      int64_t& numRows,
-      int64_t& compressTime,
-      int64_t& decompressTime,
-      int64_t& uncompressedSize,
-      int64_t& compressedSize,
-      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
-      benchmark::State& state) override {
-    std::shared_ptr<arrow::RecordBatch> recordBatch;
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    for (auto _ : state) {
-      
GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
columnIndices_, &recordBatchReader));
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-      std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads;
-      std::vector<std::vector<int64_t>> uncompressedBufferSize;
-      while (recordBatch) {
-        numBatches += 1;
-        uncompressedBufferSize.resize(numBatches);
-
-        numRows += recordBatch->num_rows();
-        for (auto i = 0; i < recordBatch->num_columns(); ++i) {
-          recordBatch->column(i)->data()->buffers[0] = nullptr;
-          for (auto& buffer : recordBatch->column(i)->data()->buffers) {
-            if (buffer) {
-              uncompressedBufferSize.back().push_back(buffer->size());
-            } else {
-              uncompressedBufferSize.back().push_back(-1);
-            }
-          }
-        }
-        auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-
-        TIME_NANO_OR_THROW(
-            compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch, 
ipcWriteOptions, payload.get()));
-        uncompressedSize += payload->raw_body_length;
-        compressedSize += payload->body_length;
-        TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-        payloads.push_back(std::move(payload));
-      }
-
-      decompress(ipcWriteOptions, payloads, uncompressedBufferSize, 
decompressTime);
-    }
-  }
-};
-
-} // namespace gluten
-
-int main(int argc, char** argv) {
-  uint32_t iterations = 1;
-  uint32_t threads = 1;
-  uint32_t cpuOffset = 0;
-  std::string datafile;
-  auto codec = gluten::kLZ4;
-  uint32_t compressBufferSize = 4096;
-
-  for (int i = 0; i < argc; i++) {
-    if (strcmp(argv[i], "--iterations") == 0) {
-      iterations = atol(argv[i + 1]);
-    } else if (strcmp(argv[i], "--threads") == 0) {
-      threads = atol(argv[i + 1]);
-    } else if (strcmp(argv[i], "--file") == 0) {
-      datafile = argv[i + 1];
-    } else if (strcmp(argv[i], "--qat-gzip") == 0) {
-      LOG(INFO) << "QAT gzip is used as codec";
-      codec = gluten::kQatGzip;
-    } else if (strcmp(argv[i], "--qat-zstd") == 0) {
-      LOG(INFO) << "QAT zstd is used as codec";
-      codec = gluten::kQatZstd;
-    } else if (strcmp(argv[i], "--qpl-gzip") == 0) {
-      LOG(INFO) << "QPL gzip is used as codec";
-      codec = gluten::kQplGzip;
-    } else if (strcmp(argv[i], "--zstd") == 0) {
-      LOG(INFO) << "CPU zstd is used as codec";
-      codec = gluten::kZstd;
-    } else if (strcmp(argv[i], "--buffer-size") == 0) {
-      compressBufferSize = atol(argv[i + 1]);
-    } else if (strcmp(argv[i], "--cpu-offset") == 0) {
-      cpuOffset = atol(argv[i + 1]);
-    }
-  }
-  LOG(INFO) << "iterations = " << iterations;
-  LOG(INFO) << "threads = " << threads;
-  LOG(INFO) << "datafile = " << datafile;
-
-  gluten::BenchmarkCompressionIterateScanBenchmark bmIterateScan(datafile, 
compressBufferSize);
-  gluten::BenchmarkCompressionCacheScanBenchmark bmCacheScan(datafile, 
compressBufferSize);
-
-  benchmark::RegisterBenchmark(bmIterateScan.name().c_str(), bmIterateScan)
-      ->Iterations(iterations)
-      ->Args({
-          codec,
-          compressBufferSize,
-          cpuOffset,
-      })
-      ->Threads(threads)
-      ->ReportAggregatesOnly(false)
-      ->MeasureProcessCPUTime()
-      ->Unit(benchmark::kSecond);
-
-  benchmark::RegisterBenchmark(bmCacheScan.name().c_str(), bmCacheScan)
-      ->Iterations(iterations)
-      ->Args({
-          codec,
-          compressBufferSize,
-          cpuOffset,
-      })
-      ->Threads(threads)
-      ->ReportAggregatesOnly(false)
-      ->MeasureProcessCPUTime()
-      ->Unit(benchmark::kSecond);
-
-  benchmark::Initialize(&argc, argv);
-  benchmark::RunSpecifiedBenchmarks();
-  benchmark::Shutdown();
-}
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc 
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index 81be604eaf..8edd515f08 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -15,104 +15,25 @@
  * limitations under the License.
  */
 
-#include <arrow/c/abi.h>
-#include <arrow/c/bridge.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/file.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/table.h>
-#include <arrow/type.h>
-#include <arrow/util/io_util.h>
 #include <benchmark/benchmark.h>
-#include <gtest/gtest.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-#include <parquet/properties.h>
-
-#include <chrono>
 
 #include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/Runtime.h"
 #include "compute/VeloxBackend.h"
-#include "compute/VeloxRuntime.h"
-#include "memory/ArrowMemoryPool.h"
-#include "memory/ColumnarBatch.h"
-#include "utils/Macros.h"
-#include "utils/TestUtils.h"
+#include "memory/VeloxMemoryManager.h"
+#include "operators/reader/ParquetReaderIterator.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "utils/VeloxArrowUtils.h"
-#include "velox/dwio/parquet/writer/Writer.h"
-#include "velox/vector/arrow/Bridge.h"
 
-using namespace facebook;
-using namespace arrow;
 namespace gluten {
 
 const int kBatchBufferSize = 32768;
 
-class GoogleBenchmarkParquetWrite {
+class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark {
  public:
-  GoogleBenchmarkParquetWrite(std::string fileName, std::string outputPath)
-      : fileName_(fileName), outputPath_(outputPath) {
-    getRecordBatchReader(fileName);
-  }
-
-  void getRecordBatchReader(const std::string& inputFile) {
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-
-    std::shared_ptr<arrow::fs::FileSystem> fs;
-    std::string fileName;
-    ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile, 
&fileName))
-
-    ARROW_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
-    properties_.set_batch_size(kBatchBufferSize);
-    properties_.set_pre_buffer(false);
-    properties_.set_use_threads(false);
-
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    ASSERT_NOT_OK(parquetReader->GetSchema(&schema_));
-
-    auto numRowgroups = parquetReader->num_row_groups();
-
-    for (int i = 0; i < numRowgroups; ++i) {
-      rowGroupIndices_.push_back(i);
-    }
+  GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(const std::string& 
fileName, const std::string& outputPath)
+      : fileName_(fileName), outputPath_(outputPath) {}
 
-    auto numColumns = schema_->num_fields();
-    for (int i = 0; i < numColumns; ++i) {
-      columnIndices_.push_back(i);
-    }
-  }
-
-  virtual void operator()(benchmark::State& state) {}
-
- protected:
-  std::shared_ptr<ColumnarBatch> recordBatch2VeloxColumnarBatch(const 
arrow::RecordBatch& rb) {
-    ArrowArray arrowArray;
-    ArrowSchema arrowSchema;
-    ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
-    auto vp = velox::importFromArrowAsOwner(arrowSchema, arrowArray, 
gluten::defaultLeafVeloxMemoryPool().get());
-    return 
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
-  }
-
- protected:
-  std::string fileName_;
-  std::string outputPath_;
-  std::shared_ptr<arrow::io::RandomAccessFile> file_;
-  std::vector<int> rowGroupIndices_;
-  std::vector<int> columnIndices_;
-  std::shared_ptr<arrow::Schema> schema_;
-  ::parquet::ArrowReaderProperties properties_;
-};
-
-class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark : public 
GoogleBenchmarkParquetWrite {
- public:
-  GoogleBenchmarkArrowParquetWriteCacheScanBenchmark(std::string fileName, 
std::string outputPath)
-      : GoogleBenchmarkParquetWrite(fileName, outputPath) {}
   void operator()(benchmark::State& state) {
     if (state.range(0) == 0xffffffff) {
       setCpu(state.thread_index());
@@ -124,158 +45,45 @@ class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark 
: public GoogleBenchmar
     int64_t elapseRead = 0;
     int64_t numBatches = 0;
     int64_t numRows = 0;
+    int64_t numColumns = 0;
     int64_t initTime = 0;
     int64_t writeTime = 0;
 
-    std::vector<int> localColumnIndices = columnIndices_;
-
-    std::shared_ptr<arrow::Schema> localSchema;
-    localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
-    if (state.thread_index() == 0)
-      LOG(INFO) << localSchema->ToString();
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        ::arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    std::vector<std::shared_ptr<RecordBatch>> vectors;
-    ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
localColumnIndices, &recordBatchReader));
-    do {
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-
-      if (recordBatch) {
-        vectors.push_back(recordBatch);
-        numBatches += 1;
-        numRows += recordBatch->num_rows();
-      }
-    } while (recordBatch);
-
-    LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 
<< " rows = " << numRows;
-
     // reuse the ParquetWriteConverter for batches caused system % increase a 
lot
-    auto fileName = "arrow_parquet_write.parquet";
-
-    for (auto _ : state) {
-      // Choose compression
-      std::shared_ptr<::parquet::WriterProperties> props =
-          
::parquet::WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
-
-      // Opt to store Arrow schema for easier reads back into Arrow
-      std::shared_ptr<::parquet::ArrowWriterProperties> arrow_props =
-          ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
-
-      std::shared_ptr<arrow::io::FileOutputStream> outfile;
-      outfile = arrow::io::FileOutputStream::Open(outputPath_ + 
fileName).ValueOrDie();
-      std::unique_ptr<::parquet::arrow::FileWriter> arrowWriter;
-
-      arrowWriter =
-          ::parquet::arrow::FileWriter::Open(*localSchema, 
arrow::default_memory_pool(), outfile, props, arrow_props)
-              .ValueOrDie();
-      auto start = std::chrono::steady_clock::now();
-      for (const auto& vector : vectors) {
-        auto table = arrow::Table::Make(vector->schema(), vector->columns(), 
vector->num_rows());
-        PARQUET_THROW_NOT_OK(arrowWriter->WriteTable(*table, 10000));
-      }
-      auto end = std::chrono::steady_clock::now();
-      writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
-    }
-
-    state.counters["rowgroups"] =
-        benchmark::Counter(rowGroupIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["columns"] =
-        benchmark::Counter(columnIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-    state.counters["batches"] =
-        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["num_rows"] =
-        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["batch_buffer_size"] =
-        benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1024);
-
-    state.counters["parquet_parse"] =
-        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["init_time"] =
-        benchmark::Counter(initTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-    state.counters["write_time"] =
-        benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
-  }
-};
-
-class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public 
GoogleBenchmarkParquetWrite {
- public:
-  GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(std::string fileName, 
std::string outputPath)
-      : GoogleBenchmarkParquetWrite(fileName, outputPath) {}
-  void operator()(benchmark::State& state) {
-    if (state.range(0) == 0xffffffff) {
-      setCpu(state.thread_index());
-    } else {
-      setCpu(state.range(0));
-    }
-
-    std::shared_ptr<arrow::RecordBatch> recordBatch;
-    int64_t elapseRead = 0;
-    int64_t numBatches = 0;
-    int64_t numRows = 0;
-    int64_t initTime = 0;
-    int64_t writeTime = 0;
-
-    std::vector<int> localColumnIndices = columnIndices_;
-
-    std::shared_ptr<arrow::Schema> localSchema;
-    localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
-    if (state.thread_index() == 0)
-      LOG(INFO) << localSchema->ToString();
-
-    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
-    std::shared_ptr<RecordBatchReader> recordBatchReader;
-    ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
-        ::arrow::default_memory_pool(), 
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
-    std::vector<std::shared_ptr<gluten::ColumnarBatch>> vectors;
-    ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, 
localColumnIndices, &recordBatchReader));
-    do {
-      TIME_NANO_OR_THROW(elapseRead, 
recordBatchReader->ReadNext(&recordBatch));
-
-      if (recordBatch) {
-        vectors.push_back(recordBatch2VeloxColumnarBatch(*recordBatch));
-        numBatches += 1;
-        numRows += recordBatch->num_rows();
-      }
-    } while (recordBatch);
-
-    LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000 
<< " rows = " << numRows;
-
-    // reuse the ParquetWriteConverter for batches caused system % increase a 
lot
-    auto fileName = "velox_parquet_write.parquet";
 
     auto memoryManager = getDefaultMemoryManager();
     auto runtime = Runtime::create(kVeloxBackendKind, memoryManager);
     auto veloxPool = memoryManager->getAggregateMemoryPool();
 
     for (auto _ : state) {
+      const auto output = "velox_parquet_write.parquet";
+
       // Init VeloxParquetDataSource
+      auto reader = [&] {
+        ScopedTimer timer(&elapseRead);
+        return std::make_unique<ParquetBufferedReaderIterator>(fileName_, 
kBatchBufferSize, veloxPool.get());
+      }();
+
+      const auto localSchema = toArrowSchema(reader->getRowType(), 
veloxPool.get());
+
       auto veloxParquetDataSource = 
std::make_unique<gluten::VeloxParquetDataSource>(
-          outputPath_ + "/" + fileName,
+          outputPath_ + "/" + output,
           veloxPool->addAggregateChild("writer_benchmark"),
           veloxPool->addLeafChild("sink_pool"),
           localSchema);
 
       veloxParquetDataSource->init(runtime->getConfMap());
-      auto start = std::chrono::steady_clock::now();
-      for (const auto& vector : vectors) {
-        veloxParquetDataSource->write(vector);
+
+      while (auto batch = reader->next()) {
+        ScopedTimer timer(&elapseRead);
+        veloxParquetDataSource->write(batch);
       }
-      auto end = std::chrono::steady_clock::now();
-      writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
+
       veloxParquetDataSource->close();
     }
 
-    state.counters["rowgroups"] =
-        benchmark::Counter(rowGroupIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
     state.counters["columns"] =
-        benchmark::Counter(columnIndices_.size(), 
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+        benchmark::Counter(numColumns, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
     state.counters["batches"] =
         benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
     state.counters["num_rows"] =
@@ -291,6 +99,10 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : 
public GoogleBenchmar
         benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, 
benchmark::Counter::OneK::kIs1000);
     Runtime::release(runtime);
   }
+
+ private:
+  std::string fileName_;
+  std::string outputPath_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/operators/reader/FileReaderIterator.cc 
b/cpp/velox/operators/reader/FileReaderIterator.cc
index 49d52f5ffd..7b2a04e360 100644
--- a/cpp/velox/operators/reader/FileReaderIterator.cc
+++ b/cpp/velox/operators/reader/FileReaderIterator.cc
@@ -24,12 +24,6 @@ namespace {
 const std::string kParquetSuffix = ".parquet";
 }
 
-FileReaderIterator::FileReaderIterator(const std::string& path) : path_(path) 
{}
-
-int64_t FileReaderIterator::getCollectBatchTime() const {
-  return collectBatchTime_;
-}
-
 std::shared_ptr<gluten::ResultIterator> 
FileReaderIterator::getInputIteratorFromFileReader(
     FileReaderType readerType,
     const std::string& path,
diff --git a/cpp/velox/operators/reader/FileReaderIterator.h 
b/cpp/velox/operators/reader/FileReaderIterator.h
index 708120603e..ad22148e55 100644
--- a/cpp/velox/operators/reader/FileReaderIterator.h
+++ b/cpp/velox/operators/reader/FileReaderIterator.h
@@ -33,13 +33,13 @@ class FileReaderIterator : public ColumnarBatchIterator {
       int64_t batchSize,
       facebook::velox::memory::MemoryPool* pool);
 
-  explicit FileReaderIterator(const std::string& path);
+  explicit FileReaderIterator(const std::string& path) : path_(path){};
 
   virtual ~FileReaderIterator() = default;
 
-  virtual std::shared_ptr<arrow::Schema> getSchema() = 0;
-
-  int64_t getCollectBatchTime() const;
+  int64_t getCollectBatchTime() const {
+    return collectBatchTime_;
+  }
 
  protected:
   int64_t collectBatchTime_ = 0;
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.cc 
b/cpp/velox/operators/reader/ParquetReaderIterator.cc
index 014bccbfce..223d1c0029 100644
--- a/cpp/velox/operators/reader/ParquetReaderIterator.cc
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.cc
@@ -17,31 +17,44 @@
 
 #include "operators/reader/ParquetReaderIterator.h"
 #include "memory/VeloxColumnarBatch.h"
-
-#include <arrow/util/range.h>
+#include "utils/Timer.h"
 
 namespace gluten {
+namespace {
+std::unique_ptr<facebook::velox::parquet::ParquetReader> createReader(
+    const std::string& path,
+    const facebook::velox::dwio::common::ReaderOptions& opts) {
+  auto input = std::make_unique<facebook::velox::dwio::common::BufferedInput>(
+      std::make_shared<facebook::velox::LocalReadFile>(path), 
opts.memoryPool());
+  return 
std::make_unique<facebook::velox::parquet::ParquetReader>(std::move(input), 
opts);
+}
+
+std::shared_ptr<facebook::velox::common::ScanSpec> makeScanSpec(const 
facebook::velox::RowTypePtr& rowType) {
+  auto scanSpec = std::make_shared<facebook::velox::common::ScanSpec>("");
+  scanSpec->addAllChildFields(*rowType);
+  return scanSpec;
+}
+} // namespace
 
 ParquetReaderIterator::ParquetReaderIterator(
     const std::string& path,
     int64_t batchSize,
     facebook::velox::memory::MemoryPool* pool)
-    : FileReaderIterator(path), batchSize_(batchSize), pool_(pool) {}
-
-void ParquetReaderIterator::createReader() {
-  parquet::ArrowReaderProperties properties = 
parquet::default_arrow_reader_properties();
-  properties.set_batch_size(batchSize_);
-  GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
-      arrow::default_memory_pool(), 
parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
-  GLUTEN_THROW_NOT_OK(
-      
fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()),
 &recordBatchReader_));
-
-  auto schema = recordBatchReader_->schema();
-  DLOG(INFO) << "Schema:\n" << schema->ToString();
-}
+    : FileReaderIterator(path), pool_(pool), batchSize_(batchSize) {}
+
+void ParquetReaderIterator::createRowReader() {
+  facebook::velox::dwio::common::ReaderOptions readerOptions{pool_};
+  auto reader = createReader(path_, readerOptions);
+
+  rowType_ = reader->rowType();
 
-std::shared_ptr<arrow::Schema> ParquetReaderIterator::getSchema() {
-  return recordBatchReader_->schema();
+  facebook::velox::dwio::common::RowReaderOptions rowReaderOpts;
+  
rowReaderOpts.select(std::make_shared<facebook::velox::dwio::common::ColumnSelector>(rowType_,
 rowType_->names()));
+  rowReaderOpts.setScanSpec(makeScanSpec(rowType_));
+
+  rowReader_ = reader->createRowReader(rowReaderOpts);
+
+  DLOG(INFO) << "Opened file for read: " << path_;
 }
 
 ParquetStreamReaderIterator::ParquetStreamReaderIterator(
@@ -49,20 +62,30 @@ ParquetStreamReaderIterator::ParquetStreamReaderIterator(
     int64_t batchSize,
     facebook::velox::memory::MemoryPool* pool)
     : ParquetReaderIterator(path, batchSize, pool) {
-  createReader();
-  DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
+  createRowReader();
 }
 
 std::shared_ptr<gluten::ColumnarBatch> ParquetStreamReaderIterator::next() {
-  auto startTime = std::chrono::steady_clock::now();
-  GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
-  DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << 
(batch ? batch->num_rows() : 0);
-  collectBatchTime_ +=
-      
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
 - startTime).count();
-  if (batch == nullptr) {
+  ScopedTimer timer(&collectBatchTime_);
+
+  static constexpr int32_t kBatchSize = 4096;
+
+  auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize, 
pool_);
+  auto numRows = rowReader_->next(kBatchSize, result);
+
+  if (numRows == 0) {
     return nullptr;
   }
-  return VeloxColumnarBatch::from(pool_, 
std::make_shared<gluten::ArrowColumnarBatch>(batch));
+
+  // Load lazy vector.
+  result = facebook::velox::BaseVector::loadedVectorShared(result);
+
+  auto rowVector = 
std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
+  GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");
+
+  DLOG(INFO) << "ParquetStreamReaderIterator read rows: " << numRows;
+
+  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
 ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
@@ -70,29 +93,44 @@ 
ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
     int64_t batchSize,
     facebook::velox::memory::MemoryPool* pool)
     : ParquetReaderIterator(path, batchSize, pool) {
-  createReader();
+  createRowReader();
   collectBatches();
-  iter_ = batches_.begin();
-  DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
-  DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size());
-  if (iter_ != batches_.cend()) {
-    DLOG(INFO) << "columns: " << (*iter_)->num_columns();
-    DLOG(INFO) << "rows: " << (*iter_)->num_rows();
-  }
 }
 
 std::shared_ptr<gluten::ColumnarBatch> ParquetBufferedReaderIterator::next() {
   if (iter_ == batches_.cend()) {
     return nullptr;
   }
-  return VeloxColumnarBatch::from(pool_, 
std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
+  return *iter_++;
 }
 
 void ParquetBufferedReaderIterator::collectBatches() {
-  auto startTime = std::chrono::steady_clock::now();
-  GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
-  auto endTime = std::chrono::steady_clock::now();
-  collectBatchTime_ += 
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - 
startTime).count();
+  ScopedTimer timer(&collectBatchTime_);
+
+  static constexpr int32_t kBatchSize = 4096;
+
+  uint64_t numRows = 0;
+  while (true) {
+    auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize, 
pool_);
+    numRows = rowReader_->next(kBatchSize, result);
+    if (numRows == 0) {
+      break;
+    }
+
+    // Load lazy vector.
+    result = facebook::velox::BaseVector::loadedVectorShared(result);
+
+    auto rowVector = 
std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
+    GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");
+
+    DLOG(INFO) << "ParquetStreamReaderIterator read rows: " << numRows;
+
+    
batches_.push_back(std::make_shared<VeloxColumnarBatch>(std::move(rowVector)));
+  }
+
+  iter_ = batches_.begin();
+
+  DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size());
 }
 
 } // namespace gluten
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.h 
b/cpp/velox/operators/reader/ParquetReaderIterator.h
index f45fe5eb77..85449980e6 100644
--- a/cpp/velox/operators/reader/ParquetReaderIterator.h
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
+#include "memory/VeloxColumnarBatch.h"
 #include "operators/reader/FileReaderIterator.h"
 
-#include <parquet/arrow/reader.h>
+#include "velox/dwio/parquet/reader/ParquetReader.h"
+
 #include <memory>
 
 namespace gluten {
@@ -28,22 +30,26 @@ class ParquetReaderIterator : public FileReaderIterator {
  public:
   explicit ParquetReaderIterator(const std::string& path, int64_t batchSize, 
facebook::velox::memory::MemoryPool* pool);
 
-  void createReader();
-
-  std::shared_ptr<arrow::Schema> getSchema() override;
+  facebook::velox::RowTypePtr getRowType() const {
+    return rowType_;
+  }
 
  protected:
-  std::unique_ptr<::parquet::arrow::FileReader> fileReader_;
-  std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
-  int64_t batchSize_;
+  void createRowReader();
+
   facebook::velox::memory::MemoryPool* pool_;
+
+  facebook::velox::RowTypePtr rowType_;
+  std::unique_ptr<facebook::velox::dwio::common::RowReader> rowReader_;
+
+  int64_t batchSize_;
 };
 
 class ParquetStreamReaderIterator final : public ParquetReaderIterator {
  public:
   ParquetStreamReaderIterator(const std::string& path, int64_t batchSize, 
facebook::velox::memory::MemoryPool* pool);
 
-  std::shared_ptr<gluten::ColumnarBatch> next() override;
+  std::shared_ptr<ColumnarBatch> next() override;
 };
 
 class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
@@ -53,13 +59,13 @@ class ParquetBufferedReaderIterator final : public 
ParquetReaderIterator {
       int64_t batchSize,
       facebook::velox::memory::MemoryPool* pool);
 
-  std::shared_ptr<gluten::ColumnarBatch> next() override;
+  std::shared_ptr<ColumnarBatch> next() override;
 
  private:
   void collectBatches();
 
-  arrow::RecordBatchVector batches_;
-  std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
+  std::vector<std::shared_ptr<VeloxColumnarBatch>> batches_;
+  std::vector<std::shared_ptr<VeloxColumnarBatch>>::const_iterator iter_;
 };
 
 } // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to