This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c1a3f7b502 [GLUTEN-7028][CH][Part-8] Support one pipeline write for partition mergetree (#7924)
c1a3f7b502 is described below
commit c1a3f7b502ceafd7b5ca11117a93a295ea5d69ec
Author: Chang chen <[email protected]>
AuthorDate: Wed Nov 13 09:32:32 2024 +0800
[GLUTEN-7028][CH][Part-8] Support one pipeline write for partition mergetree (#7924)
* [Refactor] simple refactor
* [Refactor] Remove setStats
* [Refactor] SparkPartitionedBaseSink and WriteStatsBase
* [Refactor] Add explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context);
* [New] Support writing partition mergetree in one pipeline
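For reference, the new write path wires the MergeTree sink into the query pipeline roughly as follows (a condensed sketch of the addMergeTreeSinkTransform() change in WriteRelParser.cpp below; the DB::/local_engine types come from the ClickHouse backend, so this is illustrative rather than a standalone program):

    // Build the insert chain: stats collector -> mergetree sink -> squashing transforms.
    DB::Chain chain;

    // Stats are handed to the sink at construction time; the old setStats() step is gone.
    auto stats = std::make_shared<MergeTreeStats>(header);
    chain.addSink(stats);

    // Write settings now come from the query context via the new explicit constructor.
    SparkMergeTreeWriteSettings write_settings{context};
    if (partition_by.empty())
        write_settings.partition_settings.partition_dir = SubstraitFileSink::NO_PARTITION_ID;

    // Non-partitioned tables write through SparkMergeTreeSink directly; partitioned tables go
    // through SparkMergeTreePartitionedFileSink, which creates one SparkMergeTreeSink per
    // partition (see SparkMergeTreeSink.h below).
    auto sink = partition_by.empty()
        ? SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getGlobalContext(), {stats})
        : std::make_shared<SparkMergeTreePartitionedFileSink>(header, partition_by, merge_tree_table, write_settings, context, stats);
    chain.addSource(sink);

    // Squash small blocks before they reach the sink, then attach the chain to the pipeline.
    const DB::Settings & settings = context->getSettingsRef();
    chain.addSource(std::make_shared<ApplySquashingTransform>(
        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
    chain.addSource(std::make_shared<PlanSquashingTransform>(
        header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
    builder->addChain(std::move(chain));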
---
.../GlutenClickHouseMergeTreeWriteSuite.scala | 180 +++++-----
.../Parser/RelParsers/WriteRelParser.cpp | 39 +--
.../local-engine/Parser/SerializedPlanParser.cpp | 8 +-
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 4 +-
.../Storages/MergeTree/SparkMergeTreeSink.cpp | 19 +-
.../Storages/MergeTree/SparkMergeTreeSink.h | 160 +++++----
.../MergeTree/SparkMergeTreeWriteSettings.cpp | 3 +-
.../MergeTree/SparkMergeTreeWriteSettings.h | 2 +-
.../Storages/MergeTree/SparkStorageMergeTree.cpp | 8 +-
.../Storages/Output/NormalFileWriter.cpp | 2 +-
.../Storages/Output/NormalFileWriter.h | 122 ++++---
.../tests/gtest_write_pipeline_mergetree.cpp | 42 ++-
.../tests/json/mergetree/4_one_pipeline.json | 377 +++++++++++++++++++++
...line_input.json => lineitem_parquet_input.json} | 0
14 files changed, 701 insertions(+), 265 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index b33a2065f6..85c8c2d92a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -162,103 +162,107 @@ class GlutenClickHouseMergeTreeWriteSuite
}
test("test mergetree insert overwrite") {
- spark.sql(s"""
- |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
- |""".stripMargin)
+ withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite;
+ |""".stripMargin)
- spark.sql(s"""
- |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
- |(
- | l_orderkey bigint,
- | l_partkey bigint,
- | l_suppkey bigint,
- | l_linenumber bigint,
- | l_quantity double,
- | l_extendedprice double,
- | l_discount double,
- | l_tax double,
- | l_returnflag string,
- | l_linestatus string,
- | l_shipdate date,
- | l_commitdate date,
- | l_receiptdate date,
- | l_shipinstruct string,
- | l_shipmode string,
- | l_comment string
- |)
- |USING clickhouse
- |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
- |""".stripMargin)
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_insertoverwrite'
+ |""".stripMargin)
- spark.sql(s"""
- | insert into table lineitem_mergetree_insertoverwrite
- | select * from lineitem
- |""".stripMargin)
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_insertoverwrite
+ | select * from lineitem
+ |""".stripMargin)
- spark.sql(s"""
- | insert overwrite table lineitem_mergetree_insertoverwrite
- | select * from lineitem where mod(l_orderkey,2) = 1
- |""".stripMargin)
- val sql2 =
- s"""
- | select count(*) from lineitem_mergetree_insertoverwrite
- |
- |""".stripMargin
- assertResult(300001)(
- // total rows should remain unchanged
- spark.sql(sql2).collect().apply(0).get(0)
- )
+ spark.sql(s"""
+ | insert overwrite table lineitem_mergetree_insertoverwrite
+ | select * from lineitem where mod(l_orderkey,2) = 1
+ |""".stripMargin)
+ val sql2 =
+ s"""
+ | select count(*) from lineitem_mergetree_insertoverwrite
+ |
+ |""".stripMargin
+ assertResult(300001)(
+ // total rows should remain unchanged
+ spark.sql(sql2).collect().apply(0).get(0)
+ )
+ }
}
test("test mergetree insert overwrite partitioned table with small table,
static") {
- spark.sql(s"""
- |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
- |""".stripMargin)
+ withSQLConf((CHConf.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, "false")) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_insertoverwrite2;
+ |""".stripMargin)
- spark.sql(s"""
- |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
- |(
- | l_orderkey bigint,
- | l_partkey bigint,
- | l_suppkey bigint,
- | l_linenumber bigint,
- | l_quantity double,
- | l_extendedprice double,
- | l_discount double,
- | l_tax double,
- | l_returnflag string,
- | l_linestatus string,
- | l_shipdate date,
- | l_commitdate date,
- | l_receiptdate date,
- | l_shipinstruct string,
- | l_shipmode string,
- | l_comment string
- |)
- |USING clickhouse
- |PARTITIONED BY (l_shipdate)
- |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
- |""".stripMargin)
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insertoverwrite2
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_shipdate)
+ |LOCATION '$basePath/lineitem_mergetree_insertoverwrite2'
+ |""".stripMargin)
- spark.sql(s"""
- | insert into table lineitem_mergetree_insertoverwrite2
- | select * from lineitem
- |""".stripMargin)
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_insertoverwrite2
+ | select * from lineitem
+ |""".stripMargin)
- spark.sql(
- s"""
- | insert overwrite table lineitem_mergetree_insertoverwrite2
- | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
- |""".stripMargin)
- val sql2 =
- s"""
- | select count(*) from lineitem_mergetree_insertoverwrite2
- |
- |""".stripMargin
- assertResult(2418)(
- // total rows should remain unchanged
- spark.sql(sql2).collect().apply(0).get(0)
- )
+ spark.sql(
+ s"""
+ | insert overwrite table lineitem_mergetree_insertoverwrite2
+ | select * from lineitem where l_shipdate BETWEEN date'1993-02-01' AND date'1993-02-10'
+ |""".stripMargin)
+ val sql2 =
+ s"""
+ | select count(*) from lineitem_mergetree_insertoverwrite2
+ |
+ |""".stripMargin
+ assertResult(2418)(
+ // total rows should remain unchanged
+ spark.sql(sql2).collect().apply(0).get(0)
+ )
+ }
}
test("test mergetree insert overwrite partitioned table with small table,
dynamic") {
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index ea1239a809..25f786a774 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -58,15 +58,11 @@ DB::ProcessorPtr make_sink(
{
if (partition_by.empty())
{
- auto file_sink = std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header);
- file_sink->setStats(stats);
- return file_sink;
+ return std::make_shared<SubstraitFileSink>(context, base_path, "", filename, format_hint, input_header, stats);
}
- auto file_sink = std::make_shared<SubstraitPartitionedFileSink>(
- context, partition_by, input_header, output_header, base_path, filename, format_hint);
- file_sink->setStats(stats);
- return file_sink;
+ return std::make_shared<SubstraitPartitionedFileSink>(
+ context, partition_by, input_header, output_header, base_path, filename, format_hint, stats);
}
DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const DB::Block & output)
@@ -148,25 +144,30 @@ void addMergeTreeSinkTransform(
const DB::ContextPtr & context,
const DB::QueryPipelineBuilderPtr & builder,
const MergeTreeTable & merge_tree_table,
- const DB::Block & output,
- const DB::Names & /*partitionCols*/)
+ const DB::Block & header,
+ const DB::Names & partition_by)
{
- const DB::Settings & settings = context->getSettingsRef();
- const auto dest_storage = merge_tree_table.getStorage(context->getGlobalContext());
- StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
- ASTPtr none;
- auto sink = dest_storage->write(none, metadata_snapshot, context, false);
+
Chain chain;
- chain.addSink(sink);
- const SinkHelper & sink_helper = assert_cast<const SparkMergeTreeSink &>(*sink).sinkHelper();
//
- auto stats = std::make_shared<MergeTreeStats>(output, sink_helper);
+ auto stats = std::make_shared<MergeTreeStats>(header);
chain.addSink(stats);
//
+
+ SparkMergeTreeWriteSettings write_settings{context};
+ if (partition_by.empty())
+ write_settings.partition_settings.partition_dir = SubstraitFileSink::NO_PARTITION_ID;
+
+ auto sink = partition_by.empty() ?
+ SparkMergeTreeSink::create(merge_tree_table, write_settings, context->getGlobalContext(), {stats}) :
+ std::make_shared<SparkMergeTreePartitionedFileSink>(header, partition_by, merge_tree_table, write_settings, context, stats);
+
+ chain.addSource(sink);
+ const DB::Settings & settings = context->getSettingsRef();
chain.addSource(std::make_shared<ApplySquashingTransform>(
- output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+ header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
chain.addSource(std::make_shared<PlanSquashingTransform>(
- output, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
+ header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]));
builder->addChain(std::move(chain));
}
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 4e49c577bf..748ff88acb 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -283,7 +283,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
return query_plan;
}
-DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan)
+DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan) const
{
const Settings & settings = parser_context->queryContext()->getSettingsRef();
QueryPriorities priorities;
@@ -355,11 +355,7 @@ NonNullableColumnsResolver::NonNullableColumnsResolver(
expression_parser = std::make_unique<ExpressionParser>(parser_context);
}
-NonNullableColumnsResolver::~NonNullableColumnsResolver()
-{
-}
-
-// make it simple at present, if the condition contains or, return empty for both side.
+// make it simple at present if the condition contains or, return empty for both side.
std::set<String> NonNullableColumnsResolver::resolve()
{
collected_columns.clear();
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 2bcc09a8ed..f0ec608a33 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -48,7 +48,7 @@ class NonNullableColumnsResolver
public:
explicit NonNullableColumnsResolver(
const DB::Block & header_, std::shared_ptr<const ParserContext> parser_context_, const substrait::Expression & cond_rel_);
- ~NonNullableColumnsResolver();
+ ~NonNullableColumnsResolver() = default;
// return column names
std::set<String> resolve();
@@ -76,7 +76,7 @@ public:
/// visible for UT
DB::QueryPlanPtr parse(const substrait::Plan & plan);
std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan);
- DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan);
+ DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan) const;
///
std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
index 43a3a78295..de0c244f3e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
@@ -62,15 +62,24 @@ void SparkMergeTreeSink::onStart()
void SparkMergeTreeSink::onFinish()
{
sink_helper->finish(context);
+ if (stats_.has_value())
+ (*stats_)->collectStats(sink_helper->unsafeGet(), sink_helper->write_settings.partition_settings.partition_dir);
}
/////
-SinkHelperPtr SparkMergeTreeSink::create(
- const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context)
+SinkToStoragePtr SparkMergeTreeSink::create(
+ const MergeTreeTable & merge_tree_table,
+ const SparkMergeTreeWriteSettings & write_settings_,
+ const DB::ContextMutablePtr & context,
+ const SinkStatsOption & stats)
{
+ if (write_settings_.partition_settings.part_name_prefix.empty())
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");
+
auto dest_storage = merge_tree_table.getStorage(context);
bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
bool insert_with_local_storage = !write_settings_.insert_without_local_storage;
+ SinkHelperPtr sink_helper;
if (insert_with_local_storage && isRemoteStorage)
{
auto temp = merge_tree_table.copyToDefaultPolicyStorage(context);
@@ -78,10 +87,10 @@ SinkHelperPtr SparkMergeTreeSink::create(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Create temp table {} for local merge.",
temp->getStorageID().getFullNameNotQuoted());
- return std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
+ sink_helper = std::make_shared<CopyToRemoteSinkHelper>(temp, dest_storage, write_settings_);
}
-
- return std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+ sink_helper = std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, isRemoteStorage);
+ return std::make_shared<SparkMergeTreeSink>(sink_helper, context, stats);
}
SinkHelper::SinkHelper(const SparkStorageMergeTreePtr & data_, const SparkMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_)
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
index d19e5dc4e9..065b147462 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
@@ -21,6 +21,7 @@
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/MergeTree/SparkMergeTreeWriteSettings.h>
#include <Storages/MergeTree/SparkStorageMergeTree.h>
+#include <Storages/Output/NormalFileWriter.h>
#include <Common/BlockTypeUtils.h>
namespace local_engine
@@ -152,39 +153,18 @@ public:
}
};
-class SparkMergeTreeSink : public DB::SinkToStorage
+class MergeTreeStats : public WriteStatsBase
{
-public:
- static SinkHelperPtr create(
- const MergeTreeTable & merge_tree_table,
- const SparkMergeTreeWriteSettings & write_settings_,
- const DB::ContextMutablePtr & context);
+ DB::MutableColumns columns_;
- explicit SparkMergeTreeSink(const SinkHelperPtr & sink_helper_, const ContextPtr & context_)
- : SinkToStorage(sink_helper_->metadata_snapshot->getSampleBlock()), context(context_), sink_helper(sink_helper_)
+ enum ColumnIndex
{
- }
- ~SparkMergeTreeSink() override = default;
-
- String getName() const override { return "SparkMergeTreeSink"; }
- void consume(Chunk & chunk) override;
- void onStart() override;
- void onFinish() override;
-
- const SinkHelper & sinkHelper() const { return *sink_helper; }
-
-private:
- ContextPtr context;
- SinkHelperPtr sink_helper;
-
- int part_num = 1;
-};
-
-
-class MergeTreeStats : public DB::ISimpleTransform
-{
- bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks
- const SinkHelper & sink_helper;
+ part_name,
+ partition_id,
+ record_count,
+ marks_count,
+ size_in_bytes
+ };
static DB::Block statsHeader()
{
@@ -196,69 +176,109 @@ class MergeTreeStats : public DB::ISimpleTransform
{BIGINT(), "size_in_bytes"}});
}
- DB::Chunk final_result() const
+ DB::Chunk final_result() override
+ {
+ size_t rows = columns_[part_name]->size();
+ return DB::Chunk(std::move(columns_), rows);
+ }
+
+public:
+ explicit MergeTreeStats(const DB::Block & input_header_)
+ : WriteStatsBase(input_header_, statsHeader()), columns_(statsHeader().cloneEmptyColumns())
{
- // TODO: remove it
- const std::string NO_PARTITION_ID{"__NO_PARTITION_ID__"};
+ }
- auto parts = sink_helper.unsafeGet();
+ String getName() const override { return "MergeTreeStats"; }
- const size_t size = parts.size();
- auto file_col = STRING()->createColumn();
- file_col->reserve(size);
+ void collectStats(const std::deque<DB::MergeTreeDataPartPtr> & parts, const std::string & partition) const
+ {
+ const size_t size = parts.size() + columns_[part_name]->size();
- auto partition_col = STRING()->createColumn();
- partition_col->reserve(size);
+ columns_[part_name]->reserve(size);
+ columns_[partition_id]->reserve(size);
- auto countCol = BIGINT()->createColumn();
- countCol->reserve(size);
- auto & countColData = static_cast<DB::ColumnVector<Int64> &>(*countCol).getData();
+ columns_[record_count]->reserve(size);
+ auto & countColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[record_count]).getData();
- auto marksCol = BIGINT()->createColumn();
- marksCol->reserve(size);
- auto & marksColData = static_cast<DB::ColumnVector<Int64> &>(*marksCol).getData();
+ columns_[marks_count]->reserve(size);
+ auto & marksColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[marks_count]).getData();
- auto bytesCol = BIGINT()->createColumn();
- bytesCol->reserve(size);
- auto & bytesColData = static_cast<DB::ColumnVector<Int64> &>(*bytesCol).getData();
+ columns_[size_in_bytes]->reserve(size);
+ auto & bytesColData = static_cast<DB::ColumnVector<Int64> &>(*columns_[size_in_bytes]).getData();
for (const auto & part : parts)
{
- file_col->insertData(part->name.c_str(), part->name.size());
- partition_col->insertData(NO_PARTITION_ID.c_str(), NO_PARTITION_ID.size());
+ columns_[part_name]->insertData(part->name.c_str(), part->name.size());
+ columns_[partition_id]->insertData(partition.c_str(), partition.size());
+
countColData.emplace_back(part->rows_count);
marksColData.emplace_back(part->getMarksCount());
bytesColData.emplace_back(part->getBytesOnDisk());
}
- const DB::Columns res_columns{
- std::move(file_col), std::move(partition_col), std::move(countCol), std::move(marksCol), std::move(bytesCol)};
- return DB::Chunk(res_columns, size);
}
+};
+class SparkMergeTreeSink : public DB::SinkToStorage
+{
public:
- explicit MergeTreeStats(const DB::Block & input_header_, const SinkHelper & sink_helper_)
- : ISimpleTransform(input_header_, statsHeader(), true), sink_helper(sink_helper_)
+ using SinkStatsOption = std::optional<std::shared_ptr<MergeTreeStats>>;
+ static SinkToStoragePtr create(
+ const MergeTreeTable & merge_tree_table,
+ const SparkMergeTreeWriteSettings & write_settings_,
+ const DB::ContextMutablePtr & context,
+ const SinkStatsOption & stats = {});
+
+ explicit SparkMergeTreeSink(const SinkHelperPtr & sink_helper_, const ContextPtr & context_, const SinkStatsOption & stats)
+ : SinkToStorage(sink_helper_->metadata_snapshot->getSampleBlock()), context(context_), sink_helper(sink_helper_), stats_(stats)
{
}
- Status prepare() override
- {
- if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
- {
- all_chunks_processed_ = true;
- /// return Ready to call transform() for generating filling rows after latest chunk was processed
- return Status::Ready;
- }
+ ~SparkMergeTreeSink() override = default;
+
+ String getName() const override { return "SparkMergeTreeSink"; }
+ void consume(Chunk & chunk) override;
+ void onStart() override;
+ void onFinish() override;
+
+ const SinkHelper & sinkHelper() const { return *sink_helper; }
- return ISimpleTransform::prepare();
+private:
+ ContextPtr context;
+ SinkHelperPtr sink_helper;
+ std::optional<std::shared_ptr<MergeTreeStats>> stats_;
+ int part_num = 1;
+};
+
+
+class SparkMergeTreePartitionedFileSink final : public SparkPartitionedBaseSink
+{
+ const SparkMergeTreeWriteSettings write_settings_;
+ MergeTreeTable table;
+
+public:
+ SparkMergeTreePartitionedFileSink(
+ const DB::Block & input_header,
+ const DB::Names & partition_by,
+ const MergeTreeTable & merge_tree_table,
+ const SparkMergeTreeWriteSettings & write_settings,
+ const DB::ContextPtr & context,
+ const std::shared_ptr<WriteStatsBase> & stats)
+ : SparkPartitionedBaseSink(context, partition_by, input_header, stats), write_settings_(write_settings), table(merge_tree_table)
+ {
}
- String getName() const override { return "MergeTreeStats"; }
- void transform(DB::Chunk & chunk) override
+ SinkPtr createSinkForPartition(const String & partition_id) override
{
- if (all_chunks_processed_)
- chunk = final_result();
- else
- chunk = {};
+ SparkMergeTreeWriteSettings write_settings{write_settings_};
+
+ assert(write_settings.partition_settings.partition_dir.empty());
+ assert(write_settings.partition_settings.bucket_dir.empty());
+ write_settings.partition_settings.part_name_prefix
+ = fmt::format("{}/{}", partition_id, write_settings.partition_settings.part_name_prefix);
+ write_settings.partition_settings.partition_dir = partition_id;
+
+ return SparkMergeTreeSink::create(
+ table, write_settings, context_->getGlobalContext(), {std::dynamic_pointer_cast<MergeTreeStats>(stats_)});
}
};
+
}
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
index 75472d410c..e584b003d2 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.cpp
@@ -26,7 +26,8 @@ namespace local_engine
IMPLEMENT_GLUTEN_SETTINGS(SparkMergeTreeWritePartitionSettings, MERGE_TREE_WRITE_RELATED_SETTINGS)
-void SparkMergeTreeWriteSettings::load(const DB::ContextPtr & context)
+SparkMergeTreeWriteSettings::SparkMergeTreeWriteSettings(const DB::ContextPtr & context)
+ : partition_settings(SparkMergeTreeWritePartitionSettings::get(context))
{
const DB::Settings & settings = context->getSettingsRef();
merge_after_insert = settings.get(MERGETREE_MERGE_AFTER_INSERT).safeGet<bool>();
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
index 1fbbdbe346..e89b2aaf5e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriteSettings.h
@@ -35,6 +35,6 @@ struct SparkMergeTreeWriteSettings
size_t merge_min_size = 1024 * 1024 * 1024;
size_t merge_limit_parts = 10;
- void load(const DB::ContextPtr & context);
+ explicit SparkMergeTreeWriteSettings(const DB::ContextPtr & context);
};
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index a6a252be3d..0be7e0d892 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -513,17 +513,11 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart(
SinkToStoragePtr SparkWriteStorageMergeTree::write(
const ASTPtr &, const StorageMetadataPtr & /*storage_in_memory_metadata*/, ContextPtr context, bool /*async_insert*/)
{
- SparkMergeTreeWriteSettings settings{.partition_settings{SparkMergeTreeWritePartitionSettings::get(context)}};
- if (settings.partition_settings.part_name_prefix.empty())
- throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");
- settings.load(context);
- SinkHelperPtr sink_helper = SparkMergeTreeSink::create(table, settings, getContext());
#ifndef NDEBUG
auto dest_storage = table.getStorage(getContext());
assert(dest_storage.get() == this);
#endif
-
- return std::make_shared<SparkMergeTreeSink>(sink_helper, context);
+ return SparkMergeTreeSink::create(table, SparkMergeTreeWriteSettings{context}, getContext());
}
}
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index 8c904ab205..d572b85385 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -23,7 +23,7 @@ namespace local_engine
{
const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
-const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
+const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
{
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 8a25d7b21f..cd9c848dd4 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -61,20 +61,51 @@ private:
OutputFormatFilePtr createOutputFormatFile(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
-class WriteStats : public DB::ISimpleTransform
+class WriteStatsBase : public DB::ISimpleTransform
{
+protected:
bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks
- DB::Arena partition_keys_arena_;
- std::string filename_;
+ virtual DB::Chunk final_result() = 0;
- absl::flat_hash_map<StringRef, size_t> file_to_count_;
+public:
+ WriteStatsBase(const DB::Block & input_header_, const DB::Block & output_header_)
+ : ISimpleTransform(input_header_, output_header_, true)
+ {
+ }
+
+ Status prepare() override
+ {
+ if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
+ {
+ all_chunks_processed_ = true;
+ /// return Ready to call transform() for generating filling rows after latest chunk was processed
+ return Status::Ready;
+ }
+ return ISimpleTransform::prepare();
+ }
+
+ void transform(DB::Chunk & chunk) override
+ {
+ if (all_chunks_processed_)
+ chunk = final_result();
+ else
+ chunk = {};
+ }
+};
+
+class WriteStats : public WriteStatsBase
+{
static DB::Block statsHeader()
{
return makeBlockHeader({{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}});
}
+ DB::Arena partition_keys_arena_;
+ std::string filename_;
+ absl::flat_hash_map<StringRef, size_t> file_to_count_;
- DB::Chunk final_result() const
+protected:
+ DB::Chunk final_result() override
{
const size_t size = file_to_count_.size();
@@ -102,30 +133,9 @@ class WriteStats : public DB::ISimpleTransform
}
public:
- explicit WriteStats(const DB::Block & input_header_) : ISimpleTransform(input_header_, statsHeader(), true) { }
-
- Status prepare() override
- {
- if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_)
- {
- all_chunks_processed_ = true;
- /// return Ready to call transform() for generating filling rows after latest chunk was processed
- return Status::Ready;
- }
-
- return ISimpleTransform::prepare();
- }
-
+ explicit WriteStats(const DB::Block & input_header_) : WriteStatsBase(input_header_, statsHeader()) { }
String getName() const override { return "WriteStats"; }
- void transform(DB::Chunk & chunk) override
- {
- if (all_chunks_processed_)
- chunk = final_result();
- else
- chunk = {};
- }
-
- void addFilePath(const String & patition_id, const String & filename)
+ void addFilePath(const String & partition_id, const String & filename)
{
assert(!filename.empty());
@@ -134,9 +144,9 @@ public:
assert(filename_ == filename);
- if (patition_id.empty())
+ if (partition_id.empty())
return;
- file_to_count_.emplace(copyStringInArena(partition_keys_arena_, patition_id), 0);
+ file_to_count_.emplace(copyStringInArena(partition_keys_arena_, partition_id), 0);
}
void collectStats(const String & file_path, size_t rows)
@@ -155,7 +165,7 @@ class SubstraitFileSink final : public DB::SinkToStorage
const std::string partition_id_;
const std::string relative_path_;
OutputFormatFile::OutputFormatPtr output_format_;
- std::shared_ptr<WriteStats> stats_{nullptr};
+ std::shared_ptr<WriteStats> stats_;
static std::string makeFilename(const std::string & base_path, const std::string & partition_id, const std::string & relative)
{
@@ -174,22 +184,18 @@ public:
const std::string & partition_id,
const std::string & relative,
const std::string & format_hint,
- const DB::Block & header)
+ const DB::Block & header,
+ const std::shared_ptr<WriteStatsBase> & stats)
: SinkToStorage(header)
, partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id)
, relative_path_(relative)
, output_format_(createOutputFormatFile(context, makeFilename(base_path, partition_id, relative), header, format_hint)
->createOutputFormat(header))
+ , stats_(std::dynamic_pointer_cast<WriteStats>(stats))
{
- }
- String getName() const override { return "SubstraitFileSink"; }
-
- ///TODO: remove this function
- void setStats(const std::shared_ptr<WriteStats> & stats)
- {
- stats_ = stats;
stats_->addFilePath(partition_id_, relative_path_);
}
+ String getName() const override { return "SubstraitFileSink"; }
protected:
void consume(DB::Chunk & chunk) override
@@ -208,7 +214,7 @@ protected:
}
};
-class SubstraitPartitionedFileSink final : public DB::PartitionedSink
+class SparkPartitionedBaseSink : public DB::PartitionedSink
{
static const std::string DEFAULT_PARTITION_NAME;
@@ -237,13 +243,27 @@ public:
return DB::makeASTFunction("concat", std::move(arguments));
}
-private:
+protected:
+ DB::ContextPtr context_;
+ std::shared_ptr<WriteStatsBase> stats_;
+
+public:
+ SparkPartitionedBaseSink(
+ const DB::ContextPtr & context,
+ const DB::Names & partition_by,
+ const DB::Block & input_header,
+ const std::shared_ptr<WriteStatsBase> & stats)
+ : PartitionedSink(make_partition_expression(partition_by), context, input_header), context_(context), stats_(stats)
+ {
+ }
+};
+
+class SubstraitPartitionedFileSink final : public SparkPartitionedBaseSink
+{
const std::string base_path_;
const std::string filename_;
- DB::ContextPtr context_;
const DB::Block sample_block_;
const std::string format_hint_;
- std::shared_ptr<WriteStats> stats_{nullptr};
public:
SubstraitPartitionedFileSink(
@@ -253,27 +273,23 @@ public:
const DB::Block & sample_block,
const std::string & base_path,
const std::string & filename,
- const std::string & format_hint)
- : PartitionedSink(make_partition_expression(partition_by), context, input_header)
+ const std::string & format_hint,
+ const std::shared_ptr<WriteStatsBase> & stats)
+ : SparkPartitionedBaseSink(context, partition_by, input_header, stats)
, base_path_(base_path)
, filename_(filename)
- , context_(context)
, sample_block_(sample_block)
, format_hint_(format_hint)
{
}
+
DB::SinkPtr createSinkForPartition(const String & partition_id) override
{
assert(stats_);
const auto partition_path = fmt::format("{}/{}", partition_id, filename_);
- PartitionedSink::validatePartitionKey(partition_path, true);
- auto file_sink = std::make_shared<SubstraitFileSink>(context_, base_path_, partition_id, filename_, format_hint_, sample_block_);
- file_sink->setStats(stats_);
- return file_sink;
+ validatePartitionKey(partition_path, true);
+ return std::make_shared<SubstraitFileSink>(context_, base_path_, partition_id, filename_, format_hint_, sample_block_, stats_);
}
String getName() const override { return "SubstraitPartitionedFileSink"; }
-
- ///TODO: remove this function
- void setStats(const std::shared_ptr<WriteStats> & stats) { stats_ = stats; }
};
}
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
index ddbf113353..1ad90060f4 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
@@ -255,24 +255,42 @@ TEST(MergeTree, SparkMergeTree)
}
}
-INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
-INCBIN(_3_mergetree_plan_input_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline_input.json");
-
-TEST(MergeTree, Pipeline)
+INCBIN(_3_mergetree_plan_input_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/lineitem_parquet_input.json");
+namespace
+{
+void writeMerge(std::string_view json_plan,
+ const std::string & outputPath ,
+ const std::function<void(const DB::Block &)> & callback, std::optional<std::string> input = std::nullopt)
{
const auto context = DB::Context::createCopy(QueryContext::globalContext());
- GlutenWriteSettings settings{.task_write_tmp_dir = "tmp/lineitem_mergetree"};
+ GlutenWriteSettings settings{.task_write_tmp_dir = outputPath};
settings.set(context);
SparkMergeTreeWritePartitionSettings partition_settings{.part_name_prefix = "pipline_prefix"};
partition_settings.set(context);
- auto [_, local_executor] = test::create_plan_and_executor(
- EMBEDDED_PLAN(_3_mergetree_plan_), replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_)), context);
- size_t sum = 0;
+ auto input_json = input.value_or(replaceLocalFilesWithTPCH(EMBEDDED_PLAN(_3_mergetree_plan_input_)));
+ auto [_, local_executor] = test::create_plan_and_executor(json_plan, input_json, context);
+
while (local_executor->hasNext())
+ callback(*local_executor->nextColumnar());
+}
+}
+INCBIN(_3_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/3_one_pipeline.json");
+INCBIN(_4_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/4_one_pipeline.json");
+TEST(MergeTree, Pipeline)
+{
+ writeMerge(EMBEDDED_PLAN(_3_mergetree_plan_),"tmp/lineitem_mergetree",[&](const DB::Block & block)
{
- const Block & x = *local_executor->nextColumnar();
- EXPECT_EQ(1, x.rows());
- //-debug::headBlock("pipeline write", x);
- }
+ EXPECT_EQ(1, block.rows());
+ debug::headBlock(block);
+ });
+}
+
+TEST(MergeTree, PipelineWithPartition)
+{
+ writeMerge(EMBEDDED_PLAN(_4_mergetree_plan_),"tmp/lineitem_mergetree_p",[&](const DB::Block & block)
+ {
+ EXPECT_EQ(2525, block.rows());
+ debug::headBlock(block);
+ });
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json
new file mode 100644
index 0000000000..14a9b3dda2
--- /dev/null
+++ b/cpp-ch/local-engine/tests/json/mergetree/4_one_pipeline.json
@@ -0,0 +1,377 @@
+{
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "write": {
+ "namedTable": {
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/local_engine.Write",
+ "common": {
+ "format": "mergetree"
+ },
+ "mergetree": {
+ "database": "default",
+ "table": "lineitem_mergetree_insertoverwrite2",
+ "snapshotId": "1731309448915_0",
+ "orderByKey": "tuple()",
+ "storagePolicy": "default"
+ }
+ },
+ "enhancement": {
+ "@type": "type.googleapis.com/substrait.Type",
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ },
+ "tableSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "PARTITION_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/google.protobuf.StringValue",
+ "value": "isMergeTree=0\n"
+ }
+ }
+ }
+ }
+ }
+ },
+ "outputSchema": {
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/3_one_pipeline_input.json b/cpp-ch/local-engine/tests/json/mergetree/lineitem_parquet_input.json
similarity index 100%
rename from cpp-ch/local-engine/tests/json/mergetree/3_one_pipeline_input.json
rename to cpp-ch/local-engine/tests/json/mergetree/lineitem_parquet_input.json