This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 58e7d831e7 [GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998)
58e7d831e7 is described below
commit 58e7d831e7a18d707576a983f2b49d91b0b3d67f
Author: Rong Ma <[email protected]>
AuthorDate: Mon Nov 25 09:19:52 2024 +0800
[GLUTEN-7953][VL] Fetch and dump all inputs for micro benchmark on middle stage begin (#7998)
---
.github/workflows/velox_backend.yml | 8 +-
.../benchmarks/NativeBenchmarkPlanGenerator.scala | 98 +++++++------------
cpp/CMakeLists.txt | 4 -
cpp/core/compute/Runtime.h | 3 +
cpp/core/jni/JniCommon.cc | 44 ++++++---
cpp/core/jni/JniCommon.h | 1 +
cpp/core/jni/JniWrapper.cc | 14 ++-
cpp/core/memory/ColumnarBatch.h | 1 -
cpp/core/operators/writer/ArrowWriter.cc | 9 +-
cpp/core/operators/writer/ArrowWriter.h | 16 ++-
cpp/velox/CMakeLists.txt | 3 +
cpp/velox/benchmarks/CMakeLists.txt | 7 +-
cpp/velox/benchmarks/GenericBenchmark.cc | 34 +++----
cpp/velox/benchmarks/common/OrcReaderIterator.h | 107 ---------------------
.../benchmarks/common/ParquetReaderIterator.h | 104 --------------------
cpp/velox/benchmarks/exec/OrcConverter.cc | 2 +-
cpp/velox/compute/VeloxRuntime.cc | 14 ++-
cpp/velox/compute/VeloxRuntime.h | 2 +
.../reader}/FileReaderIterator.cc | 41 ++++----
.../reader}/FileReaderIterator.h | 28 ++----
.../operators/reader/ParquetReaderIterator.cc | 97 +++++++++++++++++++
cpp/velox/operators/reader/ParquetReaderIterator.h | 65 +++++++++++++
.../serializer/VeloxColumnarBatchSerializer.cc | 3 +-
.../operators/writer/VeloxArrowWriter.cc} | 37 ++++---
.../operators/writer/VeloxArrowWriter.h} | 24 ++---
cpp/velox/tests/RuntimeTest.cc | 4 +
.../tests/VeloxColumnarBatchSerializerTest.cc | 3 +-
cpp/velox/utils/VeloxArrowUtils.cc | 3 +
docs/developers/MicroBenchmarks.md | 14 +--
.../scala/org/apache/gluten/utils/DebugUtil.scala | 38 +++++---
.../scala/org/apache/gluten/GlutenConfig.scala | 21 ++--
31 files changed, 411 insertions(+), 438 deletions(-)
diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml
index 42e1f25278..d7445d1a27 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -969,9 +969,11 @@ jobs:
run: |
$MVN_CMD test -Pspark-3.5 -Pbackends-velox -pl backends-velox -am \
-DtagsToInclude="org.apache.gluten.tags.GenerateExample" -Dtest=none
-DfailIfNoTests=false -Dexec.skip
- # This test depends on example.json generated by the above mvn test.
- cd cpp/build/velox/benchmarks && sudo chmod +x ./generic_benchmark
- ./generic_benchmark --run-example --with-shuffle --threads 1
--iterations 1
+ # This test depends on files generated by the above mvn test.
+ ./cpp/build/velox/benchmarks/generic_benchmark --with-shuffle
--partitioning hash --threads 1 --iterations 1 \
+ --conf $(realpath
backends-velox/generated-native-benchmark/conf_12_0.ini) \
+ --plan $(realpath
backends-velox/generated-native-benchmark/plan_12_0.json) \
+ --data $(realpath
backends-velox/generated-native-benchmark/data_12_0_0.parquet),$(realpath
backends-velox/generated-native-benchmark/data_12_0_1.parquet)
- name: Run UDF test
run: |
# Depends on --build_example=ON.
diff --git a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
index 57fbca1744..1e378d16f1 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/benchmarks/NativeBenchmarkPlanGenerator.scala
@@ -19,18 +19,14 @@ package org.apache.gluten.benchmarks
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.{VeloxWholeStageTransformerSuite,
WholeStageTransformer}
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
ShuffleQueryStageExec}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.commons.io.FileUtils
import org.scalatest.Tag
import java.io.File
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-
-import scala.collection.JavaConverters._
object GenerateExample extends Tag("org.apache.gluten.tags.GenerateExample")
@@ -50,6 +46,11 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
createTPCHNotNullTables()
}
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ }
+
test("Test plan json non-empty - AQE off") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -67,7 +68,6 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
planJson =
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
- spark.sparkContext.setLogLevel(logLevel)
}
test("Test plan json non-empty - AQE on") {
@@ -88,70 +88,42 @@ class NativeBenchmarkPlanGenerator extends VeloxWholeStageTransformerSuite {
val planJson =
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson
assert(planJson.nonEmpty)
}
- spark.sparkContext.setLogLevel(logLevel)
}
test("generate example", GenerateExample) {
- import testImplicits._
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
- GlutenConfig.CACHE_WHOLE_STAGE_TRANSFORMER_CONTEXT.key -> "true"
+ GlutenConfig.BENCHMARK_SAVE_DIR.key -> generatedPlanDir,
+ GlutenConfig.BENCHMARK_TASK_STAGEID.key -> "12",
+ GlutenConfig.BENCHMARK_TASK_PARTITIONID.key -> "0"
) {
logWarning(s"Generating inputs for micro benchmark to $generatedPlanDir")
- val q4_lineitem = spark
- .sql(s"""
- |select l_orderkey from lineitem where l_commitdate <
l_receiptdate
- |""".stripMargin)
- val q4_orders = spark
- .sql(s"""
- |select o_orderkey, o_orderpriority
- | from orders
- | where o_orderdate >= '1993-07-01' and o_orderdate <
'1993-10-01'
- |""".stripMargin)
- q4_lineitem
- .createOrReplaceTempView("q4_lineitem")
- q4_orders
- .createOrReplaceTempView("q4_orders")
-
- q4_lineitem
- .repartition(1, 'l_orderkey)
- .write
- .format(outputFileFormat)
- .save(generatedPlanDir + "/example_lineitem")
- q4_orders
- .repartition(1, 'o_orderkey)
- .write
- .format(outputFileFormat)
- .save(generatedPlanDir + "/example_orders")
-
- val df =
- spark.sql("""
- |select * from q4_orders left semi join q4_lineitem on
l_orderkey = o_orderkey
- |""".stripMargin)
- generateSubstraitJson(df, "example.json")
+ spark
+ .sql("""
+ |select /*+ REPARTITION(1) */
+ | o_orderpriority,
+ | count(*) as order_count
+ |from
+ | orders
+ |where
+ | o_orderdate >= date '1993-07-01'
+ | and o_orderdate < date '1993-07-01' + interval '3' month
+ | and exists (
+ | select /*+ REPARTITION(1) */
+ | *
+ | from
+ | lineitem
+ | where
+ | l_orderkey = o_orderkey
+ | and l_commitdate < l_receiptdate
+ | )
+ |group by
+ | o_orderpriority
+ |order by
+ | o_orderpriority
+ |""".stripMargin)
+ .foreach(_ => ())
}
- spark.sparkContext.setLogLevel(logLevel)
- }
-
- def generateSubstraitJson(df: DataFrame, file: String): Unit = {
- val executedPlan = df.queryExecution.executedPlan
- executedPlan.execute()
- val finalPlan =
- executedPlan match {
- case aqe: AdaptiveSparkPlanExec =>
- aqe.executedPlan match {
- case s: ShuffleQueryStageExec => s.shuffle.child
- case other => other
- }
- case plan => plan
- }
- val lastStageTransformer =
finalPlan.find(_.isInstanceOf[WholeStageTransformer])
- assert(lastStageTransformer.nonEmpty)
- val plan =
-
lastStageTransformer.get.asInstanceOf[WholeStageTransformer].substraitPlanJson.split('\n')
-
- val exampleJsonFile = Paths.get(generatedPlanDir, file)
- Files.write(exampleJsonFile, plan.toList.asJava, StandardCharsets.UTF_8)
}
}
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 09d09481f2..67fb9ec721 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -217,10 +217,6 @@ if(ENABLE_IAA)
add_definitions(-DGLUTEN_ENABLE_IAA)
endif()
-if(ENABLE_ORC)
- add_definitions(-DGLUTEN_ENABLE_ORC)
-endif()
-
#
# Subdirectories
#
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index a58f770ff7..3090652b81 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -27,6 +27,7 @@
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
+#include "operators/writer/ArrowWriter.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
@@ -124,6 +125,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
virtual void dumpConf(const std::string& path) = 0;
+ virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string&
path) = 0;
+
const std::unordered_map<std::string, std::string>& getConfMap() {
return confMap_;
}
diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc
index 46b53e8c3b..bb8554568b 100644
--- a/cpp/core/jni/JniCommon.cc
+++ b/cpp/core/jni/JniCommon.cc
@@ -104,22 +104,34 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
std::shared_ptr<gluten::ColumnarBatch>
gluten::JniColumnarBatchIterator::next() {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
- if (!env->CallBooleanMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorHasNext_)) {
- checkException(env);
- return nullptr; // stream ended
- }
-
- checkException(env);
- jlong handle = env->CallLongMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorNext_);
- checkException(env);
- auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
if (writer_ != nullptr) {
- // save snapshot of the batch to file
- std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
- std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
- auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(),
schema.get()));
- GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
- GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+ if (!writer_->closed()) {
+ // Dump all inputs.
+ while (env->CallBooleanMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorHasNext_)) {
+ checkException(env);
+ jlong handle = env->CallLongMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorNext_);
+ checkException(env);
+ auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
+
+ // Save the snapshot of the batch to file.
+ std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
+ std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
+ auto rb =
gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
+ GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
+ GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+ }
+ checkException(env);
+ GLUTEN_THROW_NOT_OK(writer_->closeWriter());
+ }
+ return writer_->retrieveColumnarBatch();
+ } else {
+ if (!env->CallBooleanMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorHasNext_)) {
+ checkException(env);
+ return nullptr; // stream ended
+ }
+ checkException(env);
+ jlong handle = env->CallLongMethod(jColumnarBatchItr_,
serializedColumnarBatchIteratorNext_);
+ checkException(env);
+ return ObjectStore::retrieve<ColumnarBatch>(handle);
}
- return batch;
}
diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h
index aeab454f1a..8f40398a41 100644
--- a/cpp/core/jni/JniCommon.h
+++ b/cpp/core/jni/JniCommon.h
@@ -26,6 +26,7 @@
#include "compute/Runtime.h"
#include "config/GlutenConfig.h"
#include "memory/AllocationListener.h"
+#include "operators/writer/ArrowWriter.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
#include "utils/Exception.h"
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 9da5589486..963440f6fc 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -373,8 +373,16 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
}
saveDir = conf.at(kGlutenSaveDir);
std::filesystem::path f{saveDir};
- if (!std::filesystem::exists(f)) {
- throw GlutenException("Save input path " + saveDir + " does not exists");
+ if (std::filesystem::exists(f)) {
+ if (!std::filesystem::is_directory(f)) {
+ throw GlutenException("Invalid path for " + kGlutenSaveDir + ": " +
saveDir);
+ }
+ } else {
+ std::error_code ec;
+ std::filesystem::create_directory(f, ec);
+ if (ec) {
+ throw GlutenException("Failed to create directory: " + saveDir + ",
error message: " + ec.message());
+ }
}
ctx->dumpConf(saveDir + "/conf" + fileIdentifier + ".ini");
}
@@ -407,7 +415,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
std::shared_ptr<ArrowWriter> writer = nullptr;
if (saveInput) {
auto file = saveDir + "/data" + fileIdentifier + "_" +
std::to_string(idx) + ".parquet";
- writer = std::make_shared<ArrowWriter>(file);
+ writer = ctx->createArrowWriter(file);
}
jobject iter = env->GetObjectArrayElement(iterArr, idx);
auto arrayIter = makeJniColumnarBatchIterator(env, iter, ctx, writer);
diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h
index e0bab25418..be487f871e 100644
--- a/cpp/core/memory/ColumnarBatch.h
+++ b/cpp/core/memory/ColumnarBatch.h
@@ -23,7 +23,6 @@
#include "arrow/c/helpers.h"
#include "arrow/record_batch.h"
#include "memory/MemoryManager.h"
-#include "operators/writer/ArrowWriter.h"
#include "utils/ArrowStatus.h"
#include "utils/Exception.h"
diff --git a/cpp/core/operators/writer/ArrowWriter.cc b/cpp/core/operators/writer/ArrowWriter.cc
index 19bab6ddcb..46ec2fc9ba 100644
--- a/cpp/core/operators/writer/ArrowWriter.cc
+++ b/cpp/core/operators/writer/ArrowWriter.cc
@@ -21,6 +21,7 @@
#include "arrow/table.h"
#include "arrow/util/type_fwd.h"
+namespace gluten {
arrow::Status ArrowWriter::initWriter(arrow::Schema& schema) {
if (writer_ != nullptr) {
return arrow::Status::OK();
@@ -50,9 +51,15 @@ arrow::Status ArrowWriter::writeInBatches(std::shared_ptr<arrow::RecordBatch> ba
}
arrow::Status ArrowWriter::closeWriter() {
- // Write file footer and close
+ // Write file footer and close.
if (writer_ != nullptr) {
ARROW_RETURN_NOT_OK(writer_->Close());
}
+ closed_ = true;
return arrow::Status::OK();
}
+
+bool ArrowWriter::closed() const {
+ return closed_;
+}
+} // namespace gluten
diff --git a/cpp/core/operators/writer/ArrowWriter.h b/cpp/core/operators/writer/ArrowWriter.h
index 0d0b8ce2cb..1a7b196066 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/core/operators/writer/ArrowWriter.h
@@ -17,15 +17,19 @@
#pragma once
-#include "parquet/arrow/writer.h"
+#include <parquet/arrow/writer.h>
+#include "memory/ColumnarBatch.h"
+namespace gluten {
/**
* @brief Used to print RecordBatch to a parquet file
*
*/
class ArrowWriter {
public:
- explicit ArrowWriter(std::string& path) : path_(path) {}
+ explicit ArrowWriter(const std::string& path) : path_(path) {}
+
+ virtual ~ArrowWriter() = default;
arrow::Status initWriter(arrow::Schema& schema);
@@ -33,7 +37,13 @@ class ArrowWriter {
arrow::Status closeWriter();
- private:
+ bool closed() const;
+
+ virtual std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() = 0;
+
+ protected:
std::unique_ptr<parquet::arrow::FileWriter> writer_;
std::string path_;
+ bool closed_{false};
};
+} // namespace gluten
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index eff31863d4..9e110853eb 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -169,9 +169,12 @@ set(VELOX_SRCS
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkExprToSubfieldFilterParser.cc
+ operators/reader/FileReaderIterator.cc
+ operators/reader/ParquetReaderIterator.cc
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
+ operators/writer/VeloxArrowWriter.cc
operators/writer/VeloxParquetDataSource.cc
shuffle/VeloxShuffleReader.cc
shuffle/VeloxShuffleWriter.cc
diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt
index 1aa199b136..6b2cda358c 100644
--- a/cpp/velox/benchmarks/CMakeLists.txt
+++ b/cpp/velox/benchmarks/CMakeLists.txt
@@ -15,8 +15,7 @@
find_arrow_lib(${PARQUET_LIB_NAME})
-set(VELOX_BENCHMARK_COMMON_SRCS common/FileReaderIterator.cc
- common/BenchmarkUtils.cc)
+set(VELOX_BENCHMARK_COMMON_SRCS common/BenchmarkUtils.cc)
add_library(velox_benchmark_common STATIC ${VELOX_BENCHMARK_COMMON_SRCS})
target_include_directories(
velox_benchmark_common PUBLIC ${CMAKE_SOURCE_DIR}/velox
@@ -38,7 +37,3 @@ add_velox_benchmark(columnar_to_row_benchmark ColumnarToRowBenchmark.cc)
add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)
add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)
-
-if(ENABLE_ORC)
- add_velox_benchmark(orc_converter exec/OrcConverter.cc)
-endif()
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index 4e38fb4432..dcb64d7d18 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -25,12 +25,12 @@
#include <operators/writer/ArrowWriter.h>
#include "benchmarks/common/BenchmarkUtils.h"
-#include "benchmarks/common/FileReaderIterator.h"
#include "compute/VeloxBackend.h"
-#include "compute/VeloxPlanConverter.h"
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
+#include "operators/reader/FileReaderIterator.h"
+#include "operators/writer/VeloxArrowWriter.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
@@ -45,7 +45,6 @@ using namespace gluten;
namespace {
-DEFINE_bool(run_example, false, "Run the example and exit.");
DEFINE_bool(print_result, true, "Print result for execution");
DEFINE_string(save_output, "", "Path to parquet file for saving the task
output iterator");
DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
@@ -388,7 +387,8 @@ auto BM_Generic = [](::benchmark::State& state,
std::vector<FileReaderIterator*> inputItersRaw;
if (!dataFiles.empty()) {
for (const auto& input : dataFiles) {
- inputIters.push_back(getInputIteratorFromFileReader(input,
readerType));
+
inputIters.push_back(FileReaderIterator::getInputIteratorFromFileReader(
+ readerType, input, FLAGS_batch_size,
runtime->memoryManager()->getLeafMemoryPool().get()));
}
std::transform(
inputIters.begin(),
@@ -417,10 +417,11 @@ auto BM_Generic = [](::benchmark::State& state,
ArrowSchema cSchema;
toArrowSchema(veloxPlan->outputType(),
runtime->memoryManager()->getLeafMemoryPool().get(), &cSchema);
GLUTEN_ASSIGN_OR_THROW(auto outputSchema,
arrow::ImportSchema(&cSchema));
- ArrowWriter writer{FLAGS_save_output};
+ auto writer = std::make_shared<VeloxArrowWriter>(
+ FLAGS_save_output, FLAGS_batch_size,
runtime->memoryManager()->getLeafMemoryPool().get());
state.PauseTiming();
if (!FLAGS_save_output.empty()) {
- GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get())));
+ GLUTEN_THROW_NOT_OK(writer->initWriter(*(outputSchema.get())));
}
state.ResumeTiming();
@@ -436,13 +437,13 @@ auto BM_Generic = [](::benchmark::State& state,
LOG(WARNING) << maybeBatch.ValueOrDie()->ToString();
}
if (!FLAGS_save_output.empty()) {
-
GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie()));
+
GLUTEN_THROW_NOT_OK(writer->writeInBatches(maybeBatch.ValueOrDie()));
}
}
state.PauseTiming();
if (!FLAGS_save_output.empty()) {
- GLUTEN_THROW_NOT_OK(writer.closeWriter());
+ GLUTEN_THROW_NOT_OK(writer->closeWriter());
}
state.ResumeTiming();
}
@@ -488,7 +489,8 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state,
{
ScopedTimer timer(&elapsedTime);
for (auto _ : state) {
- auto resultIter = getInputIteratorFromFileReader(inputFile, readerType);
+ auto resultIter = FileReaderIterator::getInputIteratorFromFileReader(
+ readerType, inputFile, FLAGS_batch_size,
runtime->memoryManager()->getLeafMemoryPool().get());
runShuffle(
runtime,
listenerPtr,
@@ -591,19 +593,7 @@ int main(int argc, char** argv) {
std::vector<std::string> splitFiles{};
std::vector<std::string> dataFiles{};
- if (FLAGS_run_example) {
- LOG(WARNING) << "Running example...";
- dataFiles.resize(2);
- try {
- substraitJsonFile = getGeneratedFilePath("example.json");
- dataFiles[0] = getGeneratedFilePath("example_orders");
- dataFiles[1] = getGeneratedFilePath("example_lineitem");
- } catch (const std::exception& e) {
- LOG(ERROR) << "Failed to run example. " << e.what();
- ::benchmark::Shutdown();
- std::exit(EXIT_FAILURE);
- }
- } else if (FLAGS_run_shuffle) {
+ if (FLAGS_run_shuffle) {
std::string errorMsg{};
if (FLAGS_data.empty()) {
errorMsg = "Missing '--split' or '--data' option.";
diff --git a/cpp/velox/benchmarks/common/OrcReaderIterator.h b/cpp/velox/benchmarks/common/OrcReaderIterator.h
deleted file mode 100644
index f8c9f44b20..0000000000
--- a/cpp/velox/benchmarks/common/OrcReaderIterator.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <arrow/adapters/orc/adapter.h>
-#include "benchmarks/common/FileReaderIterator.h"
-
-namespace gluten {
-
-class OrcReaderIterator : public FileReaderIterator {
- public:
- explicit OrcReaderIterator(const std::string& path) :
FileReaderIterator(path) {}
-
- void createReader() override {
- // Open File
- auto input = arrow::io::ReadableFile::Open(path_);
- GLUTEN_THROW_NOT_OK(input);
-
- // Open ORC File Reader
- auto maybeReader = arrow::adapters::orc::ORCFileReader::Open(*input,
arrow::default_memory_pool());
- GLUTEN_THROW_NOT_OK(maybeReader);
- fileReader_.reset((*maybeReader).release());
-
- // get record batch Reader
- auto recordBatchReader = fileReader_->GetRecordBatchReader(4096,
std::vector<std::string>());
- GLUTEN_THROW_NOT_OK(recordBatchReader);
- recordBatchReader_ = *recordBatchReader;
- }
-
- std::shared_ptr<arrow::Schema> getSchema() override {
- auto schema = fileReader_->ReadSchema();
- GLUTEN_THROW_NOT_OK(schema);
- return *schema;
- }
-
- protected:
- std::unique_ptr<arrow::adapters::orc::ORCFileReader> fileReader_;
- std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
-};
-
-class OrcStreamReaderIterator final : public OrcReaderIterator {
- public:
- explicit OrcStreamReaderIterator(const std::string& path) :
OrcReaderIterator(path) {
- createReader();
- }
-
- std::shared_ptr<gluten::ColumnarBatch> next() override {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
- DLOG(INFO) << "OrcStreamReaderIterator get a batch, num rows: " << (batch
? batch->num_rows() : 0);
- collectBatchTime_ +=
-
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
- startTime).count();
- if (batch == nullptr) {
- return nullptr;
- }
- return convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(batch));
- }
-};
-
-class OrcBufferedReaderIterator final : public OrcReaderIterator {
- public:
- explicit OrcBufferedReaderIterator(const std::string& path) :
OrcReaderIterator(path) {
- createReader();
- collectBatches();
- iter_ = batches_.begin();
- DLOG(INFO) << "OrcBufferedReaderIterator open file: " << path;
- DLOG(INFO) << "Number of input batches: " <<
std::to_string(batches_.size());
- if (iter_ != batches_.cend()) {
- DLOG(INFO) << "columns: " << (*iter_)->num_columns();
- DLOG(INFO) << "rows: " << (*iter_)->num_rows();
- }
- }
-
- std::shared_ptr<gluten::ColumnarBatch> next() override {
- if (iter_ == batches_.cend()) {
- return nullptr;
- }
- return
convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
- }
-
- private:
- void collectBatches() {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
- auto endTime = std::chrono::steady_clock::now();
- collectBatchTime_ +=
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime -
startTime).count();
- }
-
- arrow::RecordBatchVector batches_;
- std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
-};
-
-} // namespace gluten
\ No newline at end of file
diff --git a/cpp/velox/benchmarks/common/ParquetReaderIterator.h b/cpp/velox/benchmarks/common/ParquetReaderIterator.h
deleted file mode 100644
index 20652ee27d..0000000000
--- a/cpp/velox/benchmarks/common/ParquetReaderIterator.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "benchmarks/common/BenchmarkUtils.h"
-#include "benchmarks/common/FileReaderIterator.h"
-#include "utils/Macros.h"
-
-#include <parquet/arrow/reader.h>
-
-namespace gluten {
-
-class ParquetReaderIterator : public FileReaderIterator {
- public:
- explicit ParquetReaderIterator(const std::string& path) :
FileReaderIterator(path) {}
-
- void createReader() {
- parquet::ArrowReaderProperties properties =
parquet::default_arrow_reader_properties();
- properties.set_batch_size(FLAGS_batch_size);
- GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
- GLUTEN_THROW_NOT_OK(
-
fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()),
&recordBatchReader_));
-
- auto schema = recordBatchReader_->schema();
- LOG(INFO) << "schema:\n" << schema->ToString();
- }
-
- std::shared_ptr<arrow::Schema> getSchema() override {
- return recordBatchReader_->schema();
- }
-
- protected:
- std::unique_ptr<parquet::arrow::FileReader> fileReader_;
- std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
-};
-
-class ParquetStreamReaderIterator final : public ParquetReaderIterator {
- public:
- explicit ParquetStreamReaderIterator(const std::string& path) :
ParquetReaderIterator(path) {
- createReader();
- DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
- }
-
- std::shared_ptr<gluten::ColumnarBatch> next() override {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
- DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " <<
(batch ? batch->num_rows() : 0);
- collectBatchTime_ +=
-
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now()
- startTime).count();
- if (batch == nullptr) {
- return nullptr;
- }
- return convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(batch));
- }
-};
-
-class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
- public:
- explicit ParquetBufferedReaderIterator(const std::string& path) :
ParquetReaderIterator(path) {
- createReader();
- collectBatches();
- iter_ = batches_.begin();
- DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
- DLOG(INFO) << "Number of input batches: " <<
std::to_string(batches_.size());
- if (iter_ != batches_.cend()) {
- DLOG(INFO) << "columns: " << (*iter_)->num_columns();
- DLOG(INFO) << "rows: " << (*iter_)->num_rows();
- }
- }
-
- std::shared_ptr<gluten::ColumnarBatch> next() override {
- if (iter_ == batches_.cend()) {
- return nullptr;
- }
- return
convertBatch(std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
- }
-
- private:
- void collectBatches() {
- auto startTime = std::chrono::steady_clock::now();
- GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
- auto endTime = std::chrono::steady_clock::now();
- collectBatchTime_ +=
std::chrono::duration_cast<std::chrono::nanoseconds>(endTime -
startTime).count();
- }
-
- arrow::RecordBatchVector batches_;
- std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
-};
-
-} // namespace gluten
diff --git a/cpp/velox/benchmarks/exec/OrcConverter.cc b/cpp/velox/benchmarks/exec/OrcConverter.cc
index b421ecca3b..888cf27c35 100644
--- a/cpp/velox/benchmarks/exec/OrcConverter.cc
+++ b/cpp/velox/benchmarks/exec/OrcConverter.cc
@@ -16,7 +16,7 @@
*/
#include <arrow/adapters/orc/adapter.h>
-#include "benchmarks/common/ParquetReaderIterator.h"
+#include "operators/reader/ParquetReaderIterator.h"
namespace gluten {
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 4c6b52e6fe..20c3dec939 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -28,16 +28,14 @@
#include "compute/VeloxPlanConverter.h"
#include "config/VeloxConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
-#include "shuffle/VeloxHashShuffleWriter.h"
-#include "shuffle/VeloxRssSortShuffleWriter.h"
+#include "operators/writer/VeloxArrowWriter.h"
#include "shuffle/VeloxShuffleReader.h"
+#include "shuffle/VeloxShuffleWriter.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
#ifdef ENABLE_HDFS
-
#include "operators/writer/VeloxParquetDataSourceHDFS.h"
-
#endif
#ifdef ENABLE_S3
@@ -308,4 +306,12 @@ void VeloxRuntime::dumpConf(const std::string& path) {
outFile.close();
}
+std::shared_ptr<ArrowWriter> VeloxRuntime::createArrowWriter(const
std::string& path) {
+ int64_t batchSize = 4096;
+ if (auto it = confMap_.find(kSparkBatchSize); it != confMap_.end()) {
+ batchSize = std::atol(it->second.c_str());
+ }
+ return std::make_shared<VeloxArrowWriter>(path, batchSize,
memoryManager()->getLeafMemoryPool().get());
+}
+
} // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 846f740cb8..798fa5bc72 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -76,6 +76,8 @@ class VeloxRuntime final : public Runtime {
void dumpConf(const std::string& path) override;
+ std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path)
override;
+
std::shared_ptr<VeloxDataSource> createDataSource(const std::string&
filePath, std::shared_ptr<arrow::Schema> schema);
std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.cc b/cpp/velox/operators/reader/FileReaderIterator.cc
similarity index 55%
rename from cpp/velox/benchmarks/common/FileReaderIterator.cc
rename to cpp/velox/operators/reader/FileReaderIterator.cc
index 26985c7f03..d732adbf33 100644
--- a/cpp/velox/benchmarks/common/FileReaderIterator.cc
+++ b/cpp/velox/operators/reader/FileReaderIterator.cc
@@ -15,33 +15,38 @@
* limitations under the License.
*/
-#include "FileReaderIterator.h"
-#include "benchmarks/common/ParquetReaderIterator.h"
-#ifdef GLUTEN_ENABLE_ORC
-#include "benchmarks/common/OrcReaderIterator.h"
-#endif
+#include "operators/reader/FileReaderIterator.h"
+#include <filesystem>
+#include "operators/reader/ParquetReaderIterator.h"
-std::shared_ptr<gluten::ResultIterator> gluten::getInputIteratorFromFileReader(
+namespace gluten {
+namespace {
+const std::string kParquetSuffix = ".parquet";
+}
+
+FileReaderIterator::FileReaderIterator(const std::string& path) : path_(path) {}
+
+int64_t FileReaderIterator::getCollectBatchTime() const {
+  return collectBatchTime_;
+}
+
+// Opens `path` as a ResultIterator of batches with up to `batchSize` rows, converted into memory
+// from `pool`. Only ".parquet" files are supported: kStream reads one batch per next() call,
+// kBuffered reads the whole file up front. Any other suffix, or FileReaderType::kNone, throws
+// GlutenException.
+std::shared_ptr<gluten::ResultIterator> FileReaderIterator::getInputIteratorFromFileReader(
+    FileReaderType readerType,
    const std::string& path,
-    gluten::FileReaderType readerType) {
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool) {
  std::filesystem::path input{path};
  auto suffix = input.extension().string();
  if (suffix == kParquetSuffix) {
    if (readerType == FileReaderType::kStream) {
-      return std::make_shared<gluten::ResultIterator>(std::make_unique<ParquetStreamReaderIterator>(path));
-    }
-    if (readerType == FileReaderType::kBuffered) {
-      return std::make_shared<gluten::ResultIterator>(std::make_unique<ParquetBufferedReaderIterator>(path));
-    }
-  } else if (suffix == kOrcSuffix) {
-#ifdef GLUTEN_ENABLE_ORC
-    if (readerType == FileReaderType::kStream) {
-      return std::make_shared<gluten::ResultIterator>(std::make_unique<OrcStreamReaderIterator>(path));
+      return std::make_shared<gluten::ResultIterator>(
+          std::make_unique<ParquetStreamReaderIterator>(path, batchSize, pool));
    }
    if (readerType == FileReaderType::kBuffered) {
-      return std::make_shared<gluten::ResultIterator>(std::make_unique<OrcBufferedReaderIterator>(path));
+      return std::make_shared<gluten::ResultIterator>(
+          std::make_unique<ParquetBufferedReaderIterator>(path, batchSize, pool));
    }
-#endif
  }
-  throw new GlutenException("Unreachable.");
+  // Reached for unsupported suffixes and for kNone. Throw by value: `throw new` throws a pointer
+  // that `catch (const GlutenException&)` handlers never match, and the object is leaked.
+  throw GlutenException("Unsupported input file or reader type for: " + path);
}
+} // namespace gluten
diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.h
b/cpp/velox/operators/reader/FileReaderIterator.h
similarity index 69%
rename from cpp/velox/benchmarks/common/FileReaderIterator.h
rename to cpp/velox/operators/reader/FileReaderIterator.h
index 16db58ce45..e782c2bf80 100644
--- a/cpp/velox/benchmarks/common/FileReaderIterator.h
+++ b/cpp/velox/operators/reader/FileReaderIterator.h
@@ -14,43 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
-#include <arrow/io/api.h>
-#include <arrow/record_batch.h>
-#include <arrow/util/range.h>
+#pragma once
-#include "BenchmarkUtils.h"
#include "compute/ResultIterator.h"
-#include "memory/ColumnarBatch.h"
#include "memory/ColumnarBatchIterator.h"
+#include "velox/common/memory/MemoryPool.h"
namespace gluten {
-
-static const std::string kOrcSuffix = ".orc";
-static const std::string kParquetSuffix = ".parquet";
-
+// Delivery mode for file readers: kBuffered loads all batches when the iterator is created,
+// kStream reads one batch per next() call. getInputIteratorFromFileReader accepts neither kNone
+// nor non-parquet files.
enum FileReaderType { kBuffered, kStream, kNone };
+// Base class for iterators that produce columnar batches from the input file at path_.
class FileReaderIterator : public ColumnarBatchIterator {
public:
- explicit FileReaderIterator(const std::string& path) : path_(path) {}
+ // Opens `path` as a ResultIterator. Only ".parquet" files are supported; any other suffix or
+ // reader type ends in an exception. `batchSize` bounds rows per batch and `pool` backs the
+ // converted Velox batches.
+ static std::shared_ptr<gluten::ResultIterator>
getInputIteratorFromFileReader(
+ FileReaderType readerType,
+ const std::string& path,
+ int64_t batchSize,
+ facebook::velox::memory::MemoryPool* pool);
+
+ explicit FileReaderIterator(const std::string& path);
virtual ~FileReaderIterator() = default;
virtual std::shared_ptr<arrow::Schema> getSchema() = 0;
- int64_t getCollectBatchTime() const {
- return collectBatchTime_;
- }
+ // Time accumulated in collectBatchTime_ while reading from the file (the parquet iterators
+ // record nanoseconds).
+ int64_t getCollectBatchTime() const;
protected:
int64_t collectBatchTime_ = 0;
std::string path_;
};
-std::shared_ptr<gluten::ResultIterator> getInputIteratorFromFileReader(
- const std::string& path,
- FileReaderType readerType);
-
} // namespace gluten
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.cc
b/cpp/velox/operators/reader/ParquetReaderIterator.cc
new file mode 100644
index 0000000000..3e61e1d8d9
--- /dev/null
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.cc
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/reader/ParquetReaderIterator.h"
+#include "memory/VeloxColumnarBatch.h"
+
+#include <arrow/util/range.h>
+
+namespace gluten {
+
+// Records the reader settings; the file itself is opened by createReader().
+ParquetReaderIterator::ParquetReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : FileReaderIterator(path), batchSize_(batchSize), pool_(pool) {}
+
+// Opens path_ with Arrow's parquet reader (at most batchSize_ rows per record batch) and creates a
+// record batch reader over every row group. Throws if the file cannot be opened or read.
+void ParquetReaderIterator::createReader() {
+  parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties();
+  properties.set_batch_size(batchSize_);
+  GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make(
+      arrow::default_memory_pool(), parquet::ParquetFileReader::OpenFile(path_), properties, &fileReader_));
+  GLUTEN_THROW_NOT_OK(
+      fileReader_->GetRecordBatchReader(arrow::internal::Iota(fileReader_->num_row_groups()), &recordBatchReader_));
+
+  auto schema = recordBatchReader_->schema();
+  DLOG(INFO) << "Schema:\n" << schema->ToString();
+}
+
+// Schema of the opened file; requires createReader() to have run.
+std::shared_ptr<arrow::Schema> ParquetReaderIterator::getSchema() {
+  return recordBatchReader_->schema();
+}
+
+ParquetStreamReaderIterator::ParquetStreamReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : ParquetReaderIterator(path, batchSize, pool) {
+  createReader();
+  DLOG(INFO) << "ParquetStreamReaderIterator open file: " << path;
+}
+
+// Reads the next record batch from the file and converts it into a VeloxColumnarBatch.
+// Returns nullptr at end of file. Read time is added to collectBatchTime_ in nanoseconds.
+std::shared_ptr<gluten::ColumnarBatch> ParquetStreamReaderIterator::next() {
+  auto startTime = std::chrono::steady_clock::now();
+  GLUTEN_ASSIGN_OR_THROW(auto batch, recordBatchReader_->Next());
+  DLOG(INFO) << "ParquetStreamReaderIterator get a batch, num rows: " << (batch ? batch->num_rows() : 0);
+  collectBatchTime_ +=
+      std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - startTime).count();
+  if (batch == nullptr) {
+    return nullptr;
+  }
+  return VeloxColumnarBatch::from(pool_, std::make_shared<gluten::ArrowColumnarBatch>(batch));
+}
+
+ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
+    const std::string& path,
+    int64_t batchSize,
+    facebook::velox::memory::MemoryPool* pool)
+    : ParquetReaderIterator(path, batchSize, pool) {
+  // Read the whole file eagerly so that next() only performs the Arrow -> Velox conversion.
+  createReader();
+  collectBatches();
+  iter_ = batches_.begin();
+  DLOG(INFO) << "ParquetBufferedReaderIterator open file: " << path;
+  DLOG(INFO) << "Number of input batches: " << batches_.size();
+  if (iter_ != batches_.cend()) {
+    DLOG(INFO) << "columns: " << (*iter_)->num_columns();
+    DLOG(INFO) << "rows: " << (*iter_)->num_rows();
+  }
+}
+
+// Converts the next pre-loaded batch into a VeloxColumnarBatch; returns nullptr once all batches
+// have been handed out.
+std::shared_ptr<gluten::ColumnarBatch> ParquetBufferedReaderIterator::next() {
+  if (iter_ == batches_.cend()) {
+    return nullptr;
+  }
+  return VeloxColumnarBatch::from(pool_, std::make_shared<gluten::ArrowColumnarBatch>(*iter_++));
+}
+
+// Reads every record batch of the file into batches_, adding the elapsed time (nanoseconds) to
+// collectBatchTime_.
+void ParquetBufferedReaderIterator::collectBatches() {
+  auto startTime = std::chrono::steady_clock::now();
+  GLUTEN_ASSIGN_OR_THROW(batches_, recordBatchReader_->ToRecordBatches());
+  auto endTime = std::chrono::steady_clock::now();
+  collectBatchTime_ += std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
+}
+} // namespace gluten
diff --git a/cpp/velox/operators/reader/ParquetReaderIterator.h
b/cpp/velox/operators/reader/ParquetReaderIterator.h
new file mode 100644
index 0000000000..f45fe5eb77
--- /dev/null
+++ b/cpp/velox/operators/reader/ParquetReaderIterator.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "operators/reader/FileReaderIterator.h"
+
+#include <parquet/arrow/reader.h>
+#include <memory>
+
+namespace gluten {
+
+// Shared base for parquet-file iterators: createReader() opens path_ through Arrow's parquet
+// reader with batchSize_ rows per batch; subclasses convert batches to Velox using pool_.
+class ParquetReaderIterator : public FileReaderIterator {
+ public:
+ explicit ParquetReaderIterator(const std::string& path, int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
+
+ // Opens path_ and creates a record batch reader over all row groups; throws on failure.
+ void createReader();
+
+ // Schema of the opened file; only valid after createReader() has run.
+ std::shared_ptr<arrow::Schema> getSchema() override;
+
+ protected:
+ std::unique_ptr<::parquet::arrow::FileReader> fileReader_;
+ std::shared_ptr<arrow::RecordBatchReader> recordBatchReader_;
+ int64_t batchSize_;
+ facebook::velox::memory::MemoryPool* pool_;
+};
+
+// Reads one record batch from the file per next() call; next() returns nullptr at end of file.
+class ParquetStreamReaderIterator final : public ParquetReaderIterator {
+ public:
+ ParquetStreamReaderIterator(const std::string& path, int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
+
+ std::shared_ptr<gluten::ColumnarBatch> next() override;
+};
+
+// Loads every record batch of the file when constructed; next() then hands them out one by one
+// and returns nullptr once all have been returned.
+class ParquetBufferedReaderIterator final : public ParquetReaderIterator {
+ public:
+ explicit ParquetBufferedReaderIterator(
+ const std::string& path,
+ int64_t batchSize,
+ facebook::velox::memory::MemoryPool* pool);
+
+ std::shared_ptr<gluten::ColumnarBatch> next() override;
+
+ private:
+ // Reads all record batches into batches_, adding the elapsed time to collectBatchTime_.
+ void collectBatches();
+
+ arrow::RecordBatchVector batches_;
+ // Position in batches_ of the next batch to return.
+ std::vector<std::shared_ptr<arrow::RecordBatch>>::const_iterator iter_;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index acb14cf4de..9c5d166a07 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -17,10 +17,11 @@
#include "VeloxColumnarBatchSerializer.h"
+#include <arrow/buffer.h>
+
#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
#include "velox/common/memory/Memory.h"
-#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"
diff --git a/cpp/core/operators/writer/ArrowWriter.h
b/cpp/velox/operators/writer/VeloxArrowWriter.cc
similarity index 57%
copy from cpp/core/operators/writer/ArrowWriter.h
copy to cpp/velox/operators/writer/VeloxArrowWriter.cc
index 0d0b8ce2cb..565602d95c 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/velox/operators/writer/VeloxArrowWriter.cc
@@ -15,25 +15,24 @@
* limitations under the License.
*/
-#pragma once
+#include "operators/writer/VeloxArrowWriter.h"
-#include "parquet/arrow/writer.h"
+namespace gluten {
-/**
- * @brief Used to print RecordBatch to a parquet file
- *
- */
-class ArrowWriter {
- public:
- explicit ArrowWriter(std::string& path) : path_(path) {}
-
- arrow::Status initWriter(arrow::Schema& schema);
-
- arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);
-
- arrow::Status closeWriter();
+// Stores the read-back settings; `path` is handed to the ArrowWriter base.
+VeloxArrowWriter::VeloxArrowWriter(
+ const std::string& path,
+ int64_t batchSize,
+ facebook::velox::memory::MemoryPool* pool)
+ : ArrowWriter(path), batchSize_(batchSize), pool_(pool) {}
- private:
- std::unique_ptr<parquet::arrow::FileWriter> writer_;
- std::string path_;
-};
+// Reads back the batches stored in the parquet file at path_, one per call, as Velox batches
+// (up to batchSize_ rows each, allocated from pool_). Returns nullptr when no writer was ever
+// created or once the file is exhausted.
+std::shared_ptr<ColumnarBatch> VeloxArrowWriter::retrieveColumnarBatch() {
+ if (writer_ == nullptr) {
+ // No data to read.
+ return nullptr;
+ }
+ // Open the reader lazily on the first call. NOTE(review): assumes writing to path_ has finished
+ // (writer closed) before retrieval starts — confirm with the callers.
+ if (reader_ == nullptr) {
+ reader_ = std::make_unique<ParquetStreamReaderIterator>(path_, batchSize_,
pool_);
+ }
+ return reader_->next();
+}
+} // namespace gluten
diff --git a/cpp/core/operators/writer/ArrowWriter.h
b/cpp/velox/operators/writer/VeloxArrowWriter.h
similarity index 62%
copy from cpp/core/operators/writer/ArrowWriter.h
copy to cpp/velox/operators/writer/VeloxArrowWriter.h
index 0d0b8ce2cb..8b79986287 100644
--- a/cpp/core/operators/writer/ArrowWriter.h
+++ b/cpp/velox/operators/writer/VeloxArrowWriter.h
@@ -17,23 +17,19 @@
#pragma once
-#include "parquet/arrow/writer.h"
+#include "operators/reader/ParquetReaderIterator.h"
+#include "operators/writer/ArrowWriter.h"
-/**
- * @brief Used to print RecordBatch to a parquet file
- *
- */
-class ArrowWriter {
+namespace gluten {
+// ArrowWriter for the Velox backend: batches written to the parquet file at `path` can be read
+// back as Velox columnar batches through retrieveColumnarBatch().
+class VeloxArrowWriter : public ArrowWriter {
public:
- explicit ArrowWriter(std::string& path) : path_(path) {}
-
- arrow::Status initWriter(arrow::Schema& schema);
-
- arrow::Status writeInBatches(std::shared_ptr<arrow::RecordBatch> batch);
+ // `batchSize` bounds the rows per batch on read-back; `pool` backs the returned Velox batches.
+ explicit VeloxArrowWriter(const std::string& path, int64_t batchSize,
facebook::velox::memory::MemoryPool* pool);
- arrow::Status closeWriter();
+ // Returns the next batch from the written file, or nullptr when there is nothing (more) to read.
+ std::shared_ptr<ColumnarBatch> retrieveColumnarBatch() override;
private:
- std::unique_ptr<parquet::arrow::FileWriter> writer_;
- std::string path_;
+ int64_t batchSize_;
+ facebook::velox::memory::MemoryPool* pool_;
+ // Created on the first retrieveColumnarBatch() call.
+ std::unique_ptr<ParquetStreamReaderIterator> reader_{nullptr};
};
+} // namespace gluten
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 4b5bcdf819..b4944d9205 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -103,6 +103,10 @@ class DummyRuntime final : public Runtime {
throw GlutenException("Not yet implemented");
}
+ // Not supported by this test runtime; always throws.
+ std::shared_ptr<ArrowWriter> createArrowWriter(const std::string& path)
override {
+ throw GlutenException("Not yet implemented");
+ }
+
private:
class DummyResultIterator : public ColumnarBatchIterator {
public:
diff --git a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
index ffa6f032ac..c00ab6a148 100644
--- a/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
+++ b/cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
@@ -19,12 +19,13 @@
#include "memory/ArrowMemoryPool.h"
#include "memory/VeloxColumnarBatch.h"
-#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/vector/arrow/Bridge.h"
#include "velox/vector/tests/utils/VectorTestBase.h"
+#include <arrow/buffer.h>
+
using namespace facebook::velox;
namespace gluten {
diff --git a/cpp/velox/utils/VeloxArrowUtils.cc
b/cpp/velox/utils/VeloxArrowUtils.cc
index 0349eb718b..f26b49a476 100644
--- a/cpp/velox/utils/VeloxArrowUtils.cc
+++ b/cpp/velox/utils/VeloxArrowUtils.cc
@@ -16,6 +16,9 @@
*/
#include "utils/VeloxArrowUtils.h"
+
+#include <arrow/buffer.h>
+
#include "memory/VeloxColumnarBatch.h"
#include "utils/Common.h"
#include "velox/vector/ComplexVector.h"
diff --git a/docs/developers/MicroBenchmarks.md
b/docs/developers/MicroBenchmarks.md
index eedf5010b6..1483dc2cba 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -64,7 +64,7 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
--plan
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example.json
\
--data
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_orders/part-00000-1e66fb98-4dd6-47a6-8679-8625dbc437ee-c000.snappy.parquet,\
/home/sparkuser/github/apache/incubator-gluten/backends-velox/generated-native-benchmark/example_lineitem/part-00000-3ec19189-d20e-4240-85ae-88631d46b612-c000.snappy.parquet
\
---threads 1 --iterations 1 --noprint-result
--benchmark_filter=InputFromBatchStream
+--threads 1 --iterations 1 --noprint-result
```
The output should be like:
@@ -118,12 +118,12 @@ cd /path/to/gluten/
First, get the Stage Id from spark UI for the stage you want to simulate.
And then re-run the query with below configurations to dump the inputs to
micro benchmark.
-| Parameters | Description
| Recommend Setting |
-|---------------------------------------------|----------------------------------------------------------------------------------------------------------------|-----------------------|
-| spark.gluten.sql.benchmark_task.stageId | Spark task stage id
| target stage id |
-| spark.gluten.sql.benchmark_task.partitionId | Spark task partition id,
default value -1 means all the partition of this stage
| 0 |
-| spark.gluten.sql.benchmark_task.taskId | If not specify partition id,
use spark task attempt id, default value -1 means all the partition of this
stage | target task attemp id |
-| spark.gluten.saveDir | Directory to save the inputs
to micro benchmark, should exist and be empty.
| /path/to/saveDir |
+| Parameters | Description
| Recommend
Setting |
+|---------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------|
+| spark.gluten.sql.benchmark_task.taskId | Comma-separated string to
specify the Task IDs to dump. If it's set,
`spark.gluten.sql.benchmark_task.stageId` and
`spark.gluten.sql.benchmark_task.partitionId` will be ignored.
| Comma-separated
string of task IDs. Empty by default. |
+| spark.gluten.sql.benchmark_task.stageId | Spark stage ID.
| Target stage ID
|
+| spark.gluten.sql.benchmark_task.partitionId | Comma-separated string to
specify the Partition IDs in a stage to dump. Must be specified together with
`spark.gluten.sql.benchmark_task.stageId`. Empty by default, meaning all
partitions of this stage will be dumped. To identify the partition ID, navigate
to the `Stage` tab in the Spark UI and locate it under the `Index` column. |
Comma-separated string of partition IDs. Empty by default. |
+| spark.gluten.saveDir | Directory to save the inputs
to micro benchmark, should exist and be empty.
|
/path/to/saveDir |
Check the files in `spark.gluten.saveDir`. If the simulated stage is a first
stage, you will get 3
or 4 types of dumped file:
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
index dea0a48140..f9bb7478e7 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/utils/DebugUtil.scala
@@ -21,24 +21,32 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.TaskContext
object DebugUtil {
-  // if specify taskId, then only do that task partition
-  // if not specify stageId, then do nothing
-  // if specify stageId but no partitionId, then do all partitions for that stage
-  // if specify stageId and partitionId, then only do that partition for that stage
+  // Decides whether the current task dumps its inputs for the micro benchmark:
+  // - if benchmarkTaskId lists any ids, only tasks whose attempt id is listed are dumped, and the
+  //   stage/partition settings are ignored
+  // - otherwise, if benchmarkStageId doesn't match the current stage, then do nothing
+  // - if the stage matches and benchmarkPartitionId is empty, then do all partitions of that stage
+  // - if the stage matches and benchmarkPartitionId is set, then only do the listed partitions
+  // Id lists are comma-separated; surrounding whitespace and empty entries are ignored.
+  // Returns false when called outside a running task, and throws IllegalArgumentException when
+  // inputs should be saved but BENCHMARK_SAVE_DIR is empty.
  def saveInputToFile(): Boolean = {
-    if (TaskContext.get().taskAttemptId() == GlutenConfig.getConf.taskId) {
-      return true
-    }
-    if (TaskContext.get().stageId() != GlutenConfig.getConf.taskStageId) {
-      return false
-    }
-    if (GlutenConfig.getConf.taskPartitionId == -1) {
-      return true
-    }
-    if (TaskContext.getPartitionId() == GlutenConfig.getConf.taskPartitionId) {
-      return true
-    }
-    false
+    val conf = GlutenConfig.getConf
+    // TaskContext.get() is null when not running inside a task; never dump in that case.
+    val saveInput = Option(TaskContext.get()).exists {
+      context =>
+        val taskIds = parseIds(conf.benchmarkTaskId)(_.toLong)
+        if (taskIds.nonEmpty) {
+          // Explicit task ids take precedence, matching docs/developers/MicroBenchmarks.md.
+          taskIds.contains(context.taskAttemptId())
+        } else {
+          context.stageId() == conf.benchmarkStageId && {
+            val partitionIds = parseIds(conf.benchmarkPartitionId)(_.toInt)
+            // An empty partition list selects every partition of the stage.
+            partitionIds.isEmpty || partitionIds.contains(context.partitionId())
+          }
+        }
+    }
+    if (saveInput && conf.benchmarkSaveDir.isEmpty) {
+      throw new IllegalArgumentException(GlutenConfig.BENCHMARK_SAVE_DIR.key + " is not set.")
+    }
+    saveInput
  }
+
+  /** Splits a comma-separated id list, trimming whitespace and skipping empty entries. */
+  private def parseIds[T](ids: String)(parse: String => T): Seq[T] =
+    ids.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map(parse)
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index f756eb20a6..e0d06ce6fc 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -423,9 +423,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def debug: Boolean = conf.getConf(DEBUG_ENABLED)
def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
- def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
- def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID)
- def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID)
+ // Stage id whose tasks may dump their inputs for the micro benchmark.
+ def benchmarkStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
+ // Comma-separated partition ids within benchmarkStageId; empty selects every partition.
+ def benchmarkPartitionId: String = conf.getConf(BENCHMARK_TASK_PARTITIONID)
+ // Comma-separated task attempt ids whose inputs are dumped; empty by default.
+ def benchmarkTaskId: String = conf.getConf(BENCHMARK_TASK_TASK_ID)
+ // Directory receiving the dumped inputs; must be non-empty whenever a task is selected.
+ def benchmarkSaveDir: String = conf.getConf(BENCHMARK_SAVE_DIR)
def textInputMaxBlockSize: Long = conf.getConf(TEXT_INPUT_ROW_MAX_BLOCK_SIZE)
def textIputEmptyAsDefault: Boolean =
conf.getConf(TEXT_INPUT_EMPTY_AS_DEFAULT)
def enableParquetRowGroupMaxMinIndex: Boolean =
@@ -1719,14 +1720,20 @@ object GlutenConfig {
+ // Comma-separated partition ids of the benchmark stage to dump; "" means all partitions.
val BENCHMARK_TASK_PARTITIONID =
buildConf("spark.gluten.sql.benchmark_task.partitionId")
.internal()
- .intConf
- .createWithDefault(-1)
+ .stringConf
+ .createWithDefault("")
+ // Comma-separated task attempt ids to dump; "" disables task-id selection.
val BENCHMARK_TASK_TASK_ID =
buildConf("spark.gluten.sql.benchmark_task.taskId")
.internal()
- .longConf
- .createWithDefault(-1L)
+ .stringConf
+ .createWithDefault("")
+
+ // Directory that receives dumped benchmark inputs; must be set when any task is selected.
+ val BENCHMARK_SAVE_DIR =
+ buildConf(GLUTEN_SAVE_DIR)
+ .internal()
+ .stringConf
+ .createWithDefault("")
val NATIVE_WRITER_ENABLED =
buildConf("spark.gluten.sql.native.writer.enabled")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]