This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 58e7d831e7 [GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998)
58e7d831e7 is described below

commit 58e7d831e7a18d707576a983f2b49d91b0b3d67f
Author: Rong Ma <[email protected]>
AuthorDate: Mon Nov 25 09:19:52 2024 +0800

    [GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998)
---
 .github/workflows/velox_backend.yml                |   8 +-
 .../benchmarks/NativeBenchmarkPlanGenerator.scala  |  98 +++++++------------
 cpp/CMakeLists.txt                                 |   4 -
 cpp/core/compute/Runtime.h                         |   3 +
 cpp/core/jni/JniCommon.cc                          |  44 ++++++---
 cpp/core/jni/JniCommon.h                           |   1 +
 cpp/core/jni/JniWrapper.cc                         |  14 ++-
 cpp/core/memory/ColumnarBatch.h                    |   1 -
 cpp/core/operators/writer/ArrowWriter.cc           |   9 +-
 cpp/core/operators/writer/ArrowWriter.h            |  16 ++-
 cpp/velox/CMakeLists.txt                           |   3 +
 cpp/velox/benchmarks/CMakeLists.txt                |   7 +-
 cpp/velox/benchmarks/GenericBenchmark.cc           |  34 +++----
 cpp/velox/benchmarks/common/OrcReaderIterator.h    | 107 ---------------------
 .../benchmarks/common/ParquetReaderIterator.h      | 104 --------------------
 cpp/velox/benchmarks/exec/OrcConverter.cc          |   2 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  14 ++-
 cpp/velox/compute/VeloxRuntime.h                   |   2 +
 .../reader}/FileReaderIterator.cc                  |  41 ++++----
 .../reader}/FileReaderIterator.h                   |  28 ++----
 .../operators/reader/ParquetReaderIterator.cc      |  97 +++++++++++++++++++
 cpp/velox/operators/reader/ParquetReaderIterator.h |  65 +++++++++++++
 .../serializer/VeloxColumnarBatchSerializer.cc     |   3 +-
 .../operators/writer/VeloxArrowWriter.cc}          |  37 ++++---
 .../operators/writer/VeloxArrowWriter.h}           |  24 ++---
 cpp/velox/tests/RuntimeTest.cc                     |   4 +
 .../tests/VeloxColumnarBatchSerializerTest.cc      |   3 +-
 cpp/velox/utils/VeloxArrowUtils.cc                 |   3 +
 docs/developers/MicroBenchmarks.md                 |  14 +--
 .../scala/org/apache/gluten/utils/DebugUtil.scala  |  38 +++++---
 .../scala/org/apache/gluten/GlutenConfig.scala     |  21 ++--
 31 files changed, 411 insertions(+), 438 deletions(-)

diff --git a/.github/workflows/velox_backend.yml 
b/.github/workflows/velox_backend.yml
index 42e1f25278..d7445d1a27 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -969,9 +969,11 @@ jobs:
         run: |
           $MVN_CMD test -Pspark-3.5 -Pbackends-velox -pl backends-velox -am \
           -DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none 
-DfailIfNoTests=false -Dexec.skip
-          # This test depends on example.json generated by the above mvn test.
-          cd cpp/build/velox/benchmarks && sudo chmod +x ./generic_benchmark
-          ./generic_benchmark --run-example --with-shuffle --threads 1 
--iterations 1
+          # This test depends on files generated by the above mvn test.
+          ./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle 
--partitioning hash --threads 1 --iterations 1 \
+          --conf $(realpath 
backends-velox/generated-native-benchmark/conf_12_0.ini) \
+          --plan $(realpath 
backends-velox/generated-native-benchmark/plan_12_0.json) \
+          --data $(realpath 
backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath 
backends-velox/generated-native-benchmark/data_12_0_1.parquet)
       - name: Run UDF test
         run: |
           # Depends on --build_example=ON.
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
index 57fbca1744..1e378d16f1 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
@@ -19,18 +19,14 @@ package org.apache.gluten.benchmarks
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.{VeloxWholeStageTransformerSuite, 
WholeStageTransformer}
 
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
ShuffleQueryStageExec}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.commons.io.FileUtils
 import org.scalatest.Tag
 
 import java.io.File
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-
-import scala.collection.JavaConverters._
 
 object GenerateExample extends Tag("org.apache.gluten.tags.GenerateExample")
 
@@ -50,6 +46,11 @@ class NativeBenchmarkPlanGenerator extends 
VeloxWholeStageTransformerSuite {
     createTPCHNotNullTables()
   }
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
   test("Test plan json non-empty - AQE off") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -67,7 +68,6 @@ class NativeBenchmarkPlanGenerator extends 
VeloxWholeStageTransformerSuite {
       planJson = 
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
       assert(planJson.nonEmpty)
     }
