This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f6623929a8 [GLUTEN-9468][VL] Remove parquet-arrow reader dependency in
benchmarks (#9469)
f6623929a8 is described below
commit f6623929a815dbea5bcdef5b945f1c4217e71f0a
Author: Rong Ma <[email protected]>
AuthorDate: Wed Apr 30 17:03:59 2025 +0100
[GLUTEN-9468][VL] Remove parquet-arrow reader dependency in benchmarks
(#9469)
---
cpp/velox/benchmarks/CMakeLists.txt | 4 -
cpp/velox/benchmarks/ColumnarToRowBenchmark.cc | 282 ------------
cpp/velox/benchmarks/CompressionBenchmark.cc | 472 ---------------------
cpp/velox/benchmarks/ParquetWriteBenchmark.cc | 244 ++---------
cpp/velox/operators/reader/FileReaderIterator.cc | 6 -
cpp/velox/operators/reader/FileReaderIterator.h | 8 +-
.../operators/reader/ParquetReaderIterator.cc | 116 +++--
cpp/velox/operators/reader/ParquetReaderIterator.h | 28 +-
8 files changed, 126 insertions(+), 1034 deletions(-)
diff --git a/cpp/velox/benchmarks/CMakeLists.txt
b/cpp/velox/benchmarks/CMakeLists.txt
index aeb43518e8..20c8bf27d1 100644
--- a/cpp/velox/benchmarks/CMakeLists.txt
+++ b/cpp/velox/benchmarks/CMakeLists.txt
@@ -32,10 +32,6 @@ endfunction()
# Generic benchmark
add_velox_benchmark(generic_benchmark GenericBenchmark.cc)
-add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc)
-
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)
add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)
-
-add_velox_benchmark(compression_benchmark CompressionBenchmark.cc)
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
deleted file mode 100644
index e32e6d7513..0000000000
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/c/abi.h>
-#include <arrow/c/bridge.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/type.h>
-#include <arrow/util/io_util.h>
-#include <benchmark/benchmark.h>
-#include <gtest/gtest.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-
-#include <chrono>
-
-#include "benchmarks/common/BenchmarkUtils.h"
-#include "memory/ArrowMemoryPool.h"
-#include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
-#include "operators/serializer/VeloxColumnarToRowConverter.h"
-#include "utils/Macros.h"
-#include "utils/TestUtils.h"
-#include "utils/VeloxArrowUtils.h"
-#include "velox/vector/arrow/Bridge.h"
-
-using namespace facebook;
-using namespace arrow;
-namespace gluten {
-
-const int kBatchBufferSize = 4096;
-
-class GoogleBenchmarkColumnarToRow {
- public:
- GoogleBenchmarkColumnarToRow(std::string fileName) {
- getRecordBatchReader(fileName);
- }
-
- void getRecordBatchReader(const std::string& inputFile) {
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
-
- std::shared_ptr<arrow::fs::FileSystem> fs;
- std::string fileName;
- ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile,
&fileName))
-
- ARROW_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
- properties_.set_batch_size(kBatchBufferSize);
- properties_.set_pre_buffer(false);
- properties_.set_use_threads(false);
-
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- ASSERT_NOT_OK(parquetReader->GetSchema(&schema_));
-
- auto numRowgroups = parquetReader->num_row_groups();
-
- for (int i = 0; i < numRowgroups; ++i) {
- rowGroupIndices_.push_back(i);
- }
-
- auto numColumns = schema_->num_fields();
- for (int i = 0; i < numColumns; ++i) {
- columnIndices_.push_back(i);
- }
- }
-
- virtual void operator()(benchmark::State& state) {}
-
- protected:
- velox::VectorPtr recordBatch2RowVector(const arrow::RecordBatch& rb) {
- ArrowArray arrowArray;
- ArrowSchema arrowSchema;
- ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
- return velox::importFromArrowAsOwner(arrowSchema, arrowArray,
gluten::defaultLeafVeloxMemoryPool().get());
- }
-
- protected:
- std::string fileName_;
- std::shared_ptr<arrow::io::RandomAccessFile> file_;
- std::vector<int> rowGroupIndices_;
- std::vector<int> columnIndices_;
- std::shared_ptr<arrow::Schema> schema_;
- parquet::ArrowReaderProperties properties_;
-};
-class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public
GoogleBenchmarkColumnarToRow {
- public:
- GoogleBenchmarkColumnarToRowCacheScanBenchmark(std::string filename) :
GoogleBenchmarkColumnarToRow(filename) {}
- void operator()(benchmark::State& state) {
- if (state.range(0) == 0xffffffff) {
- setCpu(state.thread_index());
- } else {
- setCpu(state.range(0));
- }
-
- std::shared_ptr<arrow::RecordBatch> recordBatch;
- int64_t elapseRead = 0;
- int64_t numBatches = 0;
- int64_t numRows = 0;
- int64_t initTime = 0;
- int64_t writeTime = 0;
-
- std::vector<int> localColumnIndices = columnIndices_;
-
- std::shared_ptr<arrow::Schema> localSchema;
- localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
- if (state.thread_index() == 0)
- LOG(INFO) << localSchema->ToString();
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- ::arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- std::vector<velox::VectorPtr> vectors;
- ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
localColumnIndices, &recordBatchReader));
- do {
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
-
- if (recordBatch) {
- vectors.push_back(recordBatch2RowVector(*recordBatch));
- numBatches += 1;
- numRows += recordBatch->num_rows();
- }
- } while (recordBatch);
-
- LOG(WARNING) << " parquet parse done elapsed time = " << elapseRead /
1000000 << " rows = " << numRows;
-
- // reuse the columnarToRowConverter for batches caused system % increase a
lot
- auto ctxPool = defaultLeafVeloxMemoryPool();
- for (auto _ : state) {
- for (const auto& vector : vectors) {
- auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
- auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
- auto cb = std::make_shared<VeloxColumnarBatch>(row);
- TIME_NANO_START(writeTime);
- columnarToRowConverter->convert(cb);
- TIME_NANO_END(writeTime);
- }
- }
-
- state.counters["rowgroups"] =
- benchmark::Counter(rowGroupIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["columns"] =
- benchmark::Counter(columnIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["batches"] =
- benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["num_rows"] =
- benchmark::Counter(numRows, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["batch_buffer_size"] =
- benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
-
- state.counters["parquet_parse"] =
- benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["init_time"] =
- benchmark::Counter(initTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["write_time"] =
- benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- }
-};
-
-class GoogleBenchmarkColumnarToRowIterateScanBenchmark : public
GoogleBenchmarkColumnarToRow {
- public:
- GoogleBenchmarkColumnarToRowIterateScanBenchmark(std::string filename) :
GoogleBenchmarkColumnarToRow(filename) {}
- void operator()(benchmark::State& state) {
- setCpu(state.thread_index());
-
- int64_t elapseRead = 0;
- int64_t numBatches = 0;
- int64_t numRows = 0;
- int64_t initTime = 0;
- int64_t writeTime = 0;
-
- std::shared_ptr<arrow::RecordBatch> recordBatch;
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- auto ctxPool = defaultLeafVeloxMemoryPool();
- for (auto _ : state) {
- ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
columnIndices_, &recordBatchReader));
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
- while (recordBatch) {
- numBatches += 1;
- numRows += recordBatch->num_rows();
- auto vector = recordBatch2RowVector(*recordBatch);
- auto columnarToRowConverter =
std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
- auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
- auto cb = std::make_shared<VeloxColumnarBatch>(row);
- TIME_NANO_START(writeTime);
- columnarToRowConverter->convert(cb);
- TIME_NANO_END(writeTime);
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
- }
- }
-
- state.counters["rowgroups"] =
- benchmark::Counter(rowGroupIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["columns"] =
- benchmark::Counter(columnIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["batches"] =
- benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["num_rows"] =
- benchmark::Counter(numRows, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["batch_buffer_size"] =
- benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
-
- state.counters["parquet_parse"] =
- benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["init_time"] =
- benchmark::Counter(initTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["write_time"] =
- benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- }
-};
-
-} // namespace gluten
-
-// usage
-// ./columnar_to_row_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet
-int main(int argc, char** argv) {
- uint32_t iterations = 1;
- uint32_t threads = 1;
- std::string datafile;
- uint32_t cpu = 0xffffffff;
-
- for (int i = 0; i < argc; i++) {
- if (strcmp(argv[i], "--iterations") == 0) {
- iterations = atol(argv[i + 1]);
- } else if (strcmp(argv[i], "--threads") == 0) {
- threads = atol(argv[i + 1]);
- } else if (strcmp(argv[i], "--file") == 0) {
- datafile = argv[i + 1];
- } else if (strcmp(argv[i], "--cpu") == 0) {
- cpu = atol(argv[i + 1]);
- }
- }
- LOG(INFO) << "iterations = " << iterations;
- LOG(INFO) << "threads = " << threads;
- LOG(INFO) << "datafile = " << datafile;
- LOG(INFO) << "cpu = " << cpu;
-
- gluten::initVeloxBackend();
- memory::MemoryManager::testingSetInstance({});
-
- gluten::GoogleBenchmarkColumnarToRowCacheScanBenchmark bck(datafile);
-
- benchmark::RegisterBenchmark("GoogleBenchmarkColumnarToRow::CacheScan", bck)
- ->Args({
- cpu,
- })
- ->Iterations(iterations)
- ->Threads(threads)
- ->ReportAggregatesOnly(false)
- ->MeasureProcessCPUTime()
- ->Unit(benchmark::kSecond);
-
- benchmark::Initialize(&argc, argv);
- benchmark::RunSpecifiedBenchmarks();
- benchmark::Shutdown();
-}
diff --git a/cpp/velox/benchmarks/CompressionBenchmark.cc
b/cpp/velox/benchmarks/CompressionBenchmark.cc
deleted file mode 100644
index 8e65899b75..0000000000
--- a/cpp/velox/benchmarks/CompressionBenchmark.cc
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <arrow/extension_type.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/ipc/options.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/type.h>
-#include <arrow/type_fwd.h>
-#include <benchmark/benchmark.h>
-#include <execinfo.h>
-#include <glog/logging.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-#include <sched.h>
-
-#include <chrono>
-#include <iostream>
-#include <utility>
-
-#include "common/BenchmarkUtils.h"
-#include "shuffle/ShuffleWriter.h"
-#include "utils/Compression.h"
-#include "utils/Macros.h"
-
-void printTrace(void) {
- char** strings;
- size_t i, size;
- enum Constexpr { kMaxSize = 1024 };
- void* array[kMaxSize];
- size = backtrace(array, kMaxSize);
- strings = backtrace_symbols(array, size);
- for (i = 0; i < size; i++) {
- if (strings[i] != nullptr) {
- printf(" %s\n", strings[i]);
- }
- }
- puts("");
- if (strings != nullptr) {
- free(strings);
- }
-}
-
-using arrow::RecordBatchReader;
-using arrow::Status;
-using gluten::GlutenException;
-using gluten::ShuffleWriterOptions;
-
-namespace gluten {
-
-#define ALIGNMENT (2 * 1024 * 1024)
-
-const int32_t kQatGzip = 0;
-const int32_t kQatZstd = 1;
-const int32_t kQplGzip = 2;
-const int32_t kLZ4 = 3;
-const int32_t kZstd = 4;
-
-class BenchmarkCompression {
- public:
- explicit BenchmarkCompression(const std::string& fileName, uint32_t
compressBufferSize) {
- getRecordBatchReader(fileName, compressBufferSize);
- }
-
- virtual std::string name() const = 0;
-
- void getRecordBatchReader(const std::string& inputFile, uint32_t
compressBufferSize) {
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
-
- std::shared_ptr<arrow::fs::FileSystem> fs;
- std::string fileName;
- GLUTEN_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile,
&fileName))
-
- GLUTEN_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
- properties_.set_batch_size(compressBufferSize);
- properties_.set_pre_buffer(false);
- properties_.set_use_threads(false);
-
- GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- GLUTEN_THROW_NOT_OK(parquetReader->GetSchema(&schema_));
-
- auto numRowgroups = parquetReader->num_row_groups();
-
- for (int i = 0; i < numRowgroups; ++i) {
- rowGroupIndices_.push_back(i);
- }
-
- auto numColumns = schema_->num_fields();
- for (int i = 0; i < numColumns; ++i) {
- columnIndices_.push_back(i);
- }
- }
-
- void operator()(benchmark::State& state) {
- setCpu(state.range(2) + state.thread_index());
- auto ipcWriteOptions = arrow::ipc::IpcWriteOptions::Defaults();
- ipcWriteOptions.use_threads = false;
- auto compressBufferSize = static_cast<uint32_t>(state.range(1));
- auto compressionType = state.range(0);
- switch (compressionType) {
- case gluten::kLZ4: {
- ipcWriteOptions.codec =
createArrowIpcCodec(arrow::Compression::LZ4_FRAME, CodecBackend::NONE);
- break;
- }
- case gluten::kZstd: {
- ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD,
CodecBackend::NONE);
- break;
- }
-#ifdef GLUTEN_ENABLE_QAT
- case gluten::kQatGzip: {
- ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::GZIP,
CodecBackend::QAT);
- break;
- }
- case gluten::kQatZstd: {
- ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD,
CodecBackend::QAT);
- LOG(INFO) << "load qatzstd";
- break;
- }
-#endif
-#ifdef GLUTEN_ENABLE_IAA
- case gluten::kQplGzip: {
- ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD,
CodecBackend::IAA);
- break;
- }
-#endif
- default:
- throw GlutenException("Codec not supported. Only support LZ4 or
QATGzip");
- }
- ipcWriteOptions.memory_pool = arrow::default_memory_pool();
-
- int64_t elapseRead = 0;
- int64_t numBatches = 0;
- int64_t numRows = 0;
- int64_t compressTime = 0;
- int64_t decompressTime = 0;
- int64_t uncompressedSize = 0;
- int64_t compressedSize = 0;
-
- auto startTime = std::chrono::steady_clock::now();
-
- doCompress(
- elapseRead,
- numBatches,
- numRows,
- compressTime,
- decompressTime,
- uncompressedSize,
- compressedSize,
- ipcWriteOptions,
- state);
- auto endTime = std::chrono::steady_clock::now();
- auto totalTime = (endTime - startTime).count();
- LOG(INFO) << "Thread " << state.thread_index() << " took " << (1.0 *
totalTime / 1e9) << "s";
-
- state.counters["rowgroups"] =
- benchmark::Counter(rowGroupIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-
- state.counters["columns"] =
- benchmark::Counter(columnIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
-
- state.counters["batches"] =
- benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["num_rows"] =
- benchmark::Counter(numRows, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["batch_buffer_size"] =
- benchmark::Counter(compressBufferSize,
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
-
- state.counters["parquet_parse"] =
- benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["compress_time"] =
- benchmark::Counter(compressTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["decompress_time"] =
- benchmark::Counter(decompressTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["total_time"] =
- benchmark::Counter(totalTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- state.counters["uncompressed_size"] =
- benchmark::Counter(uncompressedSize, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
-
- state.counters["compressed_size"] =
- benchmark::Counter(compressedSize, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
-
- auto compressionRatio = 1.0 * compressedSize / uncompressedSize;
- state.counters["compression_ratio"] =
- benchmark::Counter(compressionRatio, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
-
- // compress_time is in nanosecond, zoom out to second.
- auto throughput = 1.0 * uncompressedSize / compressTime * 1e9 * 8;
- state.counters["throughput_total"] =
- benchmark::Counter(throughput, benchmark::Counter::kDefaults,
benchmark::Counter::OneK::kIs1024);
-
- state.counters["throughput_per_thread"] =
- benchmark::Counter(throughput, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
- }
-
- protected:
- virtual void doCompress(
- int64_t& elapseRead,
- int64_t& numBatches,
- int64_t& numRows,
- int64_t& compressTime,
- int64_t& decompressTime,
- int64_t& uncompressedSize,
- int64_t& compressedSize,
- arrow::ipc::IpcWriteOptions& ipcWriteOptions,
- benchmark::State& state) {}
-
- void decompress(
- const arrow::ipc::IpcWriteOptions& ipcWriteOptions,
- const std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>& payloads,
- const std::vector<std::vector<int64_t>>& uncompressedBufferSize,
- int64_t& decompressTime) {
- TIME_NANO_START(decompressTime);
- auto codec = ipcWriteOptions.codec;
- for (auto i = 0; i < payloads.size(); ++i) {
- auto& buffers = payloads[i]->body_buffers;
- for (auto j = 0; j < buffers.size(); ++j) {
- auto outputSize = uncompressedBufferSize[i][j];
- if (buffers[j] && outputSize >= 0) {
- GLUTEN_ASSIGN_OR_THROW(auto out,
arrow::AllocateResizableBuffer(outputSize, ipcWriteOptions.memory_pool));
- // Exclude the first 8-byte buffer metadata.
- GLUTEN_ASSIGN_OR_THROW(
- auto len,
- codec->Decompress(buffers[j]->size() - 8, buffers[j]->data() +
8, outputSize, out->mutable_data()));
- static_cast<void>(len);
- }
- }
- }
- TIME_NANO_END(decompressTime);
- }
-
- protected:
- std::shared_ptr<arrow::io::RandomAccessFile> file_;
- std::vector<int> rowGroupIndices_;
- std::vector<int> columnIndices_;
- std::shared_ptr<arrow::Schema> schema_;
- parquet::ArrowReaderProperties properties_;
-};
-
-class BenchmarkCompressionCacheScanBenchmark final : public
BenchmarkCompression {
- public:
- explicit BenchmarkCompressionCacheScanBenchmark(const std::string& filename,
uint32_t compressBufferSize)
- : BenchmarkCompression(filename, compressBufferSize) {}
-
- std::string name() const override {
- return "CacheScan";
- }
-
- protected:
- void doCompress(
- int64_t& elapseRead,
- int64_t& numBatches,
- int64_t& numRows,
- int64_t& compressTime,
- int64_t& decompressTime,
- int64_t& uncompressedSize,
- int64_t& compressedSize,
- arrow::ipc::IpcWriteOptions& ipcWriteOptions,
- benchmark::State& state) override {
- std::shared_ptr<arrow::RecordBatch> recordBatch;
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
columnIndices_, &recordBatchReader));
- do {
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
-
- if (recordBatch) {
- batches.push_back(recordBatch);
- numBatches += 1;
- numRows += recordBatch->num_rows();
- }
- } while (recordBatch);
-
- LOG(INFO) << "parquet parse done elapsed time " << elapseRead / 1e6 << "
ms ";
- LOG(INFO) << "batches = " << numBatches << " rows = " << numRows;
-
- std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>
payloads(batches.size());
- std::vector<std::vector<int64_t>> uncompressedBufferSize(batches.size());
-
- for (auto _ : state) {
- auto it = batches.begin();
- auto pos = 0;
- while (it != batches.end()) {
- recordBatch = *it++;
- for (auto i = 0; i < recordBatch->num_columns(); ++i) {
- recordBatch->column(i)->data()->buffers[0] = nullptr;
- for (auto& buffer : recordBatch->column(i)->data()->buffers) {
- if (buffer) {
- uncompressedBufferSize[pos].push_back(buffer->size());
- } else {
- uncompressedBufferSize[pos].push_back(-1);
- }
- }
- }
- auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-
- TIME_NANO_OR_THROW(
- compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch,
ipcWriteOptions, payload.get()));
- uncompressedSize += payload->raw_body_length;
- compressedSize += payload->body_length;
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
- payloads[pos] = std::move(payload);
- pos++;
- }
-
- decompress(ipcWriteOptions, payloads, uncompressedBufferSize,
decompressTime);
- }
- }
-};
-
-class BenchmarkCompressionIterateScanBenchmark final : public
BenchmarkCompression {
- public:
- explicit BenchmarkCompressionIterateScanBenchmark(const std::string&
filename, uint32_t compressBufferSize)
- : BenchmarkCompression(filename, compressBufferSize) {}
-
- std::string name() const override {
- return "IterateScan";
- }
-
- protected:
- void doCompress(
- int64_t& elapseRead,
- int64_t& numBatches,
- int64_t& numRows,
- int64_t& compressTime,
- int64_t& decompressTime,
- int64_t& uncompressedSize,
- int64_t& compressedSize,
- arrow::ipc::IpcWriteOptions& ipcWriteOptions,
- benchmark::State& state) override {
- std::shared_ptr<arrow::RecordBatch> recordBatch;
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- for (auto _ : state) {
-
GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
columnIndices_, &recordBatchReader));
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
- std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads;
- std::vector<std::vector<int64_t>> uncompressedBufferSize;
- while (recordBatch) {
- numBatches += 1;
- uncompressedBufferSize.resize(numBatches);
-
- numRows += recordBatch->num_rows();
- for (auto i = 0; i < recordBatch->num_columns(); ++i) {
- recordBatch->column(i)->data()->buffers[0] = nullptr;
- for (auto& buffer : recordBatch->column(i)->data()->buffers) {
- if (buffer) {
- uncompressedBufferSize.back().push_back(buffer->size());
- } else {
- uncompressedBufferSize.back().push_back(-1);
- }
- }
- }
- auto payload = std::make_shared<arrow::ipc::IpcPayload>();
-
- TIME_NANO_OR_THROW(
- compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch,
ipcWriteOptions, payload.get()));
- uncompressedSize += payload->raw_body_length;
- compressedSize += payload->body_length;
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
- payloads.push_back(std::move(payload));
- }
-
- decompress(ipcWriteOptions, payloads, uncompressedBufferSize,
decompressTime);
- }
- }
-};
-
-} // namespace gluten
-
-int main(int argc, char** argv) {
- uint32_t iterations = 1;
- uint32_t threads = 1;
- uint32_t cpuOffset = 0;
- std::string datafile;
- auto codec = gluten::kLZ4;
- uint32_t compressBufferSize = 4096;
-
- for (int i = 0; i < argc; i++) {
- if (strcmp(argv[i], "--iterations") == 0) {
- iterations = atol(argv[i + 1]);
- } else if (strcmp(argv[i], "--threads") == 0) {
- threads = atol(argv[i + 1]);
- } else if (strcmp(argv[i], "--file") == 0) {
- datafile = argv[i + 1];
- } else if (strcmp(argv[i], "--qat-gzip") == 0) {
- LOG(INFO) << "QAT gzip is used as codec";
- codec = gluten::kQatGzip;
- } else if (strcmp(argv[i], "--qat-zstd") == 0) {
- LOG(INFO) << "QAT zstd is used as codec";
- codec = gluten::kQatZstd;
- } else if (strcmp(argv[i], "--qpl-gzip") == 0) {
- LOG(INFO) << "QPL gzip is used as codec";
- codec = gluten::kQplGzip;
- } else if (strcmp(argv[i], "--zstd") == 0) {
- LOG(INFO) << "CPU zstd is used as codec";
- codec = gluten::kZstd;
- } else if (strcmp(argv[i], "--buffer-size") == 0) {
- compressBufferSize = atol(argv[i + 1]);
- } else if (strcmp(argv[i], "--cpu-offset") == 0) {
- cpuOffset = atol(argv[i + 1]);
- }
- }
- LOG(INFO) << "iterations = " << iterations;
- LOG(INFO) << "threads = " << threads;
- LOG(INFO) << "datafile = " << datafile;
-
- gluten::BenchmarkCompressionIterateScanBenchmark bmIterateScan(datafile,
compressBufferSize);
- gluten::BenchmarkCompressionCacheScanBenchmark bmCacheScan(datafile,
compressBufferSize);
-
- benchmark::RegisterBenchmark(bmIterateScan.name().c_str(), bmIterateScan)
- ->Iterations(iterations)
- ->Args({
- codec,
- compressBufferSize,
- cpuOffset,
- })
- ->Threads(threads)
- ->ReportAggregatesOnly(false)
- ->MeasureProcessCPUTime()
- ->Unit(benchmark::kSecond);
-
- benchmark::RegisterBenchmark(bmCacheScan.name().c_str(), bmCacheScan)
- ->Iterations(iterations)
- ->Args({
- codec,
- compressBufferSize,
- cpuOffset,
- })
- ->Threads(threads)
- ->ReportAggregatesOnly(false)
- ->MeasureProcessCPUTime()
- ->Unit(benchmark::kSecond);
-
- benchmark::Initialize(&argc, argv);
- benchmark::RunSpecifiedBenchmarks();
- benchmark::Shutdown();
-}
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index 81be604eaf..8edd515f08 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -15,104 +15,25 @@
* limitations under the License.
*/
-#include <arrow/c/abi.h>
-#include <arrow/c/bridge.h>
-#include <arrow/filesystem/filesystem.h>
-#include <arrow/io/file.h>
-#include <arrow/io/interfaces.h>
-#include <arrow/memory_pool.h>
-#include <arrow/record_batch.h>
-#include <arrow/table.h>
-#include <arrow/type.h>
-#include <arrow/util/io_util.h>
#include <benchmark/benchmark.h>
-#include <gtest/gtest.h>
-#include <parquet/arrow/reader.h>
-#include <parquet/file_reader.h>
-#include <parquet/properties.h>
-
-#include <chrono>
#include "benchmarks/common/BenchmarkUtils.h"
+#include "compute/Runtime.h"
#include "compute/VeloxBackend.h"
-#include "compute/VeloxRuntime.h"
-#include "memory/ArrowMemoryPool.h"
-#include "memory/ColumnarBatch.h"
-#include "utils/Macros.h"
-#include "utils/TestUtils.h"
+#include "memory/VeloxMemoryManager.h"
+#include "operators/reader/ParquetReaderIterator.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/VeloxArrowUtils.h"
-#include "velox/dwio/parquet/writer/Writer.h"
-#include "velox/vector/arrow/Bridge.h"
-using namespace facebook;
-using namespace arrow;
namespace gluten {
const int kBatchBufferSize = 32768;
-class GoogleBenchmarkParquetWrite {
+class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark {
public:
- GoogleBenchmarkParquetWrite(std::string fileName, std::string outputPath)
- : fileName_(fileName), outputPath_(outputPath) {
- getRecordBatchReader(fileName);
- }
-
- void getRecordBatchReader(const std::string& inputFile) {
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
-
- std::shared_ptr<arrow::fs::FileSystem> fs;
- std::string fileName;
- ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile,
&fileName))
-
- ARROW_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));
-
- properties_.set_batch_size(kBatchBufferSize);
- properties_.set_pre_buffer(false);
- properties_.set_use_threads(false);
-
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- ASSERT_NOT_OK(parquetReader->GetSchema(&schema_));
-
- auto numRowgroups = parquetReader->num_row_groups();
-
- for (int i = 0; i < numRowgroups; ++i) {
- rowGroupIndices_.push_back(i);
- }
+ GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(const std::string&
fileName, const std::string& outputPath)
+ : fileName_(fileName), outputPath_(outputPath) {}
- auto numColumns = schema_->num_fields();
- for (int i = 0; i < numColumns; ++i) {
- columnIndices_.push_back(i);
- }
- }
-
- virtual void operator()(benchmark::State& state) {}
-
- protected:
- std::shared_ptr<ColumnarBatch> recordBatch2VeloxColumnarBatch(const
arrow::RecordBatch& rb) {
- ArrowArray arrowArray;
- ArrowSchema arrowSchema;
- ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
- auto vp = velox::importFromArrowAsOwner(arrowSchema, arrowArray,
gluten::defaultLeafVeloxMemoryPool().get());
- return
std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
- }
-
- protected:
- std::string fileName_;
- std::string outputPath_;
- std::shared_ptr<arrow::io::RandomAccessFile> file_;
- std::vector<int> rowGroupIndices_;
- std::vector<int> columnIndices_;
- std::shared_ptr<arrow::Schema> schema_;
- ::parquet::ArrowReaderProperties properties_;
-};
-
-class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark : public
GoogleBenchmarkParquetWrite {
- public:
- GoogleBenchmarkArrowParquetWriteCacheScanBenchmark(std::string fileName,
std::string outputPath)
- : GoogleBenchmarkParquetWrite(fileName, outputPath) {}
void operator()(benchmark::State& state) {
if (state.range(0) == 0xffffffff) {
setCpu(state.thread_index());
@@ -124,158 +45,45 @@ class GoogleBenchmarkArrowParquetWriteCacheScanBenchmark
: public GoogleBenchmar
int64_t elapseRead = 0;
int64_t numBatches = 0;
int64_t numRows = 0;
+ int64_t numColumns = 0;
int64_t initTime = 0;
int64_t writeTime = 0;
- std::vector<int> localColumnIndices = columnIndices_;
-
- std::shared_ptr<arrow::Schema> localSchema;
- localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
- if (state.thread_index() == 0)
- LOG(INFO) << localSchema->ToString();
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- ::arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- std::vector<std::shared_ptr<RecordBatch>> vectors;
- ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
localColumnIndices, &recordBatchReader));
- do {
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
-
- if (recordBatch) {
- vectors.push_back(recordBatch);
- numBatches += 1;
- numRows += recordBatch->num_rows();
- }
- } while (recordBatch);
-
- LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000
<< " rows = " << numRows;
-
// reuse the ParquetWriteConverter for batches caused system % increase a
lot
- auto fileName = "arrow_parquet_write.parquet";
-
- for (auto _ : state) {
- // Choose compression
- std::shared_ptr<::parquet::WriterProperties> props =
-
::parquet::WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
-
- // Opt to store Arrow schema for easier reads back into Arrow
- std::shared_ptr<::parquet::ArrowWriterProperties> arrow_props =
- ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
-
- std::shared_ptr<arrow::io::FileOutputStream> outfile;
- outfile = arrow::io::FileOutputStream::Open(outputPath_ +
fileName).ValueOrDie();
- std::unique_ptr<::parquet::arrow::FileWriter> arrowWriter;
-
- arrowWriter =
- ::parquet::arrow::FileWriter::Open(*localSchema,
arrow::default_memory_pool(), outfile, props, arrow_props)
- .ValueOrDie();
- auto start = std::chrono::steady_clock::now();
- for (const auto& vector : vectors) {
- auto table = arrow::Table::Make(vector->schema(), vector->columns(),
vector->num_rows());
- PARQUET_THROW_NOT_OK(arrowWriter->WriteTable(*table, 10000));
- }
- auto end = std::chrono::steady_clock::now();
- writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start).count();
- }
-
- state.counters["rowgroups"] =
- benchmark::Counter(rowGroupIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["columns"] =
- benchmark::Counter(columnIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
- state.counters["batches"] =
- benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["num_rows"] =
- benchmark::Counter(numRows, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["batch_buffer_size"] =
- benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1024);
-
- state.counters["parquet_parse"] =
- benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["init_time"] =
- benchmark::Counter(initTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- state.counters["write_time"] =
- benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
- }
-};
-
-class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public
GoogleBenchmarkParquetWrite {
- public:
- GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark(std::string fileName,
std::string outputPath)
- : GoogleBenchmarkParquetWrite(fileName, outputPath) {}
- void operator()(benchmark::State& state) {
- if (state.range(0) == 0xffffffff) {
- setCpu(state.thread_index());
- } else {
- setCpu(state.range(0));
- }
-
- std::shared_ptr<arrow::RecordBatch> recordBatch;
- int64_t elapseRead = 0;
- int64_t numBatches = 0;
- int64_t numRows = 0;
- int64_t initTime = 0;
- int64_t writeTime = 0;
-
- std::vector<int> localColumnIndices = columnIndices_;
-
- std::shared_ptr<arrow::Schema> localSchema;
- localSchema = std::make_shared<arrow::Schema>(*schema_.get());
-
- if (state.thread_index() == 0)
- LOG(INFO) << localSchema->ToString();
-
- std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
- std::shared_ptr<RecordBatchReader> recordBatchReader;
- ASSERT_NOT_OK(::parquet::arrow::FileReader::Make(
- ::arrow::default_memory_pool(),
::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));
-
- std::vector<std::shared_ptr<gluten::ColumnarBatch>> vectors;
- ASSERT_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_,
localColumnIndices, &recordBatchReader));
- do {
- TIME_NANO_OR_THROW(elapseRead,
recordBatchReader->ReadNext(&recordBatch));
-
- if (recordBatch) {
- vectors.push_back(recordBatch2VeloxColumnarBatch(*recordBatch));
- numBatches += 1;
- numRows += recordBatch->num_rows();
- }
- } while (recordBatch);
-
- LOG(INFO) << " parquet parse done elapsed time = " << elapseRead / 1000000
<< " rows = " << numRows;
-
- // reuse the ParquetWriteConverter for batches caused system % increase a
lot
- auto fileName = "velox_parquet_write.parquet";
auto memoryManager = getDefaultMemoryManager();
auto runtime = Runtime::create(kVeloxBackendKind, memoryManager);
auto veloxPool = memoryManager->getAggregateMemoryPool();
for (auto _ : state) {
+ const auto output = "velox_parquet_write.parquet";
+
// Init VeloxParquetDataSource
+ auto reader = [&] {
+ ScopedTimer timer(&elapseRead);
+ return std::make_unique<ParquetBufferedReaderIterator>(fileName_, kBatchBufferSize, veloxPool.get());
+ }();
+
+ const auto localSchema = toArrowSchema(reader->getRowType(), veloxPool.get());
+
auto veloxParquetDataSource =
std::make_unique<gluten::VeloxParquetDataSource>(
- outputPath_ + "/" + fileName,
+ outputPath_ + "/" + output,
veloxPool->addAggregateChild("writer_benchmark"),
veloxPool->addLeafChild("sink_pool"),
localSchema);
veloxParquetDataSource->init(runtime->getConfMap());
- auto start = std::chrono::steady_clock::now();
- for (const auto& vector : vectors) {
- veloxParquetDataSource->write(vector);
+
+ while (auto batch = reader->next()) {
+ ScopedTimer timer(&elapseRead);
+ veloxParquetDataSource->write(batch);
}
- auto end = std::chrono::steady_clock::now();
- writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start).count();
+
veloxParquetDataSource->close();
}
- state.counters["rowgroups"] =
- benchmark::Counter(rowGroupIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
state.counters["columns"] =
- benchmark::Counter(columnIndices_.size(),
benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+ benchmark::Counter(numColumns, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
state.counters["batches"] =
benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
state.counters["num_rows"] =
@@ -291,6 +99,10 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark :
public GoogleBenchmar
benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads,
benchmark::Counter::OneK::kIs1000);
Runtime::release(runtime);
}
+
+ private:
+ std::string fileName_;
+ std::string outputPath_;
};
} // namespace gluten
diff --git a/cpp/velox/operators/reader/FileReaderIterator.cc
b/cpp/velox/operators/reader/FileReaderIterator.cc
index 49d52f5ffd..7b2a04e360 100644
--- a/cpp/velox/operators/reader/FileReaderIterator.cc
+++ b/cpp/velox/operators/reader/FileReaderIterator.cc
@@ -24,12 +24,6 @@ namespace {
const std::string kParquetSuffix = ".parquet";
}
-FileReaderIterator::FileReaderIterator(const std::string& path) : path_(path)
{}
-
-int64_t FileReaderIterator::getCollectBatchTime() const {
- return collectBatchTime_;
-}
-
std::shared_ptr<gluten::ResultIterator>
FileReaderIterator::getInputIteratorFromFileReader(
FileReaderType readerType,
const std::string& path,
diff --git a/cpp/velox/operators/reader/FileReaderIterator.h
b/cpp/velox/operators/reader/FileReaderIterator.h
index 708120603e..ad22148e55 100644
--- a/cpp/velox/operators/reader/FileReaderIterator.h
+++ b/cpp/velox/operators/reader/FileReaderIterator.h
@@ -33,13 +33,13 @@ class FileReaderIterator : public ColumnarBatchIterator {
int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
- explicit FileReaderIterator(const std::string& path);
+ explicit FileReaderIterator(const std::string& path) : path_(path){};
virtual ~FileReaderIterator() = default;
- virtual std::shared_ptr<arrow::Schema> getSchema() = 0;
-
- int64_t getCollectBatchTime() const;
+ int64_t getCollectBatchTime() const {
+ return collectBatchTime_;
+ }
protected:
int64_t collectBatchTime_ = 0;
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.cc
b/cpp/velox/operators/reader/ParquetReaderIterator.cc
index 014bccbfce..223d1c0029 100644
--- a/cpp/velox/operators/reader/ParquetReaderIterator.cc
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.cc
@@ -17,31 +17,44 @@
#include "operators/reader/ParquetReaderIterator.h"
#include "memory/VeloxColumnarBatch.h"
-
-#include <arrow/util/range.h>
+#include "utils/Timer.h"
namespace gluten {
+namespace {
+std::unique_ptr<facebook::velox::parquet::ParquetReader> createReader(
+ const std::string& path,
+ const facebook::velox::dwio::common::ReaderOptions& opts) {
+ auto input = std::make_unique<facebook::velox::dwio::common::BufferedInput>(
+ std::make_shared<facebook::velox::LocalReadFile>(path),
opts.memoryPool());
+ return
std::make_unique<facebook::velox::parquet::ParquetReader>(std::move(input),
opts);
+}
+
+std::shared_ptr<facebook::velox::common::ScanSpec> makeScanSpec(const
facebook::velox::RowTypePtr& rowType) {
+ auto scanSpec = std::make_shared<facebook::velox::common::ScanSpec>("");
+ scanSpec->addAllChildFields(*rowType);
+ return scanSpec;
+}
+} // namespace
ParquetReaderIterator::ParquetReaderIterator(
const std::string& path,
int64_t batchSize,
facebook::velox::memory::MemoryPool* pool)
- : FileReaderIterator(path), batchSize_(batchSize), pool_(pool) {}
-
-void ParquetReaderIterator::createReader() {
- parquet::ArrowReaderProperties properties =
parquet::default_arrow_reader_properties();
- properties.set_batch_size(batchSize_);
- GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
- GLUTEN_THROW_NOT_OK(
-
fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()),
&recordBatchReader_));
-
- auto schema = recordBatchReader_->schema();
- DLOG(INFO) << "Schema:\n" << schema->ToString();
-}
+ : FileReaderIterator(path), pool_(pool), batchSize_(batchSize) {}
+
+void ParquetReaderIterator::createRowReader() {
+ facebook::velox::dwio::common::ReaderOptions readerOptions{pool_};
+ auto reader = createReader(path_, readerOptions);
+
+ rowType_ = reader->rowType();
-std::shared_ptr<arrow::Schema> ParquetReaderIterator::getSchema() {
- return recordBatchReader_->schema();
+ facebook::velox::dwio::common::RowReaderOptions rowReaderOpts;
+ rowReaderOpts.select(std::make_shared<facebook::velox::dwio::common::ColumnSelector>(rowType_, rowType_->names()));
+ rowReaderOpts.setScanSpec(makeScanSpec(rowType_));
+
+ rowReader_ = reader->createRowReader(rowReaderOpts);
+
+ DLOG(INFO) << "Opened file for read: " << path_;
}
ParquetStreamReaderIterator::ParquetStreamReaderIterator(
@@ -49,20 +62,30 @@ ParquetStreamReaderIterator::ParquetStreamReaderIterator(
int64_t batchSize,
facebook::velox::memory::MemoryPool* pool)
: ParquetReaderIterator(path, batchSize, pool) {
- createReader();
- DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
+ createRowReader();
}
std::shared_ptr<gluten::ColumnarBatch> ParquetStreamReaderIterator::next() {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
- DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " <<
(batch ? batch->num_rows() : 0);
- collectBatchTime_ +=
-
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
- startTime).count();
- if (batch == nullptr) {
+ ScopedTimer timer(&collectBatchTime_);
+
+ static constexpr int32_t kBatchSize = 4096;
+
+ auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize,
pool_);
+ auto numRows = rowReader_->next(kBatchSize, result);
+
+ if (numRows == 0) {
return nullptr;
}
- return VeloxColumnarBatch::from(pool_, std::make_shared<gluten::ArrowColumnarBatch>(batch));
+
+ // Load lazy vector.
+ result = facebook::velox::BaseVector::loadedVectorShared(result);
+
+ auto rowVector =
std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
+ GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");
+
+ DLOG(INFO) << "ParquetStreamReaderIterator read rows: " << numRows;
+
+ return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
@@ -70,29 +93,44 @@
ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
int64_t batchSize,
facebook::velox::memory::MemoryPool* pool)
: ParquetReaderIterator(path, batchSize, pool) {
- createReader();
+ createRowReader();
collectBatches();
- iter_ = batches_.begin();
- DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
- DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size());
- if (iter_ != batches_.cend()) {
- DLOG(INFO) << "columns: " << (*iter_)->num_columns();
- DLOG(INFO) << "rows: " << (*iter_)->num_rows();
- }
}
std::shared_ptr<gluten::ColumnarBatch> ParquetBufferedReaderIterator::next() {
if (iter_ == batches_.cend()) {
return nullptr;
}
- return VeloxColumnarBatch::from(pool_, std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
+ return *iter_++;
}
void ParquetBufferedReaderIterator::collectBatches() {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
- auto endTime = std::chrono::steady_clock::now();
- collectBatchTime_ +=
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime -
startTime).count();
+ ScopedTimer timer(&collectBatchTime_);
+
+ static constexpr int32_t kBatchSize = 4096;
+
+ uint64_t numRows = 0;
+ while (true) {
+ auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize,
pool_);
+ numRows = rowReader_->next(kBatchSize, result);
+ if (numRows == 0) {
+ break;
+ }
+
+ // Load lazy vector.
+ result = facebook::velox::BaseVector::loadedVectorShared(result);
+
+ auto rowVector =
std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
+ GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");
+
+ DLOG(INFO) << "ParquetStreamReaderIterator read rows: " << numRows;
+
+ batches_.push_back(std::make_shared<VeloxColumnarBatch>(std::move(rowVector)));
+ }
+
+ iter_ = batches_.begin();
+
+ DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size());
}
} // namespace gluten
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.h
b/cpp/velox/operators/reader/ParquetReaderIterator.h
index f45fe5eb77..85449980e6 100644
--- a/cpp/velox/operators/reader/ParquetReaderIterator.h
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.h
@@ -17,9 +17,11 @@
#pragma once
+#include "memory/VeloxColumnarBatch.h"
#include "operators/reader/FileReaderIterator.h"
-#include <parquet/arrow/reader.h>
+#include "velox/dwio/parquet/reader/ParquetReader.h"
+
#include <memory>
namespace gluten {
@@ -28,22 +30,26 @@ class ParquetReaderIterator : public FileReaderIterator {
public:
explicit ParquetReaderIterator(const std::string& path, int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
- void createReader();
-
- std::shared_ptr<arrow::Schema> getSchema() override;
+ facebook::velox::RowTypePtr getRowType() const {
+ return rowType_;
+ }
protected:
- std::unique_ptr<::parquet::arrow::FileReader> fileReader_;
- std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
- int64_t batchSize_;
+ void createRowReader();
+
facebook::velox::memory::MemoryPool* pool_;
+
+ facebook::velox::RowTypePtr rowType_;
+ std::unique_ptr<facebook::velox::dwio::common::RowReader> rowReader_;
+
+ int64_t batchSize_;
};
class ParquetStreamReaderIterator final : public ParquetReaderIterator {
public:
ParquetStreamReaderIterator(const std::string& path, int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
- std::shared_ptr<gluten::ColumnarBatch> next() override;
+ std::shared_ptr<ColumnarBatch> next() override;
};
class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
@@ -53,13 +59,13 @@ class ParquetBufferedReaderIterator final : public
ParquetReaderIterator {
int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
- std::shared_ptr<gluten::ColumnarBatch> next() override;
+ std::shared_ptr<ColumnarBatch> next() override;
private:
void collectBatches();
- arrow::RecordBatchVector batches_;
- std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
+ std::vector<std::shared_ptr<VeloxColumnarBatch>> batches_;
+ std::vector<std::shared_ptr<VeloxColumnarBatch>>::const_iterator iter_;
};
} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]