wgtmac commented on code in PR #335: URL: https://github.com/apache/iceberg-cpp/pull/335#discussion_r2559189487
########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. 
+class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. +template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. 
+class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); + return term_ ? term_->reference() : nullptr; + } + + Result<Literal> Evaluate(const StructLike& data) const override = 0; + + bool is_bound_aggregate() const override { return true; } + + enum class Kind : int8_t { + // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL) + kCount = 0, + // Value aggregates (MIN, MAX) + kValue, + }; + + virtual Kind kind() const = 0; + virtual Result<std::unique_ptr<Aggregator>> NewAggregator() const = 0; + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate<BoundTerm>(op, std::move(term)) {} +}; + +/// \brief Base class for COUNT aggregates. +class ICEBERG_EXPORT CountAggregate : public BoundAggregate { + public: + Kind kind() const override { return Kind::kCount; } + + protected: + CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} +}; + +/// \brief COUNT(term) aggregate. 
+class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNonNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT_NULL(term) aggregate. +class ICEBERG_EXPORT CountNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT(*) aggregate. +class ICEBERG_EXPORT CountStarAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountStarAggregate>> Make(); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + CountStarAggregate(); +}; + +/// \brief Bound MAX/MIN aggregate. +class ICEBERG_EXPORT ValueAggregate : public BoundAggregate { + public: + ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term); + + Kind kind() const override { return Kind::kValue; } + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; +}; + +/// \brief Evaluates bound aggregates over StructLike rows. +class ICEBERG_EXPORT AggregateEvaluator { + public: + virtual ~AggregateEvaluator() = default; + + /// \brief Create an evaluator for a single bound aggregate. 
+ /// \param aggregate The bound aggregate to evaluate across rows. + static Result<std::unique_ptr<AggregateEvaluator>> Make( + std::shared_ptr<BoundAggregate> aggregate); + + /// \brief Create an evaluator for multiple bound aggregates. + /// \param aggregates Aggregates to evaluate in one pass; order is preserved in + /// Results(). + static Result<std::unique_ptr<AggregateEvaluator>> MakeList( + std::vector<std::shared_ptr<BoundAggregate>> aggregates); + + /// \brief Update aggregates with a row. + virtual Status Update(const StructLike& row) = 0; + + /// \brief Final aggregated value. + virtual Result<std::vector<Literal>> Results() const = 0; Review Comment: Should we return `Result<std::span<const Literal>>`? ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public 
BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNonNullAggregate>( + new CountNonNullAggregate(std::move(term))); +} + +std::string CountNonNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count({})", term()->reference()->name()); +} + +Result<Literal> CountNonNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, 
term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 0 : 1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNonNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNonNullAggregator(term())); +} + +CountNullAggregate::CountNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCountNull, std::move(term)) {} + +Result<std::shared_ptr<CountNullAggregate>> CountNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNullAggregate>(new CountNullAggregate(std::move(term))); +} + +std::string CountNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count_null({})", term()->reference()->name()); +} + +Result<Literal> CountNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 
1 : 0); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNullAggregator(term())); +} + +CountStarAggregate::CountStarAggregate() + : CountAggregate(Expression::Operation::kCountStar, nullptr) {} + +Result<std::shared_ptr<CountStarAggregate>> CountStarAggregate::Make() { + return std::shared_ptr<CountStarAggregate>(new CountStarAggregate()); +} + +std::string CountStarAggregate::ToString() const { return "count(*)"; } + +Result<Literal> CountStarAggregate::Evaluate(const StructLike& data) const { + return Literal::Long(1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountStarAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountStarAggregator()); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string ValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + auto prefix = op() == Expression::Operation::kMax ? 
"max" : "min"; + return std::format("{}({})", prefix, term()->reference()->name()); +} + +Result<Literal> ValueAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return literal; +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> ValueAggregate::NewAggregator() + const { + bool is_max = op() == Expression::Operation::kMax; + return std::unique_ptr<BoundAggregate::Aggregator>( + new ValueAggregatorImpl(is_max, term())); +} + +// -------------------- Unbound binding -------------------- + +template <typename B> +Result<std::shared_ptr<Expression>> UnboundAggregateImpl<B>::Bind( + const Schema& schema, bool case_sensitive) const { + ICEBERG_DCHECK(UnboundAggregateImpl<B>::IsSupportedOp(this->op()), + "Unexpected aggregate operation"); + + std::shared_ptr<B> bound_term; + if (this->term()) { + ICEBERG_ASSIGN_OR_THROW(bound_term, this->term()->Bind(schema, case_sensitive)); + } + + switch (this->op()) { + case Expression::Operation::kCountStar: { + ICEBERG_ASSIGN_OR_THROW(auto aggregate, CountStarAggregate::Make()); + return aggregate; + } + case Expression::Operation::kCount: { + if (!bound_term) { + return InvalidExpression("Aggregate requires a term"); + } + ICEBERG_ASSIGN_OR_THROW(auto aggregate, + CountNonNullAggregate::Make(std::move(bound_term))); + return aggregate; + } + case Expression::Operation::kCountNull: { + if (!bound_term) { + return InvalidExpression("Aggregate requires a term"); + } + ICEBERG_ASSIGN_OR_THROW(auto aggregate, + CountNullAggregate::Make(std::move(bound_term))); + return aggregate; + } + case Expression::Operation::kMax: + case Expression::Operation::kMin: { + if (!bound_term) { + return InvalidExpression("Aggregate requires a term"); + } + auto aggregate = + std::make_shared<ValueAggregate>(this->op(), std::move(bound_term)); + return aggregate; + } + default: + return NotSupported("Unsupported aggregate operation"); + } +} + +template <typename B> 
+Result<std::shared_ptr<UnboundAggregateImpl<B>>> UnboundAggregateImpl<B>::Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) { + if (!IsSupportedOp(op)) { + return NotSupported("Unsupported aggregate operation: {}", ::iceberg::ToString(op)); + } + if (op != Expression::Operation::kCountStar && !term) { + return InvalidExpression("Aggregate term cannot be null unless COUNT(*)"); + } + + return std::shared_ptr<UnboundAggregateImpl<B>>( + new UnboundAggregateImpl<B>(op, std::move(term))); +} + +template <typename B> +std::string UnboundAggregateImpl<B>::ToString() const { Review Comment: Can we move this to `Aggregate::ToString()` so that all the other overrides can be removed? ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public 
BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNonNullAggregate>( + new CountNonNullAggregate(std::move(term))); +} + +std::string CountNonNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count({})", term()->reference()->name()); +} + +Result<Literal> CountNonNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, 
term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 0 : 1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNonNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNonNullAggregator(term())); +} + +CountNullAggregate::CountNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCountNull, std::move(term)) {} + +Result<std::shared_ptr<CountNullAggregate>> CountNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNullAggregate>(new CountNullAggregate(std::move(term))); +} + +std::string CountNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count_null({})", term()->reference()->name()); +} + +Result<Literal> CountNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 
1 : 0); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNullAggregator(term())); +} + +CountStarAggregate::CountStarAggregate() + : CountAggregate(Expression::Operation::kCountStar, nullptr) {} + +Result<std::shared_ptr<CountStarAggregate>> CountStarAggregate::Make() { + return std::shared_ptr<CountStarAggregate>(new CountStarAggregate()); +} + +std::string CountStarAggregate::ToString() const { return "count(*)"; } + +Result<Literal> CountStarAggregate::Evaluate(const StructLike& data) const { + return Literal::Long(1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountStarAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountStarAggregator()); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string ValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + auto prefix = op() == Expression::Operation::kMax ? 
"max" : "min"; + return std::format("{}({})", prefix, term()->reference()->name()); +} + +Result<Literal> ValueAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return literal; +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> ValueAggregate::NewAggregator() + const { + bool is_max = op() == Expression::Operation::kMax; + return std::unique_ptr<BoundAggregate::Aggregator>( + new ValueAggregatorImpl(is_max, term())); +} + +// -------------------- Unbound binding -------------------- + +template <typename B> +Result<std::shared_ptr<Expression>> UnboundAggregateImpl<B>::Bind( + const Schema& schema, bool case_sensitive) const { + ICEBERG_DCHECK(UnboundAggregateImpl<B>::IsSupportedOp(this->op()), + "Unexpected aggregate operation"); + + std::shared_ptr<B> bound_term; + if (this->term()) { + ICEBERG_ASSIGN_OR_THROW(bound_term, this->term()->Bind(schema, case_sensitive)); + } + + switch (this->op()) { + case Expression::Operation::kCountStar: { + ICEBERG_ASSIGN_OR_THROW(auto aggregate, CountStarAggregate::Make()); + return aggregate; + } Review Comment: ```suggestion case Expression::Operation::kCountStar: return CountStarAggregate::Make(); ``` Same for below. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { 
return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); 
+ } + return std::shared_ptr<CountNonNullAggregate>( + new CountNonNullAggregate(std::move(term))); +} + +std::string CountNonNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count({})", term()->reference()->name()); +} + +Result<Literal> CountNonNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 0 : 1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNonNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNonNullAggregator(term())); +} + +CountNullAggregate::CountNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCountNull, std::move(term)) {} + +Result<std::shared_ptr<CountNullAggregate>> CountNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNullAggregate>(new CountNullAggregate(std::move(term))); +} + +std::string CountNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count_null({})", term()->reference()->name()); +} + +Result<Literal> CountNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 
1 : 0); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNullAggregator(term())); +} + +CountStarAggregate::CountStarAggregate() + : CountAggregate(Expression::Operation::kCountStar, nullptr) {} + +Result<std::shared_ptr<CountStarAggregate>> CountStarAggregate::Make() { + return std::shared_ptr<CountStarAggregate>(new CountStarAggregate()); +} + +std::string CountStarAggregate::ToString() const { return "count(*)"; } + +Result<Literal> CountStarAggregate::Evaluate(const StructLike& data) const { + return Literal::Long(1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountStarAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountStarAggregator()); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string ValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + auto prefix = op() == Expression::Operation::kMax ? 
"max" : "min"; + return std::format("{}({})", prefix, term()->reference()->name()); +} + +Result<Literal> ValueAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return literal; +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> ValueAggregate::NewAggregator() + const { + bool is_max = op() == Expression::Operation::kMax; + return std::unique_ptr<BoundAggregate::Aggregator>( + new ValueAggregatorImpl(is_max, term())); +} + +// -------------------- Unbound binding -------------------- + +template <typename B> +Result<std::shared_ptr<Expression>> UnboundAggregateImpl<B>::Bind( + const Schema& schema, bool case_sensitive) const { + ICEBERG_DCHECK(UnboundAggregateImpl<B>::IsSupportedOp(this->op()), + "Unexpected aggregate operation"); + + std::shared_ptr<B> bound_term; + if (this->term()) { + ICEBERG_ASSIGN_OR_THROW(bound_term, this->term()->Bind(schema, case_sensitive)); Review Comment: We cannot throw in this function. Please use `ICEBERG_ASSIGN_OR_RAISE` for this. Same for below. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public 
BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNonNullAggregate>( + new CountNonNullAggregate(std::move(term))); +} + +std::string CountNonNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count({})", term()->reference()->name()); +} + +Result<Literal> CountNonNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, 
term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 0 : 1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNonNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNonNullAggregator(term())); +} + +CountNullAggregate::CountNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCountNull, std::move(term)) {} + +Result<std::shared_ptr<CountNullAggregate>> CountNullAggregate::Make( + std::shared_ptr<BoundTerm> term) { + if (!term) { + return InvalidExpression("Bound count aggregate requires non-null term"); + } + return std::shared_ptr<CountNullAggregate>(new CountNullAggregate(std::move(term))); +} + +std::string CountNullAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound count aggregate should have term"); + return std::format("count_null({})", term()->reference()->name()); +} + +Result<Literal> CountNullAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return Literal::Long(literal.IsNull() ? 
1 : 0); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountNullAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountNullAggregator(term())); +} + +CountStarAggregate::CountStarAggregate() + : CountAggregate(Expression::Operation::kCountStar, nullptr) {} + +Result<std::shared_ptr<CountStarAggregate>> CountStarAggregate::Make() { + return std::shared_ptr<CountStarAggregate>(new CountStarAggregate()); +} + +std::string CountStarAggregate::ToString() const { return "count(*)"; } + +Result<Literal> CountStarAggregate::Evaluate(const StructLike& data) const { + return Literal::Long(1); +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> CountStarAggregate::NewAggregator() + const { + return std::unique_ptr<BoundAggregate::Aggregator>(new CountStarAggregator()); +} + +ValueAggregate::ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} + +std::string ValueAggregate::ToString() const { + ICEBERG_DCHECK(term() != nullptr, "Bound value aggregate should have term"); + auto prefix = op() == Expression::Operation::kMax ? 
"max" : "min"; + return std::format("{}({})", prefix, term()->reference()->name()); +} + +Result<Literal> ValueAggregate::Evaluate(const StructLike& data) const { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term()->Evaluate(data)); + return literal; +} + +Result<std::unique_ptr<BoundAggregate::Aggregator>> ValueAggregate::NewAggregator() + const { + bool is_max = op() == Expression::Operation::kMax; + return std::unique_ptr<BoundAggregate::Aggregator>( + new ValueAggregatorImpl(is_max, term())); +} + +// -------------------- Unbound binding -------------------- + +template <typename B> +Result<std::shared_ptr<Expression>> UnboundAggregateImpl<B>::Bind( + const Schema& schema, bool case_sensitive) const { + ICEBERG_DCHECK(UnboundAggregateImpl<B>::IsSupportedOp(this->op()), + "Unexpected aggregate operation"); + + std::shared_ptr<B> bound_term; + if (this->term()) { + ICEBERG_ASSIGN_OR_THROW(bound_term, this->term()->Bind(schema, case_sensitive)); + } + + switch (this->op()) { + case Expression::Operation::kCountStar: { + ICEBERG_ASSIGN_OR_THROW(auto aggregate, CountStarAggregate::Make()); + return aggregate; + } + case Expression::Operation::kCount: { + if (!bound_term) { Review Comment: Shouldn't this be checked by CountNonNullAggregate::Make? ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. 
+template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. +class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); + return term_ ? 
term_->reference() : nullptr; + } + + Result<Literal> Evaluate(const StructLike& data) const override = 0; + + bool is_bound_aggregate() const override { return true; } + + enum class Kind : int8_t { + // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL) + kCount = 0, + // Value aggregates (MIN, MAX) + kValue, + }; + + virtual Kind kind() const = 0; + virtual Result<std::unique_ptr<Aggregator>> NewAggregator() const = 0; + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate<BoundTerm>(op, std::move(term)) {} +}; + +/// \brief Base class for COUNT aggregates. +class ICEBERG_EXPORT CountAggregate : public BoundAggregate { + public: + Kind kind() const override { return Kind::kCount; } + + protected: + CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} +}; + +/// \brief COUNT(term) aggregate. +class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNonNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT_NULL(term) aggregate. +class ICEBERG_EXPORT CountNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT(*) aggregate. 
+class ICEBERG_EXPORT CountStarAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountStarAggregate>> Make(); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + CountStarAggregate(); +}; + +/// \brief Bound MAX/MIN aggregate. +class ICEBERG_EXPORT ValueAggregate : public BoundAggregate { Review Comment: Why not add MIN and MAX subclasses for ValueAggregate and ValueAggregator? ########## src/iceberg/expression/expression_visitor.h: ########## @@ -77,6 +79,22 @@ class ICEBERG_EXPORT ExpressionVisitor { /// \brief Visit an unbound predicate. /// \param pred The unbound predicate to visit virtual Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) = 0; + + /// \brief Visit a bound aggregate. + /// \param aggregate The bound aggregate to visit. + virtual Result<R> Aggregate(const std::shared_ptr<BoundAggregate>& aggregate) { + ICEBERG_DCHECK(aggregate != nullptr, "Bound aggregate cannot be null"); + return NotSupported("Visitor {} does not support bound aggregate: {}", + typeid(*this).name(), aggregate->ToString()); Review Comment: ```suggestion return NotSupported("Visitor {} does not support bound aggregate", typeid(*this).name()); ``` ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. 
+template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); Review Comment: ```suggestion "Aggregate term cannot be null except for COUNT(*)"); ``` ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } Review Comment: Do we actually need to use `Result` to wrap the return type? ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. 
+template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. 
+class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); Review Comment: ```suggestion "Bound aggregate term should not be null except for COUNT(*)"); ``` ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public 
BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( Review Comment: Same for other Make functions. ########## src/iceberg/expression/aggregate.cc: ########## @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/aggregate.h" + +#include <format> +#include <optional> +#include <vector> + +#include "iceberg/exception.h" +#include "iceberg/expression/binder.h" +#include "iceberg/row/struct_like.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +Result<std::shared_ptr<PrimitiveType>> GetPrimitiveType(const BoundTerm& term) { + if (!term.type()->is_primitive()) { + return InvalidExpression("Aggregate requires primitive type, got {}", + term.type()->ToString()); + } + return internal::checked_pointer_cast<PrimitiveType>(term.type()); +} + +class CountNonNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNonNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (!literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountNullAggregator : public BoundAggregate::Aggregator { + public: + explicit CountNullAggregator(std::shared_ptr<BoundTerm> term) + : term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto literal, term_->Evaluate(row)); + if (literal.IsNull()) { + ++count_; + } + return {}; + } + + Result<Literal> ResultLiteral() const override { 
return Literal::Long(count_); } + + private: + std::shared_ptr<BoundTerm> term_; + int64_t count_ = 0; +}; + +class CountStarAggregator : public BoundAggregate::Aggregator { + public: + Status Update(const StructLike& /*row*/) override { + ++count_; + return {}; + } + + Result<Literal> ResultLiteral() const override { return Literal::Long(count_); } + + private: + int64_t count_ = 0; +}; + +class ValueAggregatorImpl : public BoundAggregate::Aggregator { + public: + ValueAggregatorImpl(bool is_max, std::shared_ptr<BoundTerm> term) + : is_max_(is_max), term_(std::move(term)) {} + + Status Update(const StructLike& row) override { + ICEBERG_ASSIGN_OR_RAISE(auto val_literal, term_->Evaluate(row)); + if (val_literal.IsNull()) { + return {}; + } + if (!current_) { + current_ = std::move(val_literal); + return {}; + } + + auto ordering = val_literal <=> *current_; + if (ordering == std::partial_ordering::unordered) { + return InvalidExpression("Cannot compare literals of type {}", + val_literal.type()->ToString()); + } + + if (is_max_) { + if (ordering == std::partial_ordering::greater) { + current_ = std::move(val_literal); + } + } else { + if (ordering == std::partial_ordering::less) { + current_ = std::move(val_literal); + } + } + return {}; + } + + Result<Literal> ResultLiteral() const override { + if (current_) { + return *current_; + } + ICEBERG_ASSIGN_OR_RAISE(auto type, GetPrimitiveType(*term_)); + return Literal::Null(type); + } + + private: + bool is_max_; + std::shared_ptr<BoundTerm> term_; + std::optional<Literal> current_; +}; + +} // namespace + +// -------------------- Bound aggregates -------------------- + +CountNonNullAggregate::CountNonNullAggregate(std::shared_ptr<BoundTerm> term) + : CountAggregate(Expression::Operation::kCount, std::move(term)) {} + +Result<std::shared_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( Review Comment: ```suggestion Result<std::unique_ptr<CountNonNullAggregate>> CountNonNullAggregate::Make( ``` ########## 
src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. 
+class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. +template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. 
+class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); + return term_ ? term_->reference() : nullptr; + } + + Result<Literal> Evaluate(const StructLike& data) const override = 0; + + bool is_bound_aggregate() const override { return true; } + + enum class Kind : int8_t { + // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL) + kCount = 0, + // Value aggregates (MIN, MAX) + kValue, + }; + + virtual Kind kind() const = 0; + virtual Result<std::unique_ptr<Aggregator>> NewAggregator() const = 0; + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate<BoundTerm>(op, std::move(term)) {} +}; + +/// \brief Base class for COUNT aggregates. +class ICEBERG_EXPORT CountAggregate : public BoundAggregate { + public: + Kind kind() const override { return Kind::kCount; } + + protected: + CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} +}; + +/// \brief COUNT(term) aggregate. 
+class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNonNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT_NULL(term) aggregate. +class ICEBERG_EXPORT CountNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT(*) aggregate. +class ICEBERG_EXPORT CountStarAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountStarAggregate>> Make(); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + CountStarAggregate(); +}; + +/// \brief Bound MAX/MIN aggregate. +class ICEBERG_EXPORT ValueAggregate : public BoundAggregate { + public: + ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term); + + Kind kind() const override { return Kind::kValue; } + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; +}; + +/// \brief Evaluates bound aggregates over StructLike rows. +class ICEBERG_EXPORT AggregateEvaluator { + public: + virtual ~AggregateEvaluator() = default; + + /// \brief Create an evaluator for a single bound aggregate. 
+ /// \param aggregate The bound aggregate to evaluate across rows. + static Result<std::unique_ptr<AggregateEvaluator>> Make( + std::shared_ptr<BoundAggregate> aggregate); + + /// \brief Create an evaluator for multiple bound aggregates. + /// \param aggregates Aggregates to evaluate in one pass; order is preserved in + /// Results(). + static Result<std::unique_ptr<AggregateEvaluator>> MakeList( Review Comment: ```suggestion static Result<std::unique_ptr<AggregateEvaluator>> Make( ``` ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. 
+template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. +template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? 
BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. +class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); + return term_ ? 
term_->reference() : nullptr; + } + + Result<Literal> Evaluate(const StructLike& data) const override = 0; + + bool is_bound_aggregate() const override { return true; } + + enum class Kind : int8_t { + // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL) + kCount = 0, + // Value aggregates (MIN, MAX) + kValue, + }; + + virtual Kind kind() const = 0; + virtual Result<std::unique_ptr<Aggregator>> NewAggregator() const = 0; Review Comment: If it will not return any error, we can use `std::unique_ptr<Aggregator>` as its return type. ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. 
+template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. +template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? 
BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. +class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; + }; + + std::shared_ptr<BoundReference> reference() override { + ICEBERG_DCHECK(term_ != nullptr || op() == Expression::Operation::kCountStar, + "Bound aggregate term should not be null unless COUNT(*)"); + return term_ ? 
term_->reference() : nullptr; + } + + Result<Literal> Evaluate(const StructLike& data) const override = 0; + + bool is_bound_aggregate() const override { return true; } + + enum class Kind : int8_t { + // Count aggregates (COUNT, COUNT_STAR, COUNT_NULL) + kCount = 0, + // Value aggregates (MIN, MAX) + kValue, + }; + + virtual Kind kind() const = 0; + virtual Result<std::unique_ptr<Aggregator>> NewAggregator() const = 0; + + protected: + BoundAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : Aggregate<BoundTerm>(op, std::move(term)) {} +}; + +/// \brief Base class for COUNT aggregates. +class ICEBERG_EXPORT CountAggregate : public BoundAggregate { + public: + Kind kind() const override { return Kind::kCount; } + + protected: + CountAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term) + : BoundAggregate(op, std::move(term)) {} +}; + +/// \brief COUNT(term) aggregate. +class ICEBERG_EXPORT CountNonNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNonNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNonNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT_NULL(term) aggregate. +class ICEBERG_EXPORT CountNullAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountNullAggregate>> Make( + std::shared_ptr<BoundTerm> term); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + explicit CountNullAggregate(std::shared_ptr<BoundTerm> term); +}; + +/// \brief COUNT(*) aggregate. 
+class ICEBERG_EXPORT CountStarAggregate : public CountAggregate { + public: + static Result<std::shared_ptr<CountStarAggregate>> Make(); + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; + + private: + CountStarAggregate(); +}; + +/// \brief Bound MAX/MIN aggregate. +class ICEBERG_EXPORT ValueAggregate : public BoundAggregate { + public: + ValueAggregate(Expression::Operation op, std::shared_ptr<BoundTerm> term); + + Kind kind() const override { return Kind::kValue; } + + std::string ToString() const override; + Result<Literal> Evaluate(const StructLike& data) const override; + Result<std::unique_ptr<Aggregator>> NewAggregator() const override; +}; + +/// \brief Evaluates bound aggregates over StructLike rows. +class ICEBERG_EXPORT AggregateEvaluator { + public: + virtual ~AggregateEvaluator() = default; + + /// \brief Create an evaluator for a single bound aggregate. + /// \param aggregate The bound aggregate to evaluate across rows. + static Result<std::unique_ptr<AggregateEvaluator>> Make( + std::shared_ptr<BoundAggregate> aggregate); + + /// \brief Create an evaluator for multiple bound aggregates. + /// \param aggregates Aggregates to evaluate in one pass; order is preserved in + /// Results(). + static Result<std::unique_ptr<AggregateEvaluator>> MakeList( + std::vector<std::shared_ptr<BoundAggregate>> aggregates); + + /// \brief Update aggregates with a row. + virtual Status Update(const StructLike& row) = 0; + + /// \brief Final aggregated value. + virtual Result<std::vector<Literal>> Results() const = 0; + + /// \brief Convenience accessor when only one aggregate is evaluated. + virtual Result<Literal> ResultLiteral() const = 0; Review Comment: ```suggestion virtual Result<std::vector<Literal>> GetResults() const = 0; /// \brief Convenience accessor when only one aggregate is evaluated. 
virtual Result<Literal> GetResult() const = 0; ``` Perhaps this looks more consistent. ########## src/iceberg/expression/expression_visitor.h: ########## @@ -77,6 +79,22 @@ class ICEBERG_EXPORT ExpressionVisitor { /// \brief Visit an unbound predicate. /// \param pred The unbound predicate to visit virtual Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) = 0; + + /// \brief Visit a bound aggregate. + /// \param aggregate The bound aggregate to visit. + virtual Result<R> Aggregate(const std::shared_ptr<BoundAggregate>& aggregate) { + ICEBERG_DCHECK(aggregate != nullptr, "Bound aggregate cannot be null"); + return NotSupported("Visitor {} does not support bound aggregate: {}", + typeid(*this).name(), aggregate->ToString()); + } + + /// \brief Visit an unbound aggregate. + /// \param aggregate The unbound aggregate to visit. + virtual Result<R> Aggregate(const std::shared_ptr<UnboundAggregate>& aggregate) { + ICEBERG_DCHECK(aggregate != nullptr, "Unbound aggregate cannot be null"); + return NotSupported("Visitor {} does not support unbound aggregate: {}", + typeid(*this).name(), aggregate->ToString()); Review Comment: ```suggestion ICEBERG_DCHECK(aggregate != nullptr, "Unbound aggregate cannot be null"); return NotSupported("Visitor {} does not support unbound aggregate", typeid(*this).name()); ``` ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. 
+template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. +class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { Review Comment: ```suggestion class Aggregator { ``` I think we don't need to add export for inner class. ########## src/iceberg/expression/aggregate.h: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/aggregate.h +/// Aggregate expression definitions. + +#include <memory> +#include <optional> +#include <string> +#include <vector> + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/term.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Base aggregate holding an operation and a term. +template <TermType T> +class ICEBERG_EXPORT Aggregate : public virtual Expression { + public: + ~Aggregate() override = default; + + Expression::Operation op() const override { return operation_; } + + const std::shared_ptr<T>& term() const { return term_; } + + protected: + Aggregate(Expression::Operation op, std::shared_ptr<T> term) + : operation_(op), term_(std::move(term)) {} + + Expression::Operation operation_; + std::shared_ptr<T> term_; +}; + +/// \brief Base class for unbound aggregates. +class ICEBERG_EXPORT UnboundAggregate : public virtual Expression, + public Unbound<Expression> { + public: + ~UnboundAggregate() override = default; + + bool is_unbound_aggregate() const override { return true; } +}; + +/// \brief Template for unbound aggregates that carry a term and operation. 
+template <typename B> +class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, + public Aggregate<UnboundTerm<B>> { + using BASE = Aggregate<UnboundTerm<B>>; + + public: + static Result<std::shared_ptr<UnboundAggregateImpl<B>>> Make( + Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term); + + std::shared_ptr<NamedReference> reference() override { + return BASE::term() ? BASE::term()->reference() : nullptr; + } + + Result<std::shared_ptr<Expression>> Bind(const Schema& schema, + bool case_sensitive) const override; + + std::string ToString() const override; + + private: + static constexpr bool IsSupportedOp(Expression::Operation op) { + return op == Expression::Operation::kCount || + op == Expression::Operation::kCountNull || + op == Expression::Operation::kCountStar || op == Expression::Operation::kMax || + op == Expression::Operation::kMin; + } + + UnboundAggregateImpl(Expression::Operation op, std::shared_ptr<UnboundTerm<B>> term) + : BASE(op, std::move(term)) { + ICEBERG_DCHECK(IsSupportedOp(op), "Unexpected aggregate operation"); + ICEBERG_DCHECK(op == Expression::Operation::kCountStar || BASE::term() != nullptr, + "Aggregate term cannot be null unless COUNT(*)"); + } +}; + +/// \brief Base class for bound aggregates. +class ICEBERG_EXPORT BoundAggregate : public Aggregate<BoundTerm>, public Bound { + public: + using Aggregate<BoundTerm>::op; + using Aggregate<BoundTerm>::term; + + class ICEBERG_EXPORT Aggregator { + public: + virtual ~Aggregator() = default; + + virtual Status Update(const StructLike& row) = 0; + virtual Status Update(const DataFile& file) { + return NotSupported("Aggregating DataFile not supported"); + } + virtual Result<Literal> ResultLiteral() const = 0; Review Comment: ```suggestion virtual Result<Literal> GetResult() const = 0; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
