This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d8ee3d54a [CH]Daily Update Clickhouse Version (20241118) (#7968)
8d8ee3d54a is described below

commit 8d8ee3d54af359c747cea6310553f2e66d5f2122
Author: LiuNeng <[email protected]>
AuthorDate: Tue Nov 26 22:33:32 2024 +0800

    [CH]Daily Update Clickhouse Version (20241118) (#7968)
    
    What changes were proposed in this pull request?
    Manually rebase the ClickHouse repo
    
    How was this patch tested?
    unit tests
    
---
 cpp-ch/clickhouse.version                          |  4 +--
 .../CompactObjectStorageDiskTransaction.cpp        | 39 +++++++++++++++++++---
 .../CompactObjectStorageDiskTransaction.h          | 34 ++++++++++++++++---
 .../Disks/ObjectStorages/GlutenDiskHDFS.cpp        |  2 +-
 .../Disks/ObjectStorages/GlutenDiskS3.cpp          |  2 +-
 .../Functions/SparkFunctionRoundHalfUp.h           | 29 ++++++++++++++--
 .../Operator/GraceAggregatingTransform.cpp         | 25 ++++++++------
 .../Operator/GraceAggregatingTransform.h           |  6 ++--
 .../Parser/RelParsers/CrossRelParser.cpp           |  9 ++++-
 .../Parser/RelParsers/JoinRelParser.cpp            | 13 ++++++--
 .../Storages/MergeTree/SparkStorageMergeTree.cpp   |  9 ++++-
 11 files changed, 141 insertions(+), 31 deletions(-)

diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index beda11ec0a..62a70f06c1 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241111
-CH_COMMIT=3f7e46d4e9e
+CH_BRANCH=rebase_ch/20241118
+CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 5b1fe63a09..148a43580b 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -28,6 +28,36 @@ bool isMetaDataFile(const std::string & path)
     return !path.ends_with("bin");
 }
 
+TemporaryWriteBufferWrapper::TemporaryWriteBufferWrapper(
+    const String & file_name_, const std::shared_ptr<DB::TemporaryDataBuffer> & data_buffer_)
+    : WriteBufferFromFileBase(data_buffer_->buffer().size(), data_buffer_->buffer().begin(), 0)
+    , file_name(file_name_)
+    , data_buffer(data_buffer_)
+{
+}
+void TemporaryWriteBufferWrapper::preFinalize()
+{
+    next();
+}
+
+void TemporaryWriteBufferWrapper::finalizeImpl()
+{
+    next();
+    data_buffer->finalizeImpl();
+}
+
+void TemporaryWriteBufferWrapper::cancelImpl() noexcept
+{
+    data_buffer->cancelImpl();
+}
+
+void TemporaryWriteBufferWrapper::nextImpl()
+{
+    data_buffer->position() = position();
+    data_buffer->next();
+    BufferBase::set(data_buffer->buffer().begin(), data_buffer->buffer().size(), data_buffer->offset());
+}
+
 void CompactObjectStorageDiskTransaction::commit()
 {
     auto metadata_tx = disk.getMetadataStorage()->createTransaction();
@@ -52,9 +82,9 @@ void CompactObjectStorageDiskTransaction::commit()
             [&](auto & item)
             {
                 DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
-                DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
+                auto read = item.second->read();
                 int file_size = 0;
-                while (int count = read.readBig(buffer.data(), buffer.size()))
+                while (int count = read->readBig(buffer.data(), buffer.size()))
                 {
                     file_size += count;
                     out.write(buffer.data(), count);
@@ -98,12 +128,13 @@ std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction
             "Don't support write file in different dirs, path {}, prefix path: {}",
             path,
             prefix_path);
-    auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
+    auto tmp = std::make_shared<DB::TemporaryDataBuffer>(tmp_data.get());
     files.emplace_back(path, tmp);
     auto tx = disk.getMetadataStorage()->createTransaction();
     tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
     tx->createEmptyMetadataFile(path);
     tx->commit();
-    return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(), buf_size);
+
+    return std::make_unique<TemporaryWriteBufferWrapper>(path, tmp);
 }
 }
\ No newline at end of file
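
Note: createWriteFile() must still return a DB::WriteBufferFromFileBase, but the transaction now stages data in a DB::TemporaryDataBuffer, so TemporaryWriteBufferWrapper adapts one to the other. The heart of the adapter is nextImpl(): it pushes the wrapper's current write position down into the inner buffer, lets the inner buffer flush, and re-adopts the inner buffer's (possibly relocated) working memory. Below is a minimal self-contained sketch of that delegate-and-readopt pattern; Sink and Wrapper are hypothetical stand-ins, not Gluten or ClickHouse types.

    #include <algorithm>
    #include <cstddef>
    #include <cstring>
    #include <iostream>
    #include <string>
    #include <vector>

    class Sink // stands in for DB::TemporaryDataBuffer
    {
    public:
        char * begin() { return work.data(); }
        size_t size() const { return work.size(); }
        void flushUpTo(size_t n) { spilled.insert(spilled.end(), work.begin(), work.begin() + n); }
        std::vector<char> spilled; // what has been handed to the sink so far
    private:
        std::vector<char> work = std::vector<char>(8); // small working window
    };

    class Wrapper // stands in for TemporaryWriteBufferWrapper
    {
    public:
        explicit Wrapper(Sink & sink_) : sink(sink_), pos(sink.begin()), end(sink.begin() + sink.size()) {}
        void write(const char * data, size_t len)
        {
            while (len)
            {
                if (pos == end)
                    next(); // window full: push bytes down, re-adopt memory
                size_t n = std::min(len, static_cast<size_t>(end - pos));
                std::memcpy(pos, data, n);
                pos += n; data += n; len -= n;
            }
        }
        void next() // mirrors nextImpl(): position handoff, flush, re-adopt
        {
            sink.flushUpTo(static_cast<size_t>(pos - sink.begin()));
            pos = sink.begin();
            end = sink.begin() + sink.size();
        }
    private:
        Sink & sink;
        char * pos;
        char * end;
    };

    int main()
    {
        Sink sink;
        Wrapper out(sink);
        const char msg[] = "hello temporary data";
        out.write(msg, sizeof(msg) - 1);
        out.next(); // finalize, like preFinalize()/finalizeImpl()
        std::cout << std::string(sink.spilled.begin(), sink.spilled.end()) << '\n';
    }
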
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
index becb5371aa..0f95ae01ec 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -32,15 +32,41 @@ extern const int NOT_IMPLEMENTED;
 namespace local_engine
 {
 
+class TemporaryWriteBufferWrapper : public DB::WriteBufferFromFileBase
+{
+public:
+    TemporaryWriteBufferWrapper(const String & file_name_, const std::shared_ptr<DB::TemporaryDataBuffer> & data_buffer_);
+
+    void sync() override { data_buffer->nextImpl(); }
+
+    void preFinalize() override;
+
+protected:
+    void finalizeImpl() override;
+    void cancelImpl() noexcept override;
+
+private:
+    void nextImpl() override;
+
+public:
+    std::string getFileName() const override
+    {
+        return file_name;
+    }
+
+private:
+    String file_name;
+    std::shared_ptr<DB::TemporaryDataBuffer> data_buffer;
+};
+
 class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
     public:
     static inline const String PART_DATA_FILE_NAME = "part_data.gluten";
     static inline const String PART_META_FILE_NAME = "part_meta.gluten";
 
-    explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_)
+    explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::TemporaryDataOnDiskScopePtr tmp_)
         : disk(disk_), tmp_data(tmp_)
     {
-        chassert(!tmp_->isRemote());
     }
 
     void commit() override;
@@ -170,8 +196,8 @@ class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
 
 private:
     DB::IDisk & disk;
-    DB::DiskPtr tmp_data;
-    std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>> files;
+    DB::TemporaryDataOnDiskScopePtr tmp_data;
+    std::vector<std::pair<String, std::shared_ptr<DB::TemporaryDataBuffer>>> files;
     String prefix_path = "";
 };
 }
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index bd005132b9..fed23d7eef 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -29,7 +29,7 @@ using namespace DB;
 
 DiskTransactionPtr GlutenDiskHDFS::createTransaction()
 {
-    return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
+    return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk());
 }
 
 void GlutenDiskHDFS::createDirectory(const String & path)
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
index b2a6bb523d..a180ebd7ea 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -31,7 +31,7 @@ namespace local_engine
 
     DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
     {
-        return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
+        return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getSharedTempDataOnDisk());
     }
 
     std::unique_ptr<DB::ReadBufferFromFileBase> GlutenDiskS3::readFile(
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionRoundHalfUp.h b/cpp-ch/local-engine/Functions/SparkFunctionRoundHalfUp.h
index 3e219b51a6..2dfd25772e 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionRoundHalfUp.h
+++ b/cpp-ch/local-engine/Functions/SparkFunctionRoundHalfUp.h
@@ -127,6 +127,31 @@ public:
     }
 };
 
+template <>
+class BaseFloatRoundingHalfUpComputation<BFloat16, Vectorize::No>
+{
+public:
+    using ScalarType = BFloat16;
+    using VectorType = BFloat16;
+    static const size_t data_count = 1;
+
+    static VectorType load(const ScalarType * in) { return *in; }
+    static VectorType load1(const ScalarType in) { return in; }
+    static VectorType store(ScalarType * out, ScalarType val) { return *out = val; }
+    static VectorType multiply(VectorType val, VectorType scale) { return val * scale; }
+    static VectorType divide(VectorType val, VectorType scale) { return val / scale; }
+    template <RoundingMode mode>
+    static VectorType apply(VectorType val)
+    {
+        return BFloat16(std::roundf(static_cast<Float32>(val)));
+    }
+
+    static VectorType prepare(size_t scale)
+    {
+        return load1(BFloat16(static_cast<Float32>(scale)));
+    }
+};
+
 
 /** Implementation of low-level round-off functions for floating-point values.
   */
@@ -167,7 +192,7 @@ private:
 
     template <Vectorize vectorize =
 #ifdef __SSE4_1__
-    Vectorize::Yes
+    std::is_same_v<T, BFloat16> ? Vectorize::No : Vectorize::Yes
 #else
     Vectorize::No
 #endif
@@ -219,7 +244,7 @@ struct DispatcherRoundingHalfUp
 {
     template <ScaleMode scale_mode>
     using FunctionRoundingImpl = std::conditional_t<
-        std::is_floating_point_v<T>,
+        std::is_floating_point_v<T> || std::is_same_v<T, BFloat16>,
         FloatRoundingHalfUpImpl<T, rounding_mode, scale_mode>,
         IntegerRoundingImpl<T, rounding_mode, scale_mode, tie_breaking_mode>>;
 
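
Note: BFloat16 has no SIMD rounding path here, so the new specialization pins it to Vectorize::No (see the template default above) and the dispatcher change makes BFloat16 take the float branch of the conditional. The rounding itself widens to Float32, applies std::roundf (half away from zero, which is what Spark's HALF_UP expects), and narrows back. Below is a self-contained sketch of that widen-round-narrow round trip; BF16 is a hypothetical truncation-based stand-in, not ClickHouse's BFloat16.

    #include <cmath>
    #include <cstdint>
    #include <cstring>
    #include <iostream>

    struct BF16 // hypothetical 16-bit brain float (truncating conversion)
    {
        uint16_t bits;
        static BF16 fromFloat(float f)
        {
            uint32_t u;
            std::memcpy(&u, &f, sizeof(u));
            return BF16{static_cast<uint16_t>(u >> 16)}; // keep sign, exponent, top of mantissa
        }
        float toFloat() const
        {
            uint32_t u = static_cast<uint32_t>(bits) << 16;
            float f;
            std::memcpy(&f, &u, sizeof(f));
            return f;
        }
    };

    int main()
    {
        for (float v : {2.5f, -2.5f, 1.25f})
        {
            BF16 x = BF16::fromFloat(v);
            // apply(): widen to Float32, round half away from zero, narrow back.
            BF16 r = BF16::fromFloat(std::roundf(x.toFloat()));
            std::cout << v << " -> " << r.toFloat() << '\n'; // 2.5 -> 3, -2.5 -> -3, 1.25 -> 1
        }
    }
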
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
index adf25d13f2..63dc3c3457 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
@@ -44,7 +44,7 @@ GraceAggregatingTransform::GraceAggregatingTransform(
     , aggregate_columns(params_->params.aggregates_size)
     , no_pre_aggregated(no_pre_aggregated_)
     , final_output(final_output_)
-    , tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
+    , tmp_data_disk(context_->getTempDataOnDisk())
 {
     output_header = params->getHeader();
     auto config = GraceMergingAggregateConfig::loadFromContext(context);
@@ -302,10 +302,13 @@ void GraceAggregatingTransform::flushBuckets()
         flushBucket(i);
 }
 
-static size_t flushBlocksInfoDisk(DB::TemporaryFileStream * file_stream, std::list<DB::Block> & blocks)
+static size_t flushBlocksInfoDisk(std::optional<DB::TemporaryBlockStreamHolder> & file_stream, std::list<DB::Block> & blocks)
 {
     size_t flush_bytes = 0;
     DB::Blocks tmp_blocks;
+    if (!file_stream)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "file_stream is empty");
+    auto & tmp_stream = file_stream.value();
     while (!blocks.empty())
     {
         while (!blocks.empty())
@@ -322,11 +325,11 @@ static size_t flushBlocksInfoDisk(DB::TemporaryFileStream * file_stream, std::li
         flush_bytes += merged_block.bytes();
         if (merged_block.rows())
         {
-            file_stream->write(merged_block);
+            tmp_stream->write(merged_block);
         }
     }
     if (flush_bytes)
-        file_stream->flush();
+        tmp_stream->flush();
     return flush_bytes;
 }
 
@@ -338,7 +341,7 @@ size_t GraceAggregatingTransform::flushBucket(size_t bucket_index)
     if (!file_stream.original_blocks.empty())
     {
         if (!file_stream.original_file_stream)
-            file_stream.original_file_stream = &tmp_data_disk->createStream(header);
+            file_stream.original_file_stream = DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
         flush_bytes += flushBlocksInfoDisk(file_stream.original_file_stream, file_stream.original_blocks);
     }
     if (!file_stream.intermediate_blocks.empty())
@@ -346,7 +349,7 @@ size_t GraceAggregatingTransform::flushBucket(size_t bucket_index)
         if (!file_stream.intermediate_file_stream)
         {
             auto intermediate_header = params->aggregator.getHeader(false);
-            file_stream.intermediate_file_stream = &tmp_data_disk->createStream(intermediate_header);
+            file_stream.intermediate_file_stream = DB::TemporaryBlockStreamHolder(intermediate_header, tmp_data_disk.get());
         }
         flush_bytes += flushBlocksInfoDisk(file_stream.intermediate_file_stream, file_stream.intermediate_blocks);
     }
@@ -373,9 +376,10 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
     if (buffer_file_stream.intermediate_file_stream)
     {
         buffer_file_stream.intermediate_file_stream->finishWriting();
+        auto reader = buffer_file_stream.intermediate_file_stream->getReadStream();
         while (true)
         {
-            auto block = buffer_file_stream.intermediate_file_stream->read();
+            auto block = reader->read();
             if (!block.rows())
                 break;
             read_bytes += block.bytes();
@@ -383,7 +387,7 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
             mergeOneBlock(block, false);
             block = {};
         }
-        buffer_file_stream.intermediate_file_stream = nullptr;
+        buffer_file_stream.intermediate_file_stream.reset();
         total_read_disk_time += watch.elapsedMilliseconds();
     }
     if (!buffer_file_stream.intermediate_blocks.empty())
@@ -398,9 +402,10 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
     if (buffer_file_stream.original_file_stream)
     {
         buffer_file_stream.original_file_stream->finishWriting();
+        auto reader = buffer_file_stream.original_file_stream->getReadStream();
         while (true)
         {
-            auto block = buffer_file_stream.original_file_stream->read();
+            auto block = reader->read();
             if (!block.rows())
                 break;
             read_bytes += block.bytes();
@@ -408,7 +413,7 @@ std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareB
             mergeOneBlock(block, true);
             block = {};
         }
-        buffer_file_stream.original_file_stream = nullptr;
+        buffer_file_stream.original_file_stream.reset();
         total_read_disk_time += watch.elapsedMilliseconds();
     }
     if (!buffer_file_stream.original_blocks.empty())
diff --git a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
index c2b787393a..612a58b3c9 100644
--- a/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
+++ b/cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
@@ -59,7 +59,7 @@ private:
     DB::Aggregator::AggregateColumns aggregate_columns;
     DB::AggregatingTransformParamsPtr params;
     DB::ContextPtr context;
-    DB::TemporaryDataOnDiskPtr tmp_data_disk;
+    DB::TemporaryDataOnDiskScopePtr tmp_data_disk;
     DB::AggregatedDataVariantsPtr current_data_variants = nullptr;
     size_t current_bucket_index = 0;
 
@@ -83,9 +83,9 @@ private:
        /// Only be used when there is no pre-aggregated step, store the original input blocks.
         std::list<DB::Block> original_blocks;
         /// store the intermediate result blocks.
-        DB::TemporaryFileStream * intermediate_file_stream = nullptr;
+        std::optional<DB::TemporaryBlockStreamHolder> intermediate_file_stream;
         /// Only be used when there is no pre-aggregated step
-        DB::TemporaryFileStream * original_file_stream = nullptr;
+        std::optional<DB::TemporaryBlockStreamHolder> original_file_stream;
         size_t pending_bytes = 0;
     };
     std::unordered_map<size_t, BufferFileStream> buckets;
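
Note: the spill streams change from raw non-owning TemporaryFileStream pointers to std::optional<DB::TemporaryBlockStreamHolder>, so each bucket owns its spill stream and reset() releases the underlying temporary file deterministically instead of merely nulling a pointer. A minimal sketch of that lazily-created owning slot follows; StreamHolder is a hypothetical stand-in.

    #include <iostream>
    #include <optional>
    #include <string>
    #include <utility>
    #include <vector>

    struct StreamHolder // hypothetical stand-in for DB::TemporaryBlockStreamHolder
    {
        std::vector<std::string> rows;
        void write(std::string s) { rows.push_back(std::move(s)); }
        ~StreamHolder() { std::cout << "spill file released\n"; } // RAII cleanup
    };

    int main()
    {
        std::optional<StreamHolder> stream; // empty until the first spill
        if (!stream)
            stream.emplace(); // was: stream = &tmp_data_disk->createStream(header)
        stream->write("block 1");
        stream.reset(); // was: stream = nullptr, which never released anything itself
    }
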
diff --git a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
index 59d4c39f87..5a6f229744 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/CrossRelParser.cpp
@@ -41,6 +41,7 @@ namespace DB
 namespace Setting
 {
 extern const SettingsUInt64 max_block_size;
+extern const SettingsUInt64 min_joined_block_size_bytes;
 }
 namespace ErrorCodes
 {
@@ -200,7 +201,13 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const substrait::CrossRel & join, DB:
     {
         JoinPtr hash_join = std::make_shared<HashJoin>(table_join, right->getCurrentHeader().cloneEmpty());
         QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
-            left->getCurrentHeader(), right->getCurrentHeader(), hash_join, context->getSettingsRef()[Setting::max_block_size], 1, false);
+            left->getCurrentHeader(),
+            right->getCurrentHeader(),
+            hash_join,
+            context->getSettingsRef()[Setting::max_block_size],
+            context->getSettingsRef()[Setting::min_joined_block_size_bytes],
+            1,
+            false);
         join_step->setStepDescription("CROSS_JOIN");
         steps.emplace_back(join_step.get());
         std::vector<QueryPlanPtr> plans;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
index 46f7926cf7..7493471697 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/JoinRelParser.cpp
@@ -46,6 +46,7 @@ namespace Setting
 {
 extern const SettingsJoinAlgorithm join_algorithm;
 extern const SettingsUInt64 max_block_size;
+extern const SettingsUInt64 min_joined_block_size_bytes;
 }
 namespace ErrorCodes
 {
@@ -315,7 +316,13 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q
         JoinPtr smj_join = std::make_shared<FullSortingMergeJoin>(table_join, right->getCurrentHeader().cloneEmpty(), -1);
         MultiEnum<DB::JoinAlgorithm> join_algorithm = context->getSettingsRef()[Setting::join_algorithm];
         QueryPlanStepPtr join_step = std::make_unique<DB::JoinStep>(
-            left->getCurrentHeader(), right->getCurrentHeader(), smj_join, context->getSettingsRef()[Setting::max_block_size], 1, false);
+            left->getCurrentHeader(),
+            right->getCurrentHeader(),
+            smj_join,
+            context->getSettingsRef()[Setting::max_block_size],
+            context->getSettingsRef()[Setting::min_joined_block_size_bytes],
+            1,
+            false);
 
         join_step->setStepDescription("SORT_MERGE_JOIN");
         steps.emplace_back(join_step.get());
@@ -448,7 +455,7 @@ void JoinRelParser::collectJoinKeys(
     table_join.addDisjunct();
     const auto & expr = join_rel.expression();
     auto & join_clause = table_join.getClauses().back();
-    std::list<const const substrait::Expression *> expressions_stack;
+    std::list<const substrait::Expression *> expressions_stack;
     expressions_stack.push_back(&expr);
     while (!expressions_stack.empty())
     {
@@ -778,6 +785,7 @@ DB::QueryPlanPtr JoinRelParser::buildMultiOnClauseHashJoin(
         right_plan->getCurrentHeader(),
         hash_join,
         context->getSettingsRef()[Setting::max_block_size],
+        context->getSettingsRef()[Setting::min_joined_block_size_bytes],
         1,
         false);
     join_step->setStepDescription("Multi join on clause hash join");
@@ -817,6 +825,7 @@ DB::QueryPlanPtr JoinRelParser::buildSingleOnClauseHashJoin(
         right_plan->getCurrentHeader(),
         hash_join,
         context->getSettingsRef()[Setting::max_block_size],
+        context->getSettingsRef()[Setting::min_joined_block_size_bytes],
         1,
         false);
 
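
Note: every JoinStep construction in both parsers now threads min_joined_block_size_bytes through, tracking an upstream signature change. Conceptually the setting lets the join squash undersized output chunks up to a byte threshold before emitting them, so downstream operators see fewer tiny blocks. A conceptual sketch of such a byte-threshold squash (not ClickHouse's actual JoinStep logic):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct Chunk { size_t bytes; };

    // Accumulate chunks until at least `min_bytes` are pending, then emit one block.
    void squashAndEmit(const std::vector<Chunk> & in, size_t min_bytes)
    {
        size_t pending = 0;
        for (const auto & c : in)
        {
            pending += c.bytes;
            if (pending >= min_bytes)
            {
                std::cout << "emit block of " << pending << " bytes\n";
                pending = 0;
            }
        }
        if (pending)
            std::cout << "emit final block of " << pending << " bytes\n";
    }

    int main()
    {
        squashAndEmit({{100}, {200}, {5000}, {50}}, 4096); // one 5300-byte block, then a 50-byte tail
    }
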
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 0be7e0d892..17587e5200 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -74,7 +74,7 @@ void SparkStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree & source
         sum_ranges += part.ranges.size();
         sum_marks += part.getMarksCount();
         sum_rows += part.getRowsCount();
-        total_marks_pk += part.data_part->index_granularity.getMarksCountWithoutFinal();
+        total_marks_pk += part.data_part->index_granularity->getMarksCountWithoutFinal();
 
         for (auto range : part.ranges)
             sum_marks_pk += range.getNumberOfMarks();
@@ -487,6 +487,12 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart(
     ///  either default lz4 or compression method with zero thresholds on absolute and relative part size.
     auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
     auto txn = context->getCurrentTransaction();
+    auto index_granularity_ptr = createMergeTreeIndexGranularity(
+        block.rows(),
+        block.bytes(),
+        *data.getSettings(),
+        new_data_part->index_granularity_info,
+        /*blocks_are_granules=*/false);
     auto out = std::make_unique<MergedBlockOutputStream>(
         new_data_part,
         metadata_snapshot,
@@ -494,6 +500,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart(
         indices,
         MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns()),
         compression_codec,
+        index_granularity_ptr,
         txn ? txn->tid : Tx::PrehistoricTID,
         false,
         false,
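
Note: writeTempPart() now builds an index-granularity object up front from the block's row and byte counts and hands it to MergedBlockOutputStream, matching upstream ClickHouse's move to granularity-behind-a-pointer (also visible in the index_granularity. to index_granularity-> change earlier in this file). The usual trade-off is a fixed rows-per-granule mark versus an adaptive one capped by a byte budget per granule; a rough sketch of that selection, with hypothetical names and defaults:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    // Pick rows-per-granule: fixed, or capped so a granule stays under a byte budget.
    size_t chooseGranularity(size_t rows, size_t bytes, size_t fixed_rows, size_t granule_bytes, bool adaptive)
    {
        if (!adaptive || rows == 0)
            return fixed_rows;
        size_t avg_row_bytes = std::max<size_t>(bytes / rows, 1);
        size_t rows_by_bytes = std::max<size_t>(granule_bytes / avg_row_bytes, 1);
        return std::min(fixed_rows, rows_by_bytes);
    }

    int main()
    {
        // Wide rows (~8 KiB each): the byte budget, not the row count, decides.
        std::cout << chooseGranularity(1'000'000, 8'000'000'000ULL, 8192, 10 << 20, true) << '\n'; // 1310
    }
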