-    spark.sparkContext.setLogLevel(logLevel)
   }
 
   test("Test plan json non-empty - AQE on") {
@@ -88,70 +88,42 @@ class NativeBenchmarkPlanGenerator extends 
VeloxWholeStageTransformerSuite {
       val planJson = 
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
       assert(planJson.nonEmpty)
     }
-    spark.sparkContext.setLogLevel(logLevel)
   }
 
   test("generate example", GenerateExample) {
-    import testImplicits._
     withSQLConf(
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true"
+      GlutenConfig.BENCHMARK_SAVE_DIR.key -> generatedPlanDir,
+      GlutenConfig.BENCHMARK_TASK_STAGEID.key -> "12",
+      GlutenConfig.BENCHMARK_TASK_PARTITIONID.key -> "0"
     ) {
       logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir")
-      val q4_lineitem = spark
-        .sql(s"""
-                |select l_orderkey from lineitem where l_commitdate < 
l_receiptdate
-                |""".stripMargin)
-      val q4_orders = spark
-        .sql(s"""
-                |select o_orderkey, o_orderpriority
-                |  from orders
-                |  where o_orderdate >= '1993-07-01' and o_orderdate < 
'1993-10-01'
-                |""".stripMargin)
-      q4_lineitem
-        .createOrReplaceTempView("q4_lineitem")
-      q4_orders
-        .createOrReplaceTempView("q4_orders")
-
-      q4_lineitem
-        .repartition(1, 'l_orderkey)
-        .write
-        .format(outputFileFormat)
-        .save(generatedPlanDir + "/example_lineitem")
-      q4_orders
-        .repartition(1, 'o_orderkey)
-        .write
-        .format(outputFileFormat)
-        .save(generatedPlanDir + "/example_orders")
-
-      val df =
-        spark.sql("""
-                    |select * from q4_orders left semi join q4_lineitem on 
l_orderkey = o_orderkey
-                    |""".stripMargin)
-      generateSubstraitJson(df, "example.json")
+      spark
+        .sql("""
+               |select /*+ REPARTITION(1) */
+               |  o_orderpriority,
+               |  count(*) as order_count
+               |from
+               |  orders
+               |where
+               |  o_orderdate >= date '1993-07-01'
+               |  and o_orderdate < date '1993-07-01' + interval '3' month
+               |  and exists (
+               |    select /*+ REPARTITION(1) */
+               |      *
+               |    from
+               |      lineitem
+               |    where
+               |      l_orderkey = o_orderkey
+               |      and l_commitdate < l_receiptdate
+               |  )
+               |group by
+               |  o_orderpriority
+               |order by
+               |  o_orderpriority
+               |""".stripMargin)
+        .foreach(_ => ())
     }
-    spark.sparkContext.setLogLevel(logLevel)
-  }
-
-  def generateSubstraitJson(df: DataFrame, file: String): Unit = {
-    val executedPlan = df.queryExecution.executedPlan
-    executedPlan.execute()
-    val finalPlan =
-      executedPlan match {
-        case aqe: AdaptiveSparkPlanExec =>
-          aqe.executedPlan match {
-            case s: ShuffleQueryStageExec => s.shuffle.child
-            case other => other
-          }
-        case plan => plan
-      }
-    val lastStageTransformer = 
finalPlan.find(_.isInstanceOf[WholeStageTransformer])
-    assert(lastStageTransformer.nonEmpty)
-    val plan =
-      
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')
-
-    val exampleJsonFile = Paths.get(generatedPlanDir, file)
-    Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
   }
 }
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 09d09481f2..67fb9ec721 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -217,10 +217,6 @@ if(ENABLE_IAA)
   add_definitions(-DGLUTEN_ENABLE_IAA)
 endif()
 
-if(ENABLE_ORC)
-  add_definitions(-DGLUTEN_ENABLE_ORC)
-endif()
-
 #
 # Subdirectories
 #
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index a58f770ff7..3090652b81 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -27,6 +27,7 @@
 #include "operators/c2r/ColumnarToRow.h"
 #include "operators/r2c/RowToColumnar.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
+#include "operators/writer/ArrowWriter.h"
 #include "shuffle/ShuffleReader.h"
 #include "shuffle/ShuffleWriter.h"
 #include "substrait/plan.pb.h"
@@ -124,6 +125,8 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
 
   virtual void dumpConf(const std::string& path) = 0;
 
+  virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& 
path) = 0;
+
   const std::unordered_map<std::string, std::string>& getConfMap() {
     return confMap_;
   }
diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc
index 46b53e8c3b..bb8554568b 100644
--- a/cpp/core/jni/JniCommon.cc
+++ b/cpp/core/jni/JniCommon.cc
@@ -104,22 +104,34 @@ 
gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
 std::shared_ptr<gluten::ColumnarBatch> 
gluten::JniColumnarBatchIterator::next() {
   JNIEnv* env = nullptr;
   attachCurrentThreadAsDaemonOrThrow(vm_, &env);
-  if (!env->CallBooleanMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorHasNext_)) {
-    checkException(env);
-    return nullptr; // stream ended
-  }
-
-  checkException(env);
-  jlong handle = env->CallLongMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorNext_);
-  checkException(env);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
   if (writer_ != nullptr) {
-    // save snapshot of the batch to file
-    std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
-    std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
-    auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), 
schema.get()));
-    GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
-    GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+    if (!writer_->closed()) {
+      // Dump all inputs.
+      while (env->CallBooleanMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorHasNext_)) {
+        checkException(env);
+        jlong handle = env->CallLongMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorNext_);
+        checkException(env);
+        auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
+
+        // Save the snapshot of the batch to file.
+        std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
+        std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
+        auto rb = 
gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
+        GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
+        GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+      }
+      checkException(env);
+      GLUTEN_THROW_NOT_OK(writer_->closeWriter());
+    }
+    return writer_->retrieveColumnarBatch();
+  } else {
+    if (!env->CallBooleanMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorHasNext_)) {
+      checkException(env);
+      return nullptr; // stream ended
+    }
+    checkException(env);
+    jlong handle = env->CallLongMethod(jColumnarBatchItr_, 
serializedColumnarBatchIteratorNext_);
+    checkException(env);
+    return ObjectStore::retrieve<ColumnarBatch>(handle);
   }
-  return batch;
 }
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index aeab454f1a..8f40398a41 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -26,6 +26,7 @@
 #include "compute/Runtime.h"
 #include "config/GlutenConfig.h"
 #include "memory/AllocationListener.h"
+#include "operators/writer/ArrowWriter.h"
 #include "shuffle/rss/RssClient.h"
 #include "utils/Compression.h"
 #include "utils/Exception.h"
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 9da5589486..963440f6fc 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -373,8 +373,16 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     }
     saveDir = conf.at(kGlutenSaveDir);
     std::filesystem::path f{saveDir};
-    if (!std::filesystem::exists(f)) {
-      throw GlutenException("Save input path " + saveDir + " does not exists");
+    if (std::filesystem::exists(f)) {
+      if (!std::filesystem::is_directory(f)) {
+        throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " + 
saveDir);
+      }
+    } else {
+      std::error_code ec;
+      std::filesystem::create_directory(f, ec);
+      if (ec) {
+        throw GlutenException("Failed to create directory: " + saveDir + ", 
error message: " + ec.message());
+      }
     }
     ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
   }
@@ -407,7 +415,7 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
     std::shared_ptr<ArrowWriter> writer = nullptr;
     if (saveInput) {
       auto file = saveDir + "/data" + fileIdentifier + "_" + 
std::to_string(idx) + ".parquet";
-      writer = std::make_shared<ArrowWriter>(file);
+      writer = ctx->createArrowWriter(file);
     }
     jobject iter = env->GetObjectArrayElement(iterArr, idx);
     auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h
index e0bab25418..be487f871e 100644
--- a/cpp/core/memory/ColumnarBatch.h
+++ b/cpp/core/memory/ColumnarBatch.h
@@ -23,7 +23,6 @@
 #include "arrow/c/helpers.h"
 #include "arrow/record_batch.h"
 #include "memory/MemoryManager.h"
