Copilot commented on code in PR #68:
URL: https://github.com/apache/paimon-cpp/pull/68#discussion_r3385121632


##########
src/paimon/core/stats/simple_stats_evolutions.h:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/utils/concurrent_hash_map.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/stats/simple_stats_evolution.h"

Review Comment:
   `SimpleStatsEvolutions` uses `std::nullopt` / `std::optional` but this 
header does not include `<optional>`. Relying on transitive includes can cause 
build breaks depending on include order; add `#include <optional>` here.



##########
src/paimon/core/stats/simple_stats_evolutions.h:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/utils/concurrent_hash_map.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/stats/simple_stats_evolution.h"
+
+namespace paimon {
+class SimpleStatsEvolutions {
+ public:
+    SimpleStatsEvolutions(const std::shared_ptr<TableSchema>& table_schema,
+                          const std::shared_ptr<MemoryPool>& pool)
+        : pool_(pool), table_schema_(table_schema) {}
+
+    std::shared_ptr<SimpleStatsEvolution> GetOrCreate(
+        const std::shared_ptr<TableSchema>& data_schema) {
+        auto data_schema_id = data_schema->Id();
+        auto cached_evolution = evolutions_.Find(data_schema_id);
+        if (cached_evolution != std::nullopt) {
+            return cached_evolution.value();
+        }
+        bool need_mapping = data_schema_id != table_schema_->Id();
+        auto evolution = std::make_shared<SimpleStatsEvolution>(
+            data_schema->Fields(), table_schema_->Fields(), need_mapping, 
pool_);
+        evolutions_.Insert(data_schema_id, evolution);
+        return evolution;
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<TableSchema> table_schema_;
+    // scheme_id -> evolution
+    ConcurrentHashMap<int64_t, std::shared_ptr<SimpleStatsEvolution>> 
evolutions_;

Review Comment:
   Typo in comment: `scheme_id` should be `schema_id`.



##########
src/paimon/core/stats/simple_stats_evolutions.h:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/utils/concurrent_hash_map.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/stats/simple_stats_evolution.h"
+
+namespace paimon {
+class SimpleStatsEvolutions {
+ public:
+    SimpleStatsEvolutions(const std::shared_ptr<TableSchema>& table_schema,
+                          const std::shared_ptr<MemoryPool>& pool)
+        : pool_(pool), table_schema_(table_schema) {}
+
+    std::shared_ptr<SimpleStatsEvolution> GetOrCreate(
+        const std::shared_ptr<TableSchema>& data_schema) {
+        auto data_schema_id = data_schema->Id();
+        auto cached_evolution = evolutions_.Find(data_schema_id);
+        if (cached_evolution != std::nullopt) {
+            return cached_evolution.value();
+        }
+        bool need_mapping = data_schema_id != table_schema_->Id();
+        auto evolution = std::make_shared<SimpleStatsEvolution>(
+            data_schema->Fields(), table_schema_->Fields(), need_mapping, 
pool_);
+        evolutions_.Insert(data_schema_id, evolution);
+        return evolution;
+    }

Review Comment:
   `GetOrCreate` performs a non-atomic `Find` followed by `Insert`. With 
concurrent callers, this can create multiple `SimpleStatsEvolution` instances 
for the same `data_schema_id` and insert races (wasted work/memory and possibly 
returning different shared_ptrs to different threads). Prefer an atomic 
insert-if-absent / compute-if-absent API on `ConcurrentHashMap` if available 
(e.g., `GetOrInsert`, `InsertIfAbsent` returning existing), or guard this 
critical section with a mutex.



##########
src/paimon/core/stats/simple_stats_evolution.cpp:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/stats/simple_stats_evolution.h"
+
+#include <cassert>
+#include <cstddef>
+#include <string_view>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/projected_array.h"
+#include "paimon/common/utils/projected_row.h"
+#include "paimon/core/casting/casted_row.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class CastExecutor;
+class InternalMap;
+class InternalRow;
+class MemoryPool;
+
+SimpleStatsEvolution::SimpleStatsEvolution(const std::vector<DataField>& 
data_fields,
+                                           const std::vector<DataField>& 
table_fields,
+                                           bool need_mapping,
+                                           const std::shared_ptr<MemoryPool>& 
pool)
+    : need_mapping_(need_mapping), data_fields_(data_fields), 
table_fields_(table_fields) {
+    empty_values_ = std::make_shared<GenericRow>(data_fields.size());
+    empty_null_counts_ = std::make_shared<BinaryArray>();
+    BinaryArrayWriter array_writer(empty_null_counts_.get(), 
data_fields.size(),
+                                   /*element_size=*/sizeof(int64_t), 
pool.get());
+    for (size_t i = 0; i < data_fields.size(); ++i) {
+        array_writer.SetNullAt(i);
+    }
+    array_writer.Complete();
+
+    for (size_t i = 0; i < data_fields.size(); i++) {
+        const auto& data_field = data_fields[i];
+        id_to_data_fields_.emplace(data_field.Id(), std::make_pair(i, 
data_field));
+    }
+    std::map<std::string, DataField> name_to_table_fields;
+    for (const auto& field : table_fields) {
+        name_to_table_fields_.emplace(field.Name(), field);
+    }

Review Comment:
   The local variable `name_to_table_fields` is never used. This will trigger 
an unused-variable warning (and may fail the build if warnings are treated as 
errors). Remove the unused local variable and keep populating 
`name_to_table_fields_` directly.



##########
src/paimon/core/stats/simple_stats_evolution.cpp:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/stats/simple_stats_evolution.h"
+
+#include <cassert>
+#include <cstddef>
+#include <string_view>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/common/utils/projected_array.h"
+#include "paimon/common/utils/projected_row.h"
+#include "paimon/core/casting/casted_row.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class CastExecutor;
+class InternalMap;
+class InternalRow;
+class MemoryPool;
+
+SimpleStatsEvolution::SimpleStatsEvolution(const std::vector<DataField>& 
data_fields,
+                                           const std::vector<DataField>& 
table_fields,
+                                           bool need_mapping,
+                                           const std::shared_ptr<MemoryPool>& 
pool)
+    : need_mapping_(need_mapping), data_fields_(data_fields), 
table_fields_(table_fields) {
+    empty_values_ = std::make_shared<GenericRow>(data_fields.size());
+    empty_null_counts_ = std::make_shared<BinaryArray>();
+    BinaryArrayWriter array_writer(empty_null_counts_.get(), 
data_fields.size(),
+                                   /*element_size=*/sizeof(int64_t), 
pool.get());
+    for (size_t i = 0; i < data_fields.size(); ++i) {
+        array_writer.SetNullAt(i);
+    }
+    array_writer.Complete();
+
+    for (size_t i = 0; i < data_fields.size(); i++) {
+        const auto& data_field = data_fields[i];
+        id_to_data_fields_.emplace(data_field.Id(), std::make_pair(i, 
data_field));
+    }
+    std::map<std::string, DataField> name_to_table_fields;
+    for (const auto& field : table_fields) {
+        name_to_table_fields_.emplace(field.Name(), field);
+    }
+}
+
+class NullCountsEvoArray : public InternalArray {
+ public:
+    NullCountsEvoArray(const std::shared_ptr<InternalArray>& array,
+                       const std::vector<int32_t>& mapping, int64_t 
not_found_value)
+        : array_(array), mapping_(mapping), not_found_value_(not_found_value) {
+        assert(array_);
+    }
+    int32_t Size() const override {
+        return mapping_.size();
+    }
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        if (mapping_[pos] < 0) {
+            return false;
+        }
+        return array_->IsNullAt(mapping_[pos]);
+    }
+    int64_t GetLong(int32_t pos) const override {
+        if (mapping_[pos] < 0) {
+            return not_found_value_;
+        }
+        return array_->GetLong(mapping_[pos]);
+    }
+
+ private:
+    // ============================= Unsupported Methods 
================================
+    bool GetBoolean(int32_t pos) const override;
+    char GetByte(int32_t pos) const override;
+    int16_t GetShort(int32_t pos) const override;
+    int32_t GetInt(int32_t pos) const override;
+    int32_t GetDate(int32_t pos) const override;
+    float GetFloat(int32_t pos) const override;
+    double GetDouble(int32_t pos) const override;
+    BinaryString GetString(int32_t pos) const override;
+    std::string_view GetStringView(int32_t pos) const override;
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const 
override;
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override;
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const 
override;
+    Result<std::vector<char>> ToBooleanArray() const override;
+    Result<std::vector<char>> ToByteArray() const override;
+    Result<std::vector<int16_t>> ToShortArray() const override;
+    Result<std::vector<int32_t>> ToIntArray() const override;
+    Result<std::vector<int64_t>> ToLongArray() const override;
+    Result<std::vector<float>> ToFloatArray() const override;
+    Result<std::vector<double>> ToDoubleArray() const override;
+
+ private:
+    std::shared_ptr<InternalArray> array_;
+    std::vector<int32_t> mapping_;
+    int64_t not_found_value_;
+};
+
+Result<SimpleStatsEvolution::EvolutionStats> SimpleStatsEvolution::Evolution(
+    const SimpleStats& stats, int64_t row_count,
+    const std::optional<std::vector<std::string>>& dense_fields) {
+    SimpleStatsEvolution::EvolutionStats evolution_stats(
+        std::make_shared<BinaryRow>(stats.MinValues()),
+        std::make_shared<BinaryRow>(stats.MaxValues()),
+        std::make_shared<BinaryArray>(stats.NullCounts()));
+    auto& min_values = evolution_stats.min_values;
+    auto& max_values = evolution_stats.max_values;
+    auto& null_counts = evolution_stats.null_counts;
+    if (dense_fields != std::nullopt && dense_fields.value().empty()) {
+        // optimize for empty dense fields
+        min_values = empty_values_;
+        max_values = empty_values_;
+        null_counts = empty_null_counts_;
+    } else if (dense_fields != std::nullopt) {
+        // create dense index mapping
+        std::vector<int32_t> data_idx_to_dense_idx;
+        std::optional<std::vector<int32_t>> cached_dense_idx =
+            dense_fields_mapping_.Find(dense_fields.value());
+        if (!cached_dense_idx) {
+            std::map<std::string, int32_t> field_name_to_dense_idx =
+                ObjectUtils::CreateIdentifierToIndexMap(dense_fields.value());
+            data_idx_to_dense_idx.resize(data_fields_.size(), -1);
+            for (size_t i = 0; i < data_fields_.size(); ++i) {
+                auto iter = 
field_name_to_dense_idx.find(data_fields_[i].Name());
+                if (iter != field_name_to_dense_idx.end()) {
+                    data_idx_to_dense_idx[i] = iter->second;
+                }
+            }
+            dense_fields_mapping_.Insert(dense_fields.value(), 
data_idx_to_dense_idx);
+        } else {
+            data_idx_to_dense_idx = std::move(cached_dense_idx).value();
+        }
+
+        min_values = std::make_shared<ProjectedRow>(min_values, 
data_idx_to_dense_idx);
+        max_values = std::make_shared<ProjectedRow>(max_values, 
data_idx_to_dense_idx);
+        null_counts = std::make_shared<ProjectedArray>(null_counts, 
data_idx_to_dense_idx);
+    }
+
+    if (!need_mapping_) {
+        return evolution_stats;
+    }
+    return MappingStats(row_count, evolution_stats);
+}
+
+Result<SimpleStatsEvolution::EvolutionStats> 
SimpleStatsEvolution::MappingStats(
+    int64_t row_count, const SimpleStatsEvolution::EvolutionStats& src_stats) 
const {
+    SimpleStatsEvolution::EvolutionStats evolution_stats = src_stats;
+    auto& min_values = evolution_stats.min_values;
+    auto& max_values = evolution_stats.max_values;
+    auto& null_counts = evolution_stats.null_counts;
+
+    std::vector<int32_t> table_idx_to_data_idx(table_fields_.size(), -1);
+    std::vector<DataField> projected_data_fields;
+    projected_data_fields.reserve(table_fields_.size());
+    std::map<int32_t, int32_t> field_id_to_data_idx = 
ObjectUtils::CreateIdentifierToIndexMap(
+        data_fields_, [](const DataField& field) -> int32_t { return 
field.Id(); });
+    for (size_t i = 0; i < table_fields_.size(); ++i) {
+        auto iter = field_id_to_data_idx.find(table_fields_[i].Id());
+        if (iter != field_id_to_data_idx.end()) {
+            table_idx_to_data_idx[i] = iter->second;
+            projected_data_fields.push_back(data_fields_[iter->second]);
+        } else {
+            projected_data_fields.push_back(table_fields_[i]);
+        }
+    }
+    min_values = std::make_shared<ProjectedRow>(min_values, 
table_idx_to_data_idx);
+    max_values = std::make_shared<ProjectedRow>(max_values, 
table_idx_to_data_idx);
+    null_counts =
+        std::make_shared<NullCountsEvoArray>(null_counts, 
table_idx_to_data_idx, row_count);
+
+    // cast mapping
+    PAIMON_ASSIGN_OR_RAISE(
+        std::vector<std::shared_ptr<CastExecutor>> cast_executors,
+        FieldMappingBuilder::CreateDataCastExecutors(table_fields_, 
projected_data_fields));
+
+    bool need_cast = false;
+    for (const auto& executor : cast_executors) {
+        if (executor) {
+            need_cast = true;
+            break;
+        }
+    }
+    if (need_cast) {
+        PAIMON_ASSIGN_OR_RAISE(min_values, CastedRow::Create(cast_executors, 
projected_data_fields,
+                                                             table_fields_, 
min_values));
+        PAIMON_ASSIGN_OR_RAISE(max_values, CastedRow::Create(cast_executors, 
projected_data_fields,
+                                                             table_fields_, 
max_values));
+    }
+    return evolution_stats;
+}
+
+// ============================= Unsupported Methods 
================================
+bool NullCountsEvoArray::GetBoolean(int32_t pos) const {
+    assert(false);
+    return array_->GetBoolean(mapping_[pos]);
+}

Review Comment:
   The 'unsupported' getters rely on `assert(false)` and then still execute a 
return statement. In release builds (where `assert` is compiled out), these 
methods will silently return values, potentially using invalid indices (e.g., 
`mapping_[pos] < 0`) and producing incorrect behavior that’s hard to diagnose. 
Consider making these paths fail-fast unconditionally (e.g., `std::terminate()` 
/ a project `UNREACHABLE` macro) or return a safe default after a hard failure 
mechanism, and ensure `mapping_[pos]` is validated before use.



##########
src/paimon/core/stats/simple_stats_converter.cpp:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/stats/simple_stats_converter.h"
+
+#include <cstdint>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+Result<SimpleStats> SimpleStatsConverter::ToBinary(
+    const std::vector<std::shared_ptr<ColumnStats>>& stats_vec, MemoryPool* 
pool) {
+    int32_t row_field_count = stats_vec.size();
+    BinaryRow min_values(row_field_count);
+    BinaryRowWriter min_writer(&min_values, 1024, pool);
+    BinaryRow max_values(row_field_count);
+    BinaryRowWriter max_writer(&max_values, 1024, pool);
+    BinaryArray null_counts;
+    BinaryArrayWriter null_writer(&null_counts, row_field_count, 
sizeof(int64_t), pool);
+    for (int32_t i = 0; i < row_field_count; i++) {
+        const auto& stats = stats_vec[i];
+        auto type = stats->GetFieldType();
+        if (stats->NullCount()) {
+            null_writer.WriteLong(i, stats->NullCount().value());
+        } else {
+            null_writer.SetNullAt(i);
+        }
+        if (auto null_stats = 
std::dynamic_pointer_cast<NestedColumnStats>(stats)) {
+            // nested type, e.g., List, Map, Struct
+            min_writer.SetNullAt(i);
+            max_writer.SetNullAt(i);
+            continue;
+        }
+        switch (type) {
+            case FieldType::BOOLEAN: {
+                auto typed_stats = 
std::dynamic_pointer_cast<BooleanColumnStats>(stats);
+                if (typed_stats == nullptr) {
+                    return Status::Invalid("cast BooleanColumnStats failed");
+                }
+                if (typed_stats->Min() == std::nullopt) {
+                    min_writer.SetNullAt(i);
+                } else {
+                    min_writer.WriteBoolean(i, typed_stats->Min().value());
+                }
+                if (typed_stats->Max() == std::nullopt) {
+                    max_writer.SetNullAt(i);
+                } else {
+                    max_writer.WriteBoolean(i, typed_stats->Max().value());
+                }
+                break;
+            }
+            case FieldType::TINYINT: {
+                auto typed_stats = 
std::dynamic_pointer_cast<TinyIntColumnStats>(stats);
+                if (typed_stats == nullptr) {
+                    return Status::Invalid("cast TinyIntColumnStats failed");
+                }
+                if (typed_stats->Min() == std::nullopt) {
+                    min_writer.SetNullAt(i);
+                } else {
+                    min_writer.WriteByte(i, typed_stats->Min().value());
+                }
+                if (typed_stats->Max() == std::nullopt) {
+                    max_writer.SetNullAt(i);
+                } else {
+                    max_writer.WriteByte(i, typed_stats->Max().value());
+                }
+                break;
+            }
+            case FieldType::SMALLINT: {
+                auto typed_stats = 
std::dynamic_pointer_cast<SmallIntColumnStats>(stats);
+                if (typed_stats == nullptr) {
+                    return Status::Invalid("cast SmallIntColumnStats failed");
+                }
+                if (typed_stats->Min() == std::nullopt) {
+                    min_writer.SetNullAt(i);
+                } else {
+                    min_writer.WriteShort(i, typed_stats->Min().value());
+                }
+                if (typed_stats->Max() == std::nullopt) {
+                    max_writer.SetNullAt(i);
+                } else {
+                    max_writer.WriteShort(i, typed_stats->Max().value());
+                }
+                break;
+            }
+            case FieldType::INT: {
+                auto typed_stats = 
std::dynamic_pointer_cast<IntColumnStats>(stats);
+                if (typed_stats == nullptr) {
+                    return Status::Invalid("cast IntColumnStats failed");
+                }
+                if (typed_stats->Min() == std::nullopt) {
+                    min_writer.SetNullAt(i);
+                } else {
+                    min_writer.WriteInt(i, typed_stats->Min().value());
+                }
+                if (typed_stats->Max() == std::nullopt) {
+                    max_writer.SetNullAt(i);
+                } else {
+                    max_writer.WriteInt(i, typed_stats->Max().value());
+                }
+                break;
+            }
+            case FieldType::BIGINT: {
+                auto typed_stats = 
std::dynamic_pointer_cast<BigIntColumnStats>(stats);
+                if (typed_stats == nullptr) {
+                    return Status::Invalid("cast LongColumnStats failed");
+                }

Review Comment:
   The error message is inconsistent with the attempted cast 
(`BigIntColumnStats`) and will be misleading during debugging. Update it to 
something like `cast BigIntColumnStats failed` (and consider including the 
column index/type to make failures actionable).



##########
src/paimon/core/stats/simple_stats_converter.cpp:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/stats/simple_stats_converter.h"
+
+#include <cstdint>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/format/column_stats.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+Result<SimpleStats> SimpleStatsConverter::ToBinary(
+    const std::vector<std::shared_ptr<ColumnStats>>& stats_vec, MemoryPool* 
pool) {
+    int32_t row_field_count = stats_vec.size();
+    BinaryRow min_values(row_field_count);
+    BinaryRowWriter min_writer(&min_values, 1024, pool);
+    BinaryRow max_values(row_field_count);
+    BinaryRowWriter max_writer(&max_values, 1024, pool);
+    BinaryArray null_counts;
+    BinaryArrayWriter null_writer(&null_counts, row_field_count, 
sizeof(int64_t), pool);
+    for (int32_t i = 0; i < row_field_count; i++) {
+        const auto& stats = stats_vec[i];
+        auto type = stats->GetFieldType();

Review Comment:
   `stats` is dereferenced without a null check. If `stats_vec` can contain 
null entries (e.g., partially initialized collectors or upstream errors), this 
will crash. Add an early validation inside the loop (or pre-validate the 
vector) and return `Status::Invalid(...)` when an element is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to