This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c1a3f7b502 [GLUTEN-7028][CH][Part-8] Support one pipeline write for partition mergetree (#7924)
c1a3f7b502 is described below

commit c1a3f7b502ceafd7b5ca11117a93a295ea5d69ec
Author: Chang chen <[email protected]>
AuthorDate: Wed Nov 13 09:32:32 2024 +0800

    [GLUTEN-7028][CH][Part-8] Support one pipeline write for partition mergetree (#7924)
    
    * [Refactor] simple refactor
    * [Refactor] Remove setStats
    * [Refactor] SparkPartitionedBaseSink and WriteStatsBase
    * [Refactor] Add explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context);
    * [New] Support writing partition mergetree in one pipeline
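
    A minimal sketch of how the new one-pipeline write path can be toggled from
    the Scala test suite, following the withSQLConf pattern used in the changes
    below (the table name is an illustrative placeholder):

        withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, "true")) {
          // With the flag enabled, inserts into a partitioned clickhouse/mergetree
          // table go through a single native write pipeline.
          spark.sql("insert overwrite table lineitem_mergetree_demo select * from lineitem")
        }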
---
 .../GlutenClickHouseMergeTreeWriteSuite.scala      | 180 +++++-----
 .../Parser/RelParsers/WriteRelParser.cpp           |  39 +--
 .../local-engine/Parser/SerializedPlanParser.cpp   |   8 +-
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |   4 +-
 .../Storages/MergeTree/SparkMergeTreeSink.cpp      |  19 +-
 .../Storages/MergeTree/SparkMergeTreeSink.h        | 160 +++++----
 .../MergeTree/SparkMergeTreeWriteSettings.cpp      |   3 +-
 .../MergeTree/SparkMergeTreeWriteSettings.h        |   2 +-
 .../Storages/MergeTree/SparkStorageMergeTree.cpp   |   8 +-
 .../Storages/Output/NormalFileWriter.cpp           |   2 +-
 .../Storages/Output/NormalFileWriter.h             | 122 ++++---
 .../tests/gtest_write_pipeline_mergetree.cpp       |  42 ++-
 .../tests/json/mergetree/4_one_pipeline.json       | 377 +++++++++++++++++++++
 ...line_input.json => lineitem_parquet_input.json} |   0
 14 files changed, 701 insertions(+), 265 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index b33a2065f6..85c8c2d92a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -162,103 +162,107 @@ class GlutenClickHouseMergeTreeWriteSuite
   }
 
   test("test mergetree insert overwrite") {
-    spark.sql(s"""
-                 |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
-                 |""".stripMargin)
+    withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
-                 |(
-                 | l_orderkey      bigint,
-                 | l_partkey       bigint,
-                 | l_suppkey       bigint,
-                 | l_linenumber    bigint,
-                 | l_quantity      double,
-                 | l_extendedprice double,
-                 | l_discount      double,
-                 | l_tax           double,
-                 | l_returnflag    string,
-                 | l_linestatus    string,
-                 | l_shipdate      date,
-                 | l_commitdate    date,
-                 | l_receiptdate   date,
-                 | l_shipinstruct  string,
-                 | l_shipmode      string,
-                 | l_comment       string
-                 |)
-                 |USING clickhouse
-                 |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
-                 |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
+                   |(
+                   | l_orderkey      bigint,
+                   | l_partkey       bigint,
+                   | l_suppkey       bigint,
+                   | l_linenumber    bigint,
+                   | l_quantity      double,
+                   | l_extendedprice double,
+                   | l_discount      double,
+                   | l_tax           double,
+                   | l_returnflag    string,
+                   | l_linestatus    string,
+                   | l_shipdate      date,
+                   | l_commitdate    date,
+                   | l_receiptdate   date,
+                   | l_shipinstruct  string,
+                   | l_shipmode      string,
+                   | l_comment       string
+                   |)
+                   |USING clickhouse
+                   |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 | insert into table lineitem_mergetree_insertoverwrite
-                 | select * from lineitem
-                 |""".stripMargin)
+      spark.sql(s"""
+                   | insert into table lineitem_mergetree_insertoverwrite
+                   | select * from lineitem
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 | insert overwrite table lineitem_mergetree_insertoverwrite
-                 | select * from lineitem where mod(l_orderkey,2) = 1
-                 |""".stripMargin)
-    val sql2 =
-      s"""
-         | select count(*) from lineitem_mergetree_insertoverwrite
-         |
-         |""".stripMargin
-    assertResult(300001)(
-      // total rows should remain unchanged
-      spark.sql(sql2).collect().apply(0).get(0)
-    )
+      spark.sql(s"""
+                   | insert overwrite table lineitem_mergetree_insertoverwrite
+                   | select * from lineitem where mod(l_orderkey,2) = 1
+                   |""".stripMargin)
+      val sql2 =
+        s"""
+           | select count(*) from lineitem_mergetree_insertoverwrite
+           |
+           |""".stripMargin
+      assertResult(300001)(
+        // total rows should remain unchanged
+        spark.sql(sql2).collect().apply(0).get(0)
+      )
+    }
   }
 
   test("test mergetree insert overwrite partitioned table with small table, static") {
-    spark.sql(s"""
-                 |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
-                 |""".stripMargin)
+    withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, "false")) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
-                 |(
-                 | l_orderkey      bigint,
-                 | l_partkey       bigint,
-                 | l_suppkey       bigint,
-                 | l_linenumber    bigint,
-                 | l_quantity      double,
-                 | l_extendedprice double,
-                 | l_discount      double,
-                 | l_tax           double,
-                 | l_returnflag    string,
-                 | l_linestatus    string,
-                 | l_shipdate      date,
-                 | l_commitdate    date,
-                 | l_receiptdate   date,
-                 | l_shipinstruct  string,
-                 | l_shipmode      string,
-                 | l_comment       string
-                 |)
-                 |USING clickhouse
-                 |PARTITIONED BY (l_shipdate)
-                 |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
-                 |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
+                   |(
+                   | l_orderkey      bigint,
+                   | l_partkey       bigint,
+                   | l_suppkey       bigint,
+                   | l_linenumber    bigint,
+                   | l_quantity      double,
+                   | l_extendedprice double,
+                   | l_discount      double,
+                   | l_tax           double,
+                   | l_returnflag    string,
+                   | l_linestatus    string,
+                   | l_shipdate      date,
+                   | l_commitdate    date,
+                   | l_receiptdate   date,
+                   | l_shipinstruct  string,
+                   | l_shipmode      string,
+                   | l_comment       string
+                   |)
+                   |USING clickhouse
+                   |PARTITIONED BY (l_shipdate)
+                   |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
+                   |""".stripMargin)
 
-    spark.sql(s"""
-                 | insert into table lineitem_mergetree_insertoverwrite2
-                 | select * from lineitem
-                 |""".stripMargin)
+      spark.sql(s"""
+                   | insert into table lineitem_mergetree_insertoverwrite2
+                   | select * from lineitem
+                   |""".stripMargin)
 
-    spark.sql(
-      s"""
-         | insert overwrite table lineitem_mergetree_insertoverwrite2
-         | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
-         |""".stripMargin)
-    val sql2 =
-      s"""
-         | select count(*) from lineitem_mergetree_insertoverwrite2
-         |
-         |""".stripMargin
-    assertResult(2418)(
-      // total rows should remain unchanged
-      spark.sql(sql2).collect().apply(0).get(0)
-    )
+      spark.sql(
+        s"""
+           | insert overwrite table lineitem_mergetree_insertoverwrite2
+           | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
+           |""".stripMargin)
+      val sql2 =
+        s"""
+           | select count(*) from lineitem_mergetree_insertoverwrite2
+           |
+           |""".stripMargin
+      assertResult(2418)(
+        // total rows should remain unchanged
+        spark.sql(sql2).collect().apply(0).get(0)
+      )
+    }
   }
 
   test("test mergetree insert overwrite partitioned table with small table, dynamic") {
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index ea1239a809..25f786a774 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -58,15 +58,11 @@ DB::ProcessorPtr make_sink(
 {
     if (partition_by.empty())
     {
-        auto file_sink = std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header);
-        file_sink->setStats(stats);
-        return file_sink;
+        return std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header, stats);
     }
 
-    auto file_sink = std::make_shared<SubstraitPartitionedFileSink>(
-        context, partition_by, input_header, output_header, base_path, filename, format_hint);
-    file_sink->setStats(stats);
-    return file_sink;
+    return std::make_shared<SubstraitPartitionedFileSink>(
+        context, partition_by, input_header, output_header, base_path, filename, format_hint, stats);
 }
 
 DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const DB::Block & output)
@@ -148,25 +144,30 @@ void addMergeTreeSinkTransform(
     const DB::ContextPtr & context,
     const DB::QueryPipelineBuilderPtr & builder,
     const MergeTreeTable & merge_tree_table,
-    const DB::Block & output,
-    const DB::Names & /*partitionCols*/)
+    const DB::Block & header,
+    const DB::Names & partition_by)
 {
-    const DB::Settings & settings = context->getSettingsRef();
-    const auto dest_storage = merge_tree_table.getStorage(context->getGlobalContext());
-    StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
-    ASTPtr none;
-    auto sink = dest_storage->write(none, metadata_snapshot, context, false);
+
     Chain chain;
-    chain.addSink(sink);
-    const SinkHelper & sink_helper = assert_cast<const SparkMergeTreeSink &>(*sink).sinkHelper();
     //
-    auto stats = std::make_shared<MergeTreeStats>(output, sink_helper);
+    auto stats = std::make_shared<MergeTreeStats>(header);
     chain.addSink(stats);
     //
+
+    SparkMergeTreeWriteSettings write_settings{context};
+    if (partition_by.empty())
+        write_settings.partition_settings.partition_dir = SubstraitFileSink::NO_PARTITION_ID;
+
+    auto sink = partition_by.empty() ?
+        SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getGlobalContext(), {stats}) :
+        std::make_shared<SparkMergeTreePartitionedFileSink>(header, partition_by, merge_tree_table, write_settings, context, stats);
+
+    chain.addSource(sink);
+    const DB::Settings & settings = context->getSettingsRef();
     chain.addSource(std::make_shared<ApplySquashingTransform>(
-        output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
     chain.addSource(std::make_shared<PlanSquashingTransform>(
-        output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
 
     builder->addChain(std::move(chain));
 }
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 4e49c577bf..748ff88acb 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -283,7 +283,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
     return query_plan;
 }
 
-DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan)
+DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan) const
 {
     const Settings & settings = parser_context->queryContext()->getSettingsRef();
     QueryPriorities priorities;
@@ -355,11 +355,7 @@ NonNullableColumnsResolver::NonNullableColumnsResolver(
     expression_parser = std::make_unique<ExpressionParser>(parser_context);
 }
 
-NonNullableColumnsResolver::~NonNullableColumnsResolver()
-{
-}
-
-// make it simple at present, if the condition contains or, return empty for both side.
+// make it simple at present if the condition contains or, return empty for both side.
 std::set<String> NonNullableColumnsResolver::resolve()
 {
     collected_columns.clear();
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 2bcc09a8ed..f0ec608a33 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -48,7 +48,7 @@ class NonNullableColumnsResolver
 public:
     explicit NonNullableColumnsResolver(
         const DB::Block & header_, std::shared_ptr<const ParserContext> parser_context_, const substrait::Expression & cond_rel_);
-    ~NonNullableColumnsResolver();
+    ~NonNullableColumnsResolver() = default;
 
     // return column names
     std::set<String> resolve();
@@ -76,7 +76,7 @@ public:
     /// visible for UT
     DB::QueryPlanPtr parse(const substrait::Plan & plan);
     std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan);
-    DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan);
+    DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan) const;
     ///
     std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
 
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
index 43a3a78295..de0c244f3e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
@@ -62,15 +62,24 @@ void SparkMergeTreeSink::onStart()
 void SparkMergeTreeSink::onFinish()
 {
     sink_helper->finish(context);
+    if (stats_.has_value())
+        (*stats_)->collectStats(sink_helper->unsafeGet(), sink_helper->write_settings.partition_settings.partition_dir);
 }
 
 /////
-SinkHelperPtr SparkMergeTreeSink::create(
-    const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context)
+SinkToStoragePtr SparkMergeTreeSink::create(
+    const MergeTreeTable & merge_tree_table,
+    const SparkMergeTreeWriteSettings & write_settings_,
+    const DB::ContextMutablePtr & context,
+    const SinkStatsOption & stats)
 {
+    if (write_settings_.partition_settings.part_name_prefix.empty())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");
+
     auto dest_storage = merge_tree_table.getStorage(context);
     bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
     bool insert_with_local_storage = !write_settings_.insert_without_local_storage;
+    SinkHelperPtr sink_helper;
     if (insert_with_local_storage && isRemoteStorage)
     {
         auto temp = merge_tree_table.copyToDefaultPolicyStorage(context);
@@ -78,10 +87,10 @@ SinkHelperPtr SparkMergeTreeSink::create(
             &Poco::Logger::get("SparkMergeTreeWriter"),
             "Create temp table {} for local merge.",
             temp->getStorageID().getFullNameNotQuoted());
-        return std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
+        sink_helper = std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
     }
-
-    return std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+    sink_helper = std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+    return std::make_shared<SparkMergeTreeSink>(sink_helper, context, stats);
 }
 
 SinkHelper::SinkHelper(const SparkStorageMergeTreePtr & data_, const SparkMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_)
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
index d19e5dc4e9..065b147462 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
@@ -21,6 +21,7 @@
 #include <Storages/MergeTree/SparkMergeTreeMeta.h>
 #include <Storages/MergeTree/SparkMergeTreeWriteSettings.h>
 #include <Storages/MergeTree/SparkStorageMergeTree.h>
+#include <Storages/Output/NormalFileWriter.h>
 #include <Common/BlockTypeUtils.h>
 
 namespace local_engine
@@ -152,39 +153,18 @@ public:
     }
 };
 
-class SparkMergeTreeSink : public DB::SinkToStorage
+class MergeTreeStats : public WriteStatsBase
 {
-public:
-    static SinkHelperPtr create(
-        const MergeTreeTable & merge_tree_table,
-        const SparkMergeTreeWriteSettings & write_settings_,
-        const DB::ContextMutablePtr & context);
+    DB::MutableColumns columns_;
 
-    explicit SparkMergeTreeSink(const SinkHelperPtr & sink_helper_, const ContextPtr & context_)
-        : SinkToStorage(sink_helper_->metadata_snapshot->getSampleBlock()), context(context_), sink_helper(sink_helper_)
+    enum ColumnIndex
     {
-    }
-    ~SparkMergeTreeSink() override = default;
-
-    String getName() const override { return "SparkMergeTreeSink"; }
-    void consume(Chunk & chunk) override;
-    void onStart() override;
-    void onFinish() override;
-
-    const SinkHelper & sinkHelper() const { return *sink_helper; }
-
-private:
-    ContextPtr context;
-    SinkHelperPtr sink_helper;
-
-    int part_num = 1;
-};
-
-
-class MergeTreeStats : public DB::ISimpleTransform
-{
-    bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks
-    const SinkHelper & sink_helper;
+        part_name,
+        partition_id,
+        record_count,
+        marks_count,
+        size_in_bytes
+    };
 
     static DB::Block statsHeader()
     {
@@ -196,69 +176,109 @@ class MergeTreeStats : public DB::ISimpleTransform
              {BIGINT(), "size_in_bytes"}});
     }
 
-    DB::Chunk final_result() const
+    DB::Chunk final_result() override
+    {
+        size_t rows = columns_[part_name]->size();
+        return DB::Chunk(std::move(columns_), rows);
+    }
+
+public:
+    explicit MergeTreeStats(const DB::Block & input_header_)
+        : WriteStatsBase(input_header_, statsHeader()), columns_(statsHeader().cloneEmptyColumns())
     {
-        // TODO: remove it
-        const std::string NO_PARTITION_ID{"__NO_PARTITION_ID__"};
+    }
 
-        auto parts = sink_helper.unsafeGet();
+    String getName() const override { return "MergeTreeStats"; }
 
-        const size_t size = parts.size();
-        auto file_col = STRING()->createColumn();
-        file_col->reserve(size);
+    void collectStats(const std::deque<DB::MergeTreeDataPartPtr> & parts, const std::string & partition) const
+    {
+        const size_t size = parts.size() + columns_[part_name]->size();
 
-        auto partition_col = STRING()->createColumn();
-        partition_col->reserve(size);
+        columns_[part_name]->reserve(size);
+        columns_[partition_id]->reserve(size);
 
-        auto countCol = BIGINT()->createColumn();
-        countCol->reserve(size);
-        auto & countColData = static_cast<DB::ColumnVector<Int64> &>(*countCol).getData();
+        columns_[record_count]->reserve(size);
+        auto & countColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[record_count]).getData();
 
-        auto marksCol = BIGINT()->createColumn();
-        marksCol->reserve(size);
-        auto & marksColData = static_cast<DB::ColumnVector<Int64> &>(*marksCol).getData();
+        columns_[marks_count]->reserve(size);
+        auto & marksColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[marks_count]).getData();
 
-        auto bytesCol = BIGINT()->createColumn();
-        bytesCol->reserve(size);
-        auto & bytesColData = static_cast<DB::ColumnVector<Int64> &>(*bytesCol).getData();
+        columns_[size_in_bytes]->reserve(size);
+        auto & bytesColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[size_in_bytes]).getData();
 
         for (const auto & part : parts)
         {
-            file_col->insertData(part->name.c_str(), part->name.size());
-            partition_col->insertData(NO_PARTITION_ID.c_str(), NO_PARTITION_ID.size());
+            columns_[part_name]->insertData(part->name.c_str(), part->name.size());
+            columns_[partition_id]->insertData(partition.c_str(), partition.size());
+
             countColData.emplace_back(part->rows_count);
             marksColData.emplace_back(part->getMarksCount());
             bytesColData.emplace_back(part->getBytesOnDisk());
         }
-        const DB::Columns res_columns{
-            std::move(file_col), std::move(partition_col), std::move(countCol), std::move(marksCol), std::move(bytesCol)};
-        return DB::Chunk(res_columns, size);
     }
+};
 
+class SparkMergeTreeSink : public DB::SinkToStorage
+{
 public:
-    explicit MergeTreeStats(const DB::Block & input_header_, const SinkHelper & sink_helper_)
-        : ISimpleTransform(input_header_, statsHeader(), true), sink_helper(sink_helper_)
+    using SinkStatsOption = std::optional<std::shared_ptr<MergeTreeStats>>;
+    static SinkToStoragePtr create(
+        const MergeTreeTable & merge_tree_table,
+        const SparkMergeTreeWriteSettings & write_settings_,
+        const DB::ContextMutablePtr & context,
+        const SinkStatsOption & stats = {});
+
+    explicit SparkMergeTreeSink(const SinkHelperPtr & sink_helper_, const ContextPtr & context_, const SinkStatsOption & stats)
+        : SinkToStorage(sink_helper_->metadata_snapshot->getSampleBlock()), context(context_), sink_helper(sink_helper_), stats_(stats)
     {
     }
-    Status prepare() override
-    {
-        if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
-        {
-            all_chunks_processed_ = true;
-            /// return Ready to call transform() for generating filling rows after latest chunk was processed
-            return Status::Ready;
-        }
+    ~SparkMergeTreeSink() override = default;
+
+    String getName() const override { return "SparkMergeTreeSink"; }
+    void consume(Chunk & chunk) override;
+    void onStart() override;
+    void onFinish() override;
+
+    const SinkHelper & sinkHelper() const { return *sink_helper; }
 
-        return ISimpleTransform::prepare();
+private:
+    ContextPtr context;
+    SinkHelperPtr sink_helper;
+    std::optional<std::shared_ptr<MergeTreeStats>> stats_;
+    int part_num = 1;
+};
+
+
+class SparkMergeTreePartitionedFileSink final : public SparkPartitionedBaseSink
+{
+    const SparkMergeTreeWriteSettings write_settings_;
+    MergeTreeTable table;
+
+public:
+    SparkMergeTreePartitionedFileSink(
+        const DB::Block & input_header,
+        const DB::Names & partition_by,
+        const MergeTreeTable & merge_tree_table,
+        const SparkMergeTreeWriteSettings & write_settings,
+        const DB::ContextPtr & context,
+        const std::shared_ptr<WriteStatsBase> & stats)
+        : SparkPartitionedBaseSink(context, partition_by, input_header, stats), write_settings_(write_settings), table(merge_tree_table)
+    {
     }
 
-    String getName() const override { return "MergeTreeStats"; }
-    void transform(DB::Chunk & chunk) override
+    SinkPtr createSinkForPartition(const String & partition_id) override
     {
-        if (all_chunks_processed_)
-            chunk = final_result();
-        else
-            chunk = {};
+        SparkMergeTreeWriteSettings write_settings{write_settings_};
+
+        assert(write_settings.partition_settings.partition_dir.empty());
+        assert(write_settings.partition_settings.bucket_dir.empty());
+        write_settings.partition_settings.part_name_prefix
+            = fmt::format("{}/{}", partition_id, write_settings.partition_settings.part_name_prefix);
+        write_settings.partition_settings.partition_dir = partition_id;
+
+        return SparkMergeTreeSink::create(
+            table, write_settings, context_->getGlobalContext(), {std::dynamic_pointer_cast<MergeTreeStats>(stats_)});
     }
 };
+
 }
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
index 75472d410c..e584b003d2 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
@@ -26,7 +26,8 @@ namespace local_engine
 
 IMPLEMENT_GLUTEN_SETTINGS(SparkMergeTreeWritePartitionSettings, MERGE_TREE_WRITE_RELATED_SETTINGS)
 
-void SparkMergeTreeWriteSettings::load(const DB::ContextPtr & context)
+SparkMergeTreeWriteSettings::SparkMergeTreeWriteSettings(const DB::ContextPtr & context)
+    : partition_settings(SparkMergeTreeWritePartitionSettings::get(context))
 {
     const DB::Settings & settings = context->getSettingsRef();
     merge_after_insert = settings.get(MERGETREE_MERGE_AFTER_INSERT).safeGet<bool>();
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
index 1fbbdbe346..e89b2aaf5e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
@@ -35,6 +35,6 @@ struct SparkMergeTreeWriteSettings
     size_t merge_min_size = 1024 * 1024 * 1024;
     size_t merge_limit_parts = 10;
 
-    void load(const DB::ContextPtr & context);
+    explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context);
 };
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index a6a252be3d..0be7e0d892 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -513,17 +513,11 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart(
 SinkToStoragePtr SparkWriteStorageMergeTree::write(
     const ASTPtr &, const StorageMetadataPtr & /*storage_in_memory_metadata*/, ContextPtr context, bool /*async_insert*/)
 {
-    SparkMergeTreeWriteSettings settings{.partition_settings{SparkMergeTreeWritePartitionSettings::get(context)}};
-    if (settings.partition_settings.part_name_prefix.empty())
-        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");
-    settings.load(context);
-    SinkHelperPtr sink_helper = SparkMergeTreeSink::create(table, settings, getContext());
 #ifndef NDEBUG
     auto dest_storage = table.getStorage(getContext());
     assert(dest_storage.get() == this);
 #endif
-
-    return std::make_shared<SparkMergeTreeSink>(sink_helper, context);
+    return SparkMergeTreeSink::create(table, SparkMergeTreeWriteSettings{context}, getContext());
 }
 
 }
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index 8c904ab205..d572b85385 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -23,7 +23,7 @@ namespace local_engine
 {
 
 const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
-const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
+const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
 
 NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
 {
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 8a25d7b21f..cd9c848dd4 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -61,20 +61,51 @@ private:
 OutputFormatFilePtr createOutputFormatFile(
     const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
 
-class WriteStats : public DB::ISimpleTransform
+class WriteStatsBase : public DB::ISimpleTransform
 {
+protected:
     bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks
-    DB::Arena partition_keys_arena_;
-    std::string filename_;
+    virtual DB::Chunk final_result() = 0;
 
-    absl::flat_hash_map<StringRef, size_t> file_to_count_;
+public:
+    WriteStatsBase(const DB::Block & input_header_, const DB::Block & output_header_)
+        : ISimpleTransform(input_header_, output_header_, true)
+    {
+    }
+
+    Status prepare() override
+    {
+        if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
+        {
+            all_chunks_processed_ = true;
+            /// return Ready to call transform() for generating filling rows after latest chunk was processed
+            return Status::Ready;
+        }
 
+        return ISimpleTransform::prepare();
+    }
+
+    void transform(DB::Chunk & chunk) override
+    {
+        if (all_chunks_processed_)
+            chunk = final_result();
+        else
+            chunk = {};
+    }
+};
+
+class WriteStats : public WriteStatsBase
+{
     static DB::Block statsHeader()
     {
         return makeBlockHeader({{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}});
     }
+    DB::Arena partition_keys_arena_;
+    std::string filename_;
+    absl::flat_hash_map<StringRef, size_t> file_to_count_;
 
-    DB::Chunk final_result() const
+protected:
+    DB::Chunk final_result() override
     {
         const size_t size = file_to_count_.size();
 
@@ -102,30 +133,9 @@ class WriteStats : public DB::ISimpleTransform
     }
 
 public:
-    explicit WriteStats(const DB::Block & input_header_) : ISimpleTransform(input_header_, statsHeader(), true) { }
-
-    Status prepare() override
-    {
-        if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
-        {
-            all_chunks_processed_ = true;
-            /// return Ready to call transform() for generating filling rows after latest chunk was processed
-            return Status::Ready;
-        }
-
-        return ISimpleTransform::prepare();
-    }
-
+    explicit WriteStats(const DB::Block & input_header_) : WriteStatsBase(input_header_, statsHeader()) { }
     String getName() const override { return "WriteStats"; }
-    void transform(DB::Chunk & chunk) override
-    {
-        if (all_chunks_processed_)
-            chunk = final_result();
-        else
-            chunk = {};
-    }
-
-    void addFilePath(const String & patition_id, const String & filename)
+    void addFilePath(const String & partition_id, const String & filename)
     {
         assert(!filename.empty());
 
@@ -134,9 +144,9 @@ public:
 
         assert(filename_ == filename);
 
-        if (patition_id.empty())
+        if (partition_id.empty())
             return;
-        file_to_count_.emplace(copyStringInArena(partition_keys_arena_, patition_id), 0);
+        file_to_count_.emplace(copyStringInArena(partition_keys_arena_, partition_id), 0);
     }
 
     void collectStats(const String & file_path, size_t rows)
@@ -155,7 +165,7 @@ class SubstraitFileSink final : public DB::SinkToStorage
     const std::string partition_id_;
     const std::string relative_path_;
     OutputFormatFile::OutputFormatPtr output_format_;
-    std::shared_ptr<WriteStats> stats_{nullptr};
+    std::shared_ptr<WriteStats> stats_;
 
     static std::string makeFilename(const std::string & base_path, const std::string & partition_id, const std::string & relative)
     {
@@ -174,22 +184,18 @@ public:
         const std::string & partition_id,
         const std::string & relative,
         const std::string & format_hint,
-        const DB::Block & header)
+        const DB::Block & header,
+        const std::shared_ptr<WriteStatsBase> & stats)
         : SinkToStorage(header)
         , partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id)
         , relative_path_(relative)
         , output_format_(createOutputFormatFile(context, makeFilename(base_path, partition_id, relative), header, format_hint)
                              ->createOutputFormat(header))
+        , stats_(std::dynamic_pointer_cast<WriteStats>(stats))
     {
-    }
-    String getName() const override { return "SubstraitFileSink"; }
-
-    ///TODO: remove this function
-    void setStats(const std::shared_ptr<WriteStats> & stats)
-    {
-        stats_ = stats;
         stats_->addFilePath(partition_id_, relative_path_);
     }
+    String getName() const override { return "SubstraitFileSink"; }
 
 protected:
     void consume(DB::Chunk & chunk) override
@@ -208,7 +214,7 @@ protected:
     }
 };
 
-class SubstraitPartitionedFileSink final : public DB::PartitionedSink
+class SparkPartitionedBaseSink : public DB::PartitionedSink
 {
     static const std::string DEFAULT_PARTITION_NAME;
 
@@ -237,13 +243,27 @@ public:
         return DB::makeASTFunction("concat", std::move(arguments));
     }
 
-private:
+protected:
+    DB::ContextPtr context_;
+    std::shared_ptr<WriteStatsBase> stats_;
+
+public:
+    SparkPartitionedBaseSink(
+        const DB::ContextPtr & context,
+        const DB::Names & partition_by,
+        const DB::Block & input_header,
+        const std::shared_ptr<WriteStatsBase> & stats)
+        : PartitionedSink(make_partition_expression(partition_by), context, input_header), context_(context), stats_(stats)
+    {
+    }
+};
+
+class SubstraitPartitionedFileSink final : public SparkPartitionedBaseSink
+{
     const std::string base_path_;
     const std::string filename_;
-    DB::ContextPtr context_;
     const DB::Block sample_block_;
     const std::string format_hint_;
-    std::shared_ptr<WriteStats> stats_{nullptr};
 
 public:
     SubstraitPartitionedFileSink(
@@ -253,27 +273,23 @@ public:
         const DB::Block & sample_block,
         const std::string & base_path,
         const std::string & filename,
-        const std::string & format_hint)
-        : PartitionedSink(make_partition_expression(partition_by), context, input_header)
+        const std::string & format_hint,
+        const std::shared_ptr<WriteStatsBase> & stats)
+        : SparkPartitionedBaseSink(context, partition_by, input_header, stats)
         , base_path_(base_path)
         , filename_(filename)
-        , context_(context)
         , sample_block_(sample_block)
         , format_hint_(format_hint)
     {
     }
+
     DB::SinkPtr createSinkForPartition(const String & partition_id) override
     {
         assert(stats_);
         const auto partition_path = fmt::format("{}/{}", partition_id, filename_);
-        PartitionedSink::validatePartitionKey(partition_path, true);
-        auto file_sink = std::make_shared<SubstraitFileSink>(context_, base_path_, partition_id, filename_, format_hint_, sample_block_);
-        file_sink->setStats(stats_);
-        return file_sink;
+        validatePartitionKey(partition_path, true);
+        return std::make_shared<SubstraitFileSink>(context_, base_path_, partition_id, filename_, format_hint_, sample_block_, stats_);
     }
     String getName() const override { return "SubstraitPartitionedFileSink"; }
-
-    ///TODO: remove this function
-    void setStats(const std::shared_ptr<WriteStats> & stats) { stats_ = stats; }
 };
 }
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
index ddbf113353..1ad90060f4 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
@@ -255,24 +255,42 @@ TEST(MergeTree, SparkMergeTree)
     }
 }
 
-INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
-INCBIN(_3_mergetree_plan_input_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline_input.json");
-
-TEST(MergeTree, Pipeline)
+INCBIN(_3_mergetree_plan_input_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/lineitem_parquet_input.json");
+namespace
+{
+void writeMerge(std::string_view json_plan,
+    const std::string & outputPath ,
+    const std::function<void(const DB::Block &)> & callback, std::optional<std::string> input = std::nullopt)
 {
     const auto context = DB::Context::createCopy(QueryContext::globalContext());
-    GlutenWriteSettings settings{.task_write_tmp_dir = "tmp/lineitem_mergetree"};
+    GlutenWriteSettings settings{.task_write_tmp_dir = outputPath};
     settings.set(context);
     SparkMergeTreeWritePartitionSettings partition_settings{.part_name_prefix = "pipline_prefix"};
     partition_settings.set(context);
 
-    auto [_, local_executor] = test::create_plan_and_executor(
-        EMBEDDED_PLAN(_3_mergetree_plan_), replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_)), context);
-    size_t sum = 0;
+    auto input_json = input.value_or(replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_)));
+    auto [_, local_executor] = test::create_plan_and_executor(json_plan, input_json, context);
+
     while (local_executor->hasNext())
+        callback(*local_executor->nextColumnar());
+}
+}
+INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
+INCBIN(_4_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/4_one_pipeline.json");
+TEST(MergeTree, Pipeline)
+{
+    writeMerge(EMBEDDED_PLAN(_3_mergetree_plan_),"tmp/lineitem_mergetree",[&](const DB::Block & block)
     {
-        const Block & x = *local_executor->nextColumnar();
-        EXPECT_EQ(1, x.rows());
-        //-debug::headBlock("pipeline write", x);
-    }
+        EXPECT_EQ(1, block.rows());
+        debug::headBlock(block);
+    });
+}
+
+TEST(MergeTree, PipelineWithPartition)
+{
+    writeMerge(EMBEDDED_PLAN(_4_mergetree_plan_),"tmp/lineitem_mergetree_p",[&](const DB::Block & block)
+    {
+        EXPECT_EQ(2525, block.rows());
+        debug::headBlock(block);
+    });
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json
new file mode 100644
index 0000000000..14a9b3dda2
--- /dev/null
+++ b/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json
@@ -0,0 +1,377 @@
+{
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "write": {
+            "namedTable": {
+              "advancedExtension": {
+                "optimization": {
+                  "@type": "type.googleapis.com/local_engine.Write",
+                  "common": {
+                    "format": "mergetree"
+                  },
+                  "mergetree": {
+                    "database": "default",
+                    "table": "lineitem_mergetree_insertoverwrite2",
+                    "snapshotId": "1731309448915_0",
+                    "orderByKey": "tuple()",
+                    "storagePolicy": "default"
+                  }
+                },
+                "enhancement": {
+                  "@type": "type.googleapis.com/substrait.Type",
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      }
+                    ],
+                    "nullability": "NULLABILITY_REQUIRED"
+                  }
+                }
+              }
+            },
+            "tableSchema": {
+              "names": [
+                "l_orderkey",
+                "l_partkey",
+                "l_suppkey",
+                "l_linenumber",
+                "l_quantity",
+                "l_extendedprice",
+                "l_discount",
+                "l_tax",
+                "l_returnflag",
+                "l_linestatus",
+                "l_shipdate",
+                "l_commitdate",
+                "l_receiptdate",
+                "l_shipinstruct",
+                "l_shipmode",
+                "l_comment"
+              ],
+              "struct": {
+                "types": [
+                  {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "fp64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "fp64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "fp64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "fp64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "string": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "string": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "date": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "date": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "date": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "string": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "string": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  {
+                    "string": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  }
+                ]
+              },
+              "columnTypes": [
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "PARTITION_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL",
+                "NORMAL_COL"
+              ]
+            },
+            "input": {
+              "read": {
+                "common": {
+                  "direct": {}
+                },
+                "baseSchema": {
+                  "names": [
+                    "l_orderkey",
+                    "l_partkey",
+                    "l_suppkey",
+                    "l_linenumber",
+                    "l_quantity",
+                    "l_extendedprice",
+                    "l_discount",
+                    "l_tax",
+                    "l_returnflag",
+                    "l_linestatus",
+                    "l_shipdate",
+                    "l_commitdate",
+                    "l_receiptdate",
+                    "l_shipinstruct",
+                    "l_shipmode",
+                    "l_comment"
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      }
+                    ]
+                  },
+                  "columnTypes": [
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL",
+                    "NORMAL_COL"
+                  ]
+                },
+                "advancedExtension": {
+                  "optimization": {
+                    "@type": "type.googleapis.com/google.protobuf.StringValue",
+                    "value": "isMergeTree=0\n"
+                  }
+                }
+              }
+            }
+          }
+        },
+        "outputSchema": {
+          "nullability": "NULLABILITY_REQUIRED"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/3_one_pipeline_input.json b/cpp-ch/local-engine/tests/json/mergetree/lineitem_parquet_input.json
similarity index 100%
rename from cpp-ch/local-engine/tests/json/mergetree/3_one_pipeline_input.json
rename to cpp-ch/local-engine/tests/json/mergetree/lineitem_parquet_input.json


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