-#include "operators/writer/ArrowWriter.h"
 #include "utils/ArrowStatus.h"
 #include "utils/Exception.h"
 
diff --git a/cpp/core/operators/writer/ArrowWriter.cc 
b/cpp/core/operators/writer/ArrowWriter.cc
index 19bab6ddcb..46ec2fc9ba 100644
--- a/cpp/core/operators/writer/ArrowWriter.cc
+++ b/cpp/core/operators/writer/ArrowWriter.cc
@@ -21,6 +21,7 @@
 #include "arrow/table.h"
 #include "arrow/util/type_fwd.h"
 
+namespace gluten {
 arrow::Status ArrowWriter::initWriter(arrow::Schema& schema) {
   if (writer_ != nullptr) {
     return arrow::Status::OK();
@@ -50,9 +51,15 @@ arrow::Status 
ArrowWriter::writeInBatches(std::shared_ptr<arrow::RecordBatch> ba
 }
 
 arrow::Status ArrowWriter::closeWriter() {
-  // Write file footer and close
+  // Write file footer and close.
   if (writer_ != nullptr) {
     ARROW_RETURN_NOT_OK(writer_->Close());
   }
+  closed_ = true;
   return arrow::Status::OK();
 }
+
+bool ArrowWriter::closed() const {
+  return closed_;
+}
+} // namespace gluten
diff --git a/cpp/core/operators/writer/ArrowWriter.h 
b/cpp/core/operators/writer/ArrowWriter.h
index 0d0b8ce2cb..1a7b196066 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/core/operators/writer/ArrowWriter.h
@@ -17,15 +17,19 @@
 
 #pragma once
 
-#include "parquet/arrow/writer.h"
+#include <parquet/arrow/writer.h>
+#include "memory/ColumnarBatch.h"
 
+namespace gluten {
 /**
  * @brief Used to print RecordBatch to a parquet file
  *
  */
 class ArrowWriter {
  public:
-  explicit ArrowWriter(std::string& path) : path_(path) {}
+  explicit ArrowWriter(const std::string& path) : path_(path) {}
+
+  virtual ~ArrowWriter() = default;
 
   arrow::Status initWriter(arrow::Schema& schema);
 
@@ -33,7 +37,13 @@ class ArrowWriter {
 
   arrow::Status closeWriter();
 
- private:
+  bool closed() const;
+
+  virtual std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() = 0;
+
+ protected:
   std::unique_ptr<parquet::arrow::FileWriter> writer_;
   std::string path_;
+  bool closed_{false};
 };
+} // namespace gluten
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index eff31863d4..9e110853eb 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -169,9 +169,12 @@ set(VELOX_SRCS
     operators/functions/RegistrationAllFunctions.cc
     operators/functions/RowConstructorWithNull.cc
     operators/functions/SparkExprToSubfieldFilterParser.cc
+    operators/reader/FileReaderIterator.cc
+    operators/reader/ParquetReaderIterator.cc
     operators/serializer/VeloxColumnarToRowConverter.cc
     operators/serializer/VeloxColumnarBatchSerializer.cc
     operators/serializer/VeloxRowToColumnarConverter.cc
+    operators/writer/VeloxArrowWriter.cc
     operators/writer/VeloxParquetDataSource.cc
     shuffle/VeloxShuffleReader.cc
     shuffle/VeloxShuffleWriter.cc
diff --git a/cpp/velox/benchmarks/CMakeLists.txt 
b/cpp/velox/benchmarks/CMakeLists.txt
index 1aa199b136..6b2cda358c 100644
--- a/cpp/velox/benchmarks/CMakeLists.txt
+++ b/cpp/velox/benchmarks/CMakeLists.txt
@@ -15,8 +15,7 @@
 
 find_arrow_lib(${PARQUET_LIB_NAME})
 
-set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc
-                                common/BenchmarkUtils.cc)
+set(VELOX_BENCHMARK_COMMON_SRCS common/BenchmarkUtils.cc)
 add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS})
 target_include_directories(
   velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox
@@ -38,7 +37,3 @@ add_velox_benchmark(columnar_to_row_benchmark 
ColumnarToRowBenchmark.cc)
 add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)
 
 add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)
-
-if(ENABLE_ORC)
-  add_velox_benchmark(orc_converter exec/OrcConverter.cc)
-endif()
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc 
b/cpp/velox/benchmarks/GenericBenchmark.cc
index 4e38fb4432..dcb64d7d18 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -25,12 +25,12 @@
 #include <operators/writer/ArrowWriter.h>
 
 #include "benchmarks/common/BenchmarkUtils.h"
-#include "benchmarks/common/FileReaderIterator.h"
 #include "compute/VeloxBackend.h"
-#include "compute/VeloxPlanConverter.h"
 #include "compute/VeloxRuntime.h"
 #include "config/GlutenConfig.h"
 #include "config/VeloxConfig.h"
+#include "operators/reader/FileReaderIterator.h"
+#include "operators/writer/VeloxArrowWriter.h"
 #include "shuffle/LocalPartitionWriter.h"
 #include "shuffle/VeloxShuffleWriter.h"
 #include "shuffle/rss/RssPartitionWriter.h"
@@ -45,7 +45,6 @@ using namespace gluten;
 
 namespace {
 
-DEFINE_bool(run_example, false, "Run the example and exit.");
 DEFINE_bool(print_result, true, "Print result for execution");
 DEFINE_string(save_output, "", "Path to parquet file for saving the task 
output iterator");
 DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
@@ -388,7 +387,8 @@ auto BM_Generic = [](::benchmark::State& state,
       std::vector<FileReaderIterator*> inputItersRaw;
       if (!dataFiles.empty()) {
         for (const auto& input : dataFiles) {
-          inputIters.push_back(getInputIteratorFromFileReader(input, 
readerType));
+          
inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader(
+              readerType, input, FLAGS_batch_size, 
runtime->memoryManager()->getLeafMemoryPool().get()));
         }
         std::transform(
             inputIters.begin(),
@@ -417,10 +417,11 @@ auto BM_Generic = [](::benchmark::State& state,
         ArrowSchema cSchema;
         toArrowSchema(veloxPlan->outputType(), 
runtime->memoryManager()->getLeafMemoryPool().get(), &cSchema);
         GLUTEN_ASSIGN_OR_THROW(auto outputSchema, 
arrow::ImportSchema(&cSchema));
-        ArrowWriter writer{FLAGS_save_output};
+        auto writer = std::make_shared<VeloxArrowWriter>(
+            FLAGS_save_output, FLAGS_batch_size, 
runtime->memoryManager()->getLeafMemoryPool().get());
         state.PauseTiming();
         if (!FLAGS_save_output.empty()) {
-          GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get())));
+          GLUTEN_THROW_NOT_OK(writer->initWriter(*(outputSchema.get())));
         }
         state.ResumeTiming();
 
@@ -436,13 +437,13 @@ auto BM_Generic = [](::benchmark::State& state,
             LOG(WARNING) << maybeBatch.ValueOrDie()->ToString();
           }
           if (!FLAGS_save_output.empty()) {
-            
GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie()));
+            
GLUTEN_THROW_NOT_OK(writer->writeInBatches(maybeBatch.ValueOrDie()));
           }
         }
 
         state.PauseTiming();
         if (!FLAGS_save_output.empty()) {
-          GLUTEN_THROW_NOT_OK(writer.closeWriter());
+          GLUTEN_THROW_NOT_OK(writer->closeWriter());
         }
         state.ResumeTiming();
       }
@@ -488,7 +489,8 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
   {
     ScopedTimer timer(&elapsedTime);
     for (auto _ : state) {
-      auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
+      auto resultIter = FileReaderIterator::getInputIteratorFromFileReader(
+          readerType, inputFile, FLAGS_batch_size, 
runtime->memoryManager()->getLeafMemoryPool().get());
       runShuffle(
           runtime,
           listenerPtr,
@@ -591,19 +593,7 @@ int main(int argc, char** argv) {
   std::vector<std::string> splitFiles{};
   std::vector<std::string> dataFiles{};
 
-  if (FLAGS_run_example) {
-    LOG(WARNING) << "Running example...";
-    dataFiles.resize(2);
-    try {
-      substraitJsonFile = getGeneratedFilePath("example.json");
-      dataFiles[0] = getGeneratedFilePath("example_orders");
-      dataFiles[1] = getGeneratedFilePath("example_lineitem");
-    } catch (const std::exception& e) {
-      LOG(ERROR) << "Failed to run example. " << e.what();
-      ::benchmark::Shutdown();
-      std::exit(EXIT_FAILURE);
-    }
-  } else if (FLAGS_run_shuffle) {
+  if (FLAGS_run_shuffle) {
     std::string errorMsg{};
     if (FLAGS_data.empty()) {
       errorMsg = "Missing '--split' or '--data' option.";
diff --git a/cpp/velox/benchmarks/common/OrcReaderIterator.h 
b/cpp/velox/benchmarks/common/OrcReaderIterator.h
deleted file mode 100644
index f8c9f44b20..0000000000
--- a/cpp/velox/benchmarks/common/OrcReaderIterator.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <arrow/adapters/orc/adapter.h>
-#include "benchmarks/common/FileReaderIterator.h"
-
-namespace gluten {
-
-class OrcReaderIterator : public FileReaderIterator {
- public:
-  explicit OrcReaderIterator(const std::string& path) : 
FileReaderIterator(path) {}
-
-  void createReader() override {
-    // Open File
-    auto input = arrow::io::ReadableFile::Open(path_);
-    GLUTEN_THROW_NOT_OK(input);
-
-    // Open ORC File Reader
-    auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input, 
arrow::default_memory_pool());
-    GLUTEN_THROW_NOT_OK(maybeReader);
-    fileReader_.reset((*maybeReader).release());
-
-    // get record batch Reader
-    auto recordBatchReader = fileReader_->GetRecordBatchReader(4096, 
std::vector<std::string>());
-    GLUTEN_THROW_NOT_OK(recordBatchReader);
-    recordBatchReader_ = *recordBatchReader;
-  }
-
-  std::shared_ptr<arrow::Schema> getSchema() override {
-    auto schema = fileReader_->ReadSchema();
-    GLUTEN_THROW_NOT_OK(schema);
-    return *schema;
-  }
-
- protected:
-  std::unique_ptr<arrow::adapters::orc::ORCFileReader> fileReader_;
-  std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
-};
-
-class OrcStreamReaderIterator final : public OrcReaderIterator {
- public:
-  explicit OrcStreamReaderIterator(const std::string& path) : 
OrcReaderIterator(path) {
-    createReader();
-  }
-
-  std::shared_ptr<gluten::ColumnarBatch> next() override {
-    auto startTime = std::chrono::steady_clock::now();
-    GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
-    DLOG(INFO) << "OrcStreamReaderIterator get a batch, num rows: " << (batch 
? batch->num_rows() : 0);
-    collectBatchTime_ +=
-        
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
 - startTime).count();
-    if (batch == nullptr) {
-      return nullptr;
-    }
-    return convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(batch));
-  }
-};
-
-class OrcBufferedReaderIterator final : public OrcReaderIterator {
- public:
-  explicit OrcBufferedReaderIterator(const std::string& path) : 
OrcReaderIterator(path) {
-    createReader();
-    collectBatches();
-    iter_ = batches_.begin();
-    DLOG(INFO) << "OrcBufferedReaderIterator open file: " << path;
-    DLOG(INFO) << "Number of input batches: " << 
std::to_string(batches_.size());
-    if (iter_ != batches_.cend()) {
-      DLOG(INFO) << "columns: " << (*iter_)->num_columns();
-      DLOG(INFO) << "rows: " << (*iter_)->num_rows();
-    }
-  }
-
-  std::shared_ptr<gluten::ColumnarBatch> next() override {
-    if (iter_ == batches_.cend()) {
-      return nullptr;
-    }
-    return 
convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
-  }
-
- private:
-  void collectBatches() {
-    auto startTime = std::chrono::steady_clock::now();
-    GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
-    auto endTime = std::chrono::steady_clock::now();
-    collectBatchTime_ += 
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - 
startTime).count();
-  }
-
-  arrow::RecordBatchVector batches_;
-  std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
-};
-
-} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/benchmarks/common/ParquetReaderIterator.h 
b/cpp/velox/benchmarks/common/ParquetReaderIterator.h
deleted file mode 100644
index 20652ee27d..0000000000
--- a/cpp/velox/benchmarks/common/ParquetReaderIterator.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "benchmarks/common/BenchmarkUtils.h"
-#include "benchmarks/common/FileReaderIterator.h"
-#include "utils/Macros.h"
-
-#include <parquet/arrow/reader.h>
-
-namespace gluten {
-
-class ParquetReaderIterator : public FileReaderIterator {
- public:
-  explicit ParquetReaderIterator(const std::string& path) : 
FileReaderIterator(path) {}
-
-  void createReader() {
-    parquet::ArrowReaderProperties properties = 
parquet::default_arrow_reader_properties();
-    properties.set_batch_size(FLAGS_batch_size);
-    GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
-        arrow::default_memory_pool(), 
parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
-    GLUTEN_THROW_NOT_OK(
-        
fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()),
 &recordBatchReader_));
-
-    auto schema = recordBatchReader_->schema();
-    LOG(INFO) << "schema:\n" << schema->ToString();
-  }
-
-  std::shared_ptr<arrow::Schema> getSchema() override {
-    return recordBatchReader_->schema();
-  }
-
- protected:
-  std::unique_ptr<parquet::arrow::FileReader> fileReader_;
-  std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
-};
-
-class ParquetStreamReaderIterator final : public ParquetReaderIterator {
- public:
-  explicit ParquetStreamReaderIterator(const std::string& path) : 
ParquetReaderIterator(path) {
-    createReader();
-    DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
-  }
-
-  std::shared_ptr<gluten::ColumnarBatch> next() override {
-    auto startTime = std::chrono::steady_clock::now();
-    GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
-    DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << 
(batch ? batch->num_rows() : 0);
-    collectBatchTime_ +=
-        
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
 - startTime).count();
-    if (batch == nullptr) {
-      return nullptr;
-    }
-    return convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(batch));
-  }
-};
-
-class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
- public:
-  explicit ParquetBufferedReaderIterator(const std::string& path) : 
ParquetReaderIterator(path) {
-    createReader();
-    collectBatches();
-    iter_ = batches_.begin();
-    DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
-    DLOG(INFO) << "Number of input batches: " << 
std::to_string(batches_.size());
-    if (iter_ != batches_.cend()) {
-      DLOG(INFO) << "columns: " << (*iter_)->num_columns();
-      DLOG(INFO) << "rows: " << (*iter_)->num_rows();
-    }
-  }
-
-  std::shared_ptr<gluten::ColumnarBatch> next() override {
-    if (iter_ == batches_.cend()) {
-      return nullptr;
-    }
-    return 
convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
-  }
-
- private:
-  void collectBatches() {
-    auto startTime = std::chrono::steady_clock::now();
-    GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
-    auto endTime = std::chrono::steady_clock::now();
-    collectBatchTime_ += 
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - 
startTime).count();
-  }
-
-  arrow::RecordBatchVector batches_;
-  std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
-};
-
-} // namespace gluten
diff --git a/cpp/velox/benchmarks/exec/OrcConverter.cc 
b/cpp/velox/benchmarks/exec/OrcConverter.cc
index b421ecca3b..888cf27c35 100644
--- a/cpp/velox/benchmarks/exec/OrcConverter.cc
+++ b/cpp/velox/benchmarks/exec/OrcConverter.cc
@@ -16,7 +16,7 @@
  */
 
 #include <arrow/adapters/orc/adapter.h>
-#include "benchmarks/common/ParquetReaderIterator.h"
+#include "operators/reader/ParquetReaderIterator.h"
 
 namespace gluten {
 
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 4c6b52e6fe..20c3dec939 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -28,16 +28,14 @@
 #include "compute/VeloxPlanConverter.h"
 #include "config/VeloxConfig.h"
 #include "operators/serializer/VeloxRowToColumnarConverter.h"
-#include "shuffle/VeloxHashShuffleWriter.h"
-#include "shuffle/VeloxRssSortShuffleWriter.h"
+#include "operators/writer/VeloxArrowWriter.h"
 #include "shuffle/VeloxShuffleReader.h"
+#include "shuffle/VeloxShuffleWriter.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
 #ifdef ENABLE_HDFS
-
 #include "operators/writer/VeloxParquetDataSourceHDFS.h"
-
 #endif
 
 #ifdef ENABLE_S3
@@ -308,4 +306,12 @@ void VeloxRuntime::dumpConf(const std::string& path) {
   outFile.close();
 }
 
+std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const 
std::string& path) {
+  int64_t batchSize = 4096;
+  if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) {
+    batchSize = std::atol(it->second.c_str());
+  }
+  return std::make_shared<VeloxArrowWriter>(path, batchSize, 
memoryManager()->getLeafMemoryPool().get());
+}
+
 } // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 846f740cb8..798fa5bc72 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -76,6 +76,8 @@ class VeloxRuntime final : public Runtime {
 
   void dumpConf(const std::string& path) override;
 
+  std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) 
override;
+
   std::shared_ptr<VeloxDataSource> createDataSource(const std::string& 
filePath, std::shared_ptr<arrow::Schema> schema);
 
   std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.cc 
b/cpp/velox/operators/reader/FileReaderIterator.cc
similarity index 55%
rename from cpp/velox/benchmarks/common/FileReaderIterator.cc
rename to cpp/velox/operators/reader/FileReaderIterator.cc
index 26985c7f03..d732adbf33 100644
--- a/cpp/velox/benchmarks/common/FileReaderIterator.cc
+++ b/cpp/velox/operators/reader/FileReaderIterator.cc
@@ -15,33 +15,38 @@
  * limitations under the License.
  */
 
-#include "FileReaderIterator.h"
-#include "benchmarks/common/ParquetReaderIterator.h"
-#ifdef GLUTEN_ENABLE_ORC
-#include "benchmarks/common/OrcReaderIterator.h"
-#endif
+#include "operators/reader/FileReaderIterator.h"
+#include <filesystem>
+#include "operators/reader/ParquetReaderIterator.h"
 
-std::shared_ptr<gluten::ResultIterator> gluten::getInputIteratorFromFileReader(
+namespace gluten {
+namespace {
+const std::string kParquetSuffix = ".parquet";
+}
+
+FileReaderIterator::FileReaderIterator(const std::string& path) : path_(path) 
{}
+
+int64_t FileReaderIterator::getCollectBatchTime() const {
+  return collectBatchTime_;
+}
+
+std::shared_ptr<gluten::ResultIterator> 
FileReaderIterator::getInputIteratorFromFileReader(
+    FileReaderType readerType,
     const std::string& path,
-    gluten::FileReaderType readerType) {
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool) {
   std::filesystem::path input{path};
   auto suffix = input.extension().string();
   if (suffix == kParquetSuffix) {
     if (readerType == FileReaderType::kStream) {
-      return 
std::make_shared<gluten::ResultIterator>(std::make_unique<ParquetStreamReaderIterator>(path));
-    }
-    if (readerType == FileReaderType::kBuffered) {
-      return 
std::make_shared<gluten::ResultIterator>(std::make_unique<ParquetBufferedReaderIterator>(path));
-    }
-  } else if (suffix == kOrcSuffix) {
-#ifdef GLUTEN_ENABLE_ORC
-    if (readerType == FileReaderType::kStream) {
-      return 
std::make_shared<gluten::ResultIterator>(std::make_unique<OrcStreamReaderIterator>(path));
+      return std::make_shared<gluten::ResultIterator>(
+          std::make_unique<ParquetStreamReaderIterator>(path, batchSize, 
pool));
     }
     if (readerType == FileReaderType::kBuffered) {
-      return 
std::make_shared<gluten::ResultIterator>(std::make_unique<OrcBufferedReaderIterator>(path));
+      return std::make_shared<gluten::ResultIterator>(
+          std::make_unique<ParquetBufferedReaderIterator>(path, batchSize, 
pool));
     }
-#endif
   }
   throw new GlutenException("Unreachable.");
 }
+} // namespace gluten
diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.h 
b/cpp/velox/operators/reader/FileReaderIterator.h
similarity index 69%
rename from cpp/velox/benchmarks/common/FileReaderIterator.h
rename to cpp/velox/operators/reader/FileReaderIterator.h
index 16db58ce45..e782c2bf80 100644
--- a/cpp/velox/benchmarks/common/FileReaderIterator.h
+++ b/cpp/velox/operators/reader/FileReaderIterator.h
@@ -14,43 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
 
-#include <arrow/io/api.h>
-#include <arrow/record_batch.h>
-#include <arrow/util/range.h>
+#pragma once
 
-#include "BenchmarkUtils.h"
 #include "compute/ResultIterator.h"
-#include "memory/ColumnarBatch.h"
 #include "memory/ColumnarBatchIterator.h"
+#include "velox/common/memory/MemoryPool.h"
 
 namespace gluten {
-
-static const std::string kOrcSuffix = ".orc";
-static const std::string kParquetSuffix = ".parquet";
-
 enum FileReaderType { kBuffered, kStream, kNone };
 
 class FileReaderIterator : public ColumnarBatchIterator {
  public:
-  explicit FileReaderIterator(const std::string& path) : path_(path) {}
+  static std::shared_ptr<gluten::ResultIterator> 
getInputIteratorFromFileReader(
+      FileReaderType readerType,
+      const std::string& path,
+      int64_t batchSize,
+      facebook::velox::memory::MemoryPool* pool);
+
+  explicit FileReaderIterator(const std::string& path);
 
   virtual ~FileReaderIterator() = default;
 
   virtual std::shared_ptr<arrow::Schema> getSchema() = 0;
 
-  int64_t getCollectBatchTime() const {
-    return collectBatchTime_;
-  }
+  int64_t getCollectBatchTime() const;
 
  protected:
   int64_t collectBatchTime_ = 0;
   std::string path_;
 };
 
-std::shared_ptr<gluten::ResultIterator> getInputIteratorFromFileReader(
-    const std::string& path,
-    FileReaderType readerType);
-
 } // namespace gluten
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.cc 
b/cpp/velox/operators/reader/ParquetReaderIterator.cc
new file mode 100644
index 0000000000..3e61e1d8d9
--- /dev/null
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.cc
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/reader/ParquetReaderIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+
+#include <arrow/util/range.h>
+
+namespace gluten {
+
+ParquetReaderIterator::ParquetReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : FileReaderIterator(path), batchSize_(batchSize), pool_(pool) {}
+
+void ParquetReaderIterator::createReader() {
+  parquet::ArrowReaderProperties properties = 
parquet::default_arrow_reader_properties();
+  properties.set_batch_size(batchSize_);
+  GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
+      arrow::default_memory_pool(), 
parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
+  GLUTEN_THROW_NOT_OK(
+      
fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()),
 &recordBatchReader_));
+
+  auto schema = recordBatchReader_->schema();
+  DLOG(INFO) << "Schema:\n" << schema->ToString();
+}
+
+std::shared_ptr<arrow::Schema> ParquetReaderIterator::getSchema() {
+  return recordBatchReader_->schema();
+}
+
+ParquetStreamReaderIterator::ParquetStreamReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : ParquetReaderIterator(path, batchSize, pool) {
+  createReader();
+  DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
+}
+
+std::shared_ptr<gluten::ColumnarBatch> ParquetStreamReaderIterator::next() {
+  auto startTime = std::chrono::steady_clock::now();
+  GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
+  DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << 
(batch ? batch->num_rows() : 0);
+  collectBatchTime_ +=
+      
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
 - startTime).count();
+  if (batch == nullptr) {
+    return nullptr;
+  }
+  return VeloxColumnarBatch::from(pool_, 
std::make_shared<gluten::ArrowColumnarBatch>(batch));
+}
+
+ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : ParquetReaderIterator(path, batchSize, pool) {
+  createReader();
+  collectBatches();
+  iter_ = batches_.begin();
+  DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
+  DLOG(INFO) << "Number of input batches: " << std::to_string(batches_.size());
+  if (iter_ != batches_.cend()) {
+    DLOG(INFO) << "columns: " << (*iter_)->num_columns();
+    DLOG(INFO) << "rows: " << (*iter_)->num_rows();
+  }
+}
+
+std::shared_ptr<gluten::ColumnarBatch> ParquetBufferedReaderIterator::next() {
+  if (iter_ == batches_.cend()) {
+    return nullptr;
+  }
+  return VeloxColumnarBatch::from(pool_, 
std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
+}
+
+void ParquetBufferedReaderIterator::collectBatches() {
+  auto startTime = std::chrono::steady_clock::now();
+  GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
+  auto endTime = std::chrono::steady_clock::now();
+  collectBatchTime_ += 
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - 
startTime).count();
+}
+} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.h 
b/cpp/velox/operators/reader/ParquetReaderIterator.h
new file mode 100644
index 0000000000..f45fe5eb77
--- /dev/null
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/reader/FileReaderIterator.h"
+
+#include <parquet/arrow/reader.h>
+#include <memory>
+
+namespace gluten {
+
+class ParquetReaderIterator : public FileReaderIterator {
+ public:
+  explicit ParquetReaderIterator(const std::string& path, int64_t batchSize, 
facebook::velox::memory::MemoryPool* pool);
+
+  void createReader();
+
+  std::shared_ptr<arrow::Schema> getSchema() override;
+
+ protected:
+  std::unique_ptr<::parquet::arrow::FileReader> fileReader_;
+  std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
+  int64_t batchSize_;
+  facebook::velox::memory::MemoryPool* pool_;
+};
+
+class ParquetStreamReaderIterator final : public ParquetReaderIterator {
+ public:
+  ParquetStreamReaderIterator(const std::string& path, int64_t batchSize, 
facebook::velox::memory::MemoryPool* pool);
+
+  std::shared_ptr<gluten::ColumnarBatch> next() override;
+};
+
+class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
+ public:
+  explicit ParquetBufferedReaderIterator(
+      const std::string& path,
+      int64_t batchSize,
+      facebook::velox::memory::MemoryPool* pool);
+
+  std::shared_ptr<gluten::ColumnarBatch> next() override;
+
+ private:
+  void collectBatches();
+
+  arrow::RecordBatchVector batches_;
+  std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index acb14cf4de..9c5d166a07 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -17,10 +17,11 @@
 
 #include "VeloxColumnarBatchSerializer.h"
 
+#include <arrow/buffer.h>
+
 #include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "velox/common/memory/Memory.h"
-#include "velox/vector/ComplexVector.h"
 #include "velox/vector/FlatVector.h"
 #include "velox/vector/arrow/Bridge.h"
 
diff --git a/cpp/core/operators/writer/ArrowWriter.h 
b/cpp/velox/operators/writer/VeloxArrowWriter.cc
similarity index 57%
copy from cpp/core/operators/writer/ArrowWriter.h
copy to cpp/velox/operators/writer/VeloxArrowWriter.cc
index 0d0b8ce2cb..565602d95c 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/velox/operators/writer/VeloxArrowWriter.cc
@@ -15,25 +15,24 @@
  * limitations under the License.
  */
 
-#pragma once
+#include "operators/writer/VeloxArrowWriter.h"
 
-#include "parquet/arrow/writer.h"
+namespace gluten {
 
-/**
- * @brief Used to print RecordBatch to a parquet file
- *
- */
-class ArrowWriter {
- public:
-  explicit ArrowWriter(std::string& path) : path_(path) {}
-
-  arrow::Status initWriter(arrow::Schema& schema);
-
-  arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);
-
-  arrow::Status closeWriter();
+VeloxArrowWriter::VeloxArrowWriter(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : ArrowWriter(path), batchSize_(batchSize), pool_(pool) {}
 
- private:
-  std::unique_ptr<parquet::arrow::FileWriter> writer_;
-  std::string path_;
-};
+std::shared_ptr<ColumnarBatch> VeloxArrowWriter::retrieveColumnarBatch() {
+  if (writer_ == nullptr) {
+    // No data to read.
+    return nullptr;
+  }
+  if (reader_ == nullptr) {
+    reader_ = std::make_unique<ParquetStreamReaderIterator>(path_, batchSize_, 
pool_);
+  }
+  return reader_->next();
+}
+} // namespace gluten
diff --git a/cpp/core/operators/writer/ArrowWriter.h 
b/cpp/velox/operators/writer/VeloxArrowWriter.h
similarity index 62%
copy from cpp/core/operators/writer/ArrowWriter.h
copy to cpp/velox/operators/writer/VeloxArrowWriter.h
index 0d0b8ce2cb..8b79986287 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/velox/operators/writer/VeloxArrowWriter.h
@@ -17,23 +17,19 @@
 
 #pragma once
 
-#include "parquet/arrow/writer.h"
+#include "operators/reader/ParquetReaderIterator.h"
+#include "operators/writer/ArrowWriter.h"
 
-/**
- * @brief Used to print RecordBatch to a parquet file
- *
- */
-class ArrowWriter {
+namespace gluten {
+class VeloxArrowWriter : public ArrowWriter {
  public:
-  explicit ArrowWriter(std::string& path) : path_(path) {}
-
-  arrow::Status initWriter(arrow::Schema& schema);
-
-  arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);
+  explicit VeloxArrowWriter(const std::string& path, int64_t batchSize, 
facebook::velox::memory::MemoryPool* pool);
 
-  arrow::Status closeWriter();
+  std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() override;
 
  private:
-  std::unique_ptr<parquet::arrow::FileWriter> writer_;
-  std::string path_;
+  int64_t batchSize_;
+  facebook::velox::memory::MemoryPool* pool_;
+  std::unique_ptr<ParquetStreamReaderIterator> reader_{nullptr};
 };
+} // namespace gluten
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 4b5bcdf819..b4944d9205 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -103,6 +103,10 @@ class DummyRuntime final : public Runtime {
     throw GlutenException("Not yet implemented");
   }
 
+  std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path) 
override {
+    throw GlutenException("Not yet implemented");
+  }
+
  private:
   class DummyResultIterator : public ColumnarBatchIterator {
    public:
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc 
b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index ffa6f032ac..c00ab6a148 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -19,12 +19,13 @@
 
 #include "memory/ArrowMemoryPool.h"
 #include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
 #include "operators/serializer/VeloxColumnarBatchSerializer.h"
 #include "utils/VeloxArrowUtils.h"
 #include "velox/vector/arrow/Bridge.h"
 #include "velox/vector/tests/utils/VectorTestBase.h"
 
+#include <arrow/buffer.h>
+
 using namespace facebook::velox;
 
 namespace gluten {
diff --git a/cpp/velox/utils/VeloxArrowUtils.cc 
b/cpp/velox/utils/VeloxArrowUtils.cc
index 0349eb718b..f26b49a476 100644
--- a/cpp/velox/utils/VeloxArrowUtils.cc
+++ b/cpp/velox/utils/VeloxArrowUtils.cc
@@ -16,6 +16,9 @@
  */
 
 #include "utils/VeloxArrowUtils.h"
+
+#include <arrow/buffer.h>
+
 #include "memory/VeloxColumnarBatch.h"
 #include "utils/Common.h"
 #include "velox/vector/ComplexVector.h"
diff --git a/docs/developers/MicroBenchmarks.md 
b/docs/developers/MicroBenchmarks.md
index eedf5010b6..1483dc2cba 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -64,7 +64,7 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
 --plan 
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example.json
 \
 --data 
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_orders/part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet,\
 
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_lineitem/part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
 \
---threads 1 --iterations 1 --noprint-result 
--benchmark_filter=InputFromBatchStream
+--threads 1 --iterations 1 --noprint-result
 ```
 
 The output should be like:
@@ -118,12 +118,12 @@ cd /path/to/gluten/
 First, get the Stage Id from spark UI for the stage you want to simulate.
 And then re-run the query with below configurations to dump the inputs to 
micro benchmark.
 
-| Parameters                                  | Description                    
                                                                                
| Recommend Setting     |
-|---------------------------------------------|----------------------------------------------------------------------------------------------------------------|-----------------------|
-| spark.gluten.sql.benchmark_task.stageId     | Spark task stage id            
                                                                                
| target stage id       |
-| spark.gluten.sql.benchmark_task.partitionId | Spark task partition id, 
default value -1 means all the partition of this stage                          
      | 0                     |
-| spark.gluten.sql.benchmark_task.taskId      | If not specify partition id, 
use spark task attempt id, default value -1 means all the partition of this 
stage | target task attemp id |
-| spark.gluten.saveDir                        | Directory to save the inputs 
to micro benchmark, should exist and be empty.                                  
  | /path/to/saveDir      |
+| Parameters                                  | Description                    
                                                                                
                                                                                
                                                                                
                                                             | Recommend 
Setting                                          |
+|---------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------|
+| spark.gluten.sql.benchmark_task.taskId      | Comma-separated string to 
specify the Task IDs to dump. If it's set, 
`spark.gluten.sql.benchmark_task.stageId` and 
`spark.gluten.sql.benchmark_task.partitionId` will be ignored.                  
                                                                                
                                                         | Comma-separated 
string of task IDs. Empty by default.      |
+| spark.gluten.sql.benchmark_task.stageId     | Spark stage ID.                
                                                                                
                                                                                
                                                                                
                                                             | Target stage ID  
                                          |
+| spark.gluten.sql.benchmark_task.partitionId | Comma-separated string to 
specify the Partition IDs in a stage to dump. Must be specified together with 
`spark.gluten.sql.benchmark_task.stageId`. Empty by default, meaning all 
partitions of this stage will be dumped. To identify the partition ID, navigate 
to the `Stage` tab in the Spark UI and locate it under the `Index` column. | 
Comma-separated string of partition IDs. Empty by default. |
+| spark.gluten.saveDir                        | Directory to save the inputs 
to micro benchmark, should exist and be empty.                                  
                                                                                
                                                                                
                                                               | 
/path/to/saveDir                                           |
 
 Check the files in `spark.gluten.saveDir`. If the simulated stage is a first 
stage, you will get 3
 or 4 types of dumped file:
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala 
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
index dea0a48140..f9bb7478e7 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
@@ -21,24 +21,32 @@ import org.apache.gluten.GlutenConfig
 import org.apache.spark.TaskContext
 
 object DebugUtil {
-  // if specify taskId, then only do that task partition
-  // if not specify stageId, then do nothing
+  // if taskId is specified and matches, then do that task
+  // if stageId is not specified or doesn't match, then do nothing
   // if specify stageId but no partitionId, then do all partitions for that 
stage
   // if specify stageId and partitionId, then only do that partition for that 
stage
   def saveInputToFile(): Boolean = {
-    if (TaskContext.get().taskAttemptId() == GlutenConfig.getConf.taskId) {
-      return true
-    }
-    if (TaskContext.get().stageId() != GlutenConfig.getConf.taskStageId) {
-      return false
-    }
-    if (GlutenConfig.getConf.taskPartitionId == -1) {
-      return true
-    }
-    if (TaskContext.getPartitionId() == GlutenConfig.getConf.taskPartitionId) {
-      return true
-    }
+    def taskIdMatches =
+      GlutenConfig.getConf.benchmarkTaskId.nonEmpty &&
+        GlutenConfig.getConf.benchmarkTaskId
+          .split(",")
+          .map(_.toLong)
+          .contains(TaskContext.get().taskAttemptId())
+
+    def partitionIdMatches =
+      TaskContext.get().stageId() == GlutenConfig.getConf.benchmarkStageId &&
+        (GlutenConfig.getConf.benchmarkPartitionId.isEmpty ||
+          GlutenConfig.getConf.benchmarkPartitionId
+            .split(",")
+            .map(_.toInt)
+            .contains(TaskContext.get().partitionId()))
 
-    false
+    val saveInput = taskIdMatches || partitionIdMatches
+    if (saveInput) {
+      if (GlutenConfig.getConf.benchmarkSaveDir.isEmpty) {
+        throw new IllegalArgumentException(GlutenConfig.BENCHMARK_SAVE_DIR.key 
+ " is not set.")
+      }
+    }
+    saveInput
   }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index f756eb20a6..e0d06ce6fc 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -423,9 +423,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def debug: Boolean = conf.getConf(DEBUG_ENABLED)
   def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
   def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
-  def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
-  def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID)
-  def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID)
+  def benchmarkStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
+  def benchmarkPartitionId: String = conf.getConf(BENCHMARK_TASK_PARTITIONID)
+  def benchmarkTaskId: String = conf.getConf(BENCHMARK_TASK_TASK_ID)
+  def benchmarkSaveDir: String = conf.getConf(BENCHMARK_SAVE_DIR)
   def textInputMaxBlockSize: Long = conf.getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
   def textIputEmptyAsDefault: Boolean = 
conf.getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
   def enableParquetRowGroupMaxMinIndex: Boolean =
@@ -1719,14 +1720,20 @@ object GlutenConfig {
   val BENCHMARK_TASK_PARTITIONID =
     buildConf("spark.gluten.sql.benchmark_task.partitionId")
       .internal()
-      .intConf
-      .createWithDefault(-1)
+      .stringConf
+      .createWithDefault("")
 
   val BENCHMARK_TASK_TASK_ID =
     buildConf("spark.gluten.sql.benchmark_task.taskId")
       .internal()
-      .longConf
-      .createWithDefault(-1L)
+      .stringConf
+      .createWithDefault("")
+
+  val BENCHMARK_SAVE_DIR =
+    buildConf(GLUTEN_SAVE_DIR)
+      .internal()
+      .stringConf
+      .createWithDefault("")
 
   val NATIVE_WRITER_ENABLED =
     buildConf("spark.gluten.sql.native.writer.enabled")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to