kou commented on code in PR #34817: URL: https://github.com/apache/arrow/pull/34817#discussion_r1396667224
########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. +enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. + kInvalidKey, Review Comment: ```suggestion kInvalidName, ``` ########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. +enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. 
+ kInvalidKey, + /// \brief The session option value is invalid. + kInvalidValue, + /// \brief The session option cannot be set. + kError +}; +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r); + +/// \brief The result of closing a session. +enum class CloseSessionStatus : int8_t { kUnspecified, kClosed, kClosing, kNotClosable }; Review Comment: Can we add documentation to each enum value? ########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. +enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. + kInvalidKey, + /// \brief The session option value is invalid. + kInvalidValue, + /// \brief The session option cannot be set. 
+ kError +}; +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r); Review Comment: ```suggestion std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& status); ``` ########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. +enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. + kInvalidKey, + /// \brief The session option value is invalid. + kInvalidValue, + /// \brief The session option cannot be set. + kError +}; +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r); + +/// \brief The result of closing a session. +enum class CloseSessionStatus : int8_t { kUnspecified, kClosed, kClosing, kNotClosable }; +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r); + +static const char* const SetSessionOptionStatusNames[] = { + "Unspecified", "Ok", "OkMapped", "InvalidKey", "InvalidValue", "Error"}; +static const char* const CloseSessionStatusNames[] = {"Unspecified", "Closed", "Closing", + "NotClosable"}; + +/// \brief A request to set a set of session options by key/value. 
Review Comment: ```suggestion /// \brief A request to set a set of session options by name/value. ``` ########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. +enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. + kInvalidKey, + /// \brief The session option value is invalid. + kInvalidValue, + /// \brief The session option cannot be set. + kError +}; +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r); + +/// \brief The result of closing a session. +enum class CloseSessionStatus : int8_t { kUnspecified, kClosed, kClosing, kNotClosable }; +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r); + +static const char* const SetSessionOptionStatusNames[] = { + "Unspecified", "Ok", "OkMapped", "InvalidKey", "InvalidValue", "Error"}; +static const char* const CloseSessionStatusNames[] = {"Unspecified", "Closed", "Closing", + "NotClosable"}; + +/// \brief A request to set a set of session options by key/value. 
+struct ARROW_FLIGHT_EXPORT SetSessionOptionsRequest { + std::map<std::string, SessionOptionValue> session_options; + + std::string ToString() const; + bool Equals(const SetSessionOptionsRequest& other) const; + + friend bool operator==(const SetSessionOptionsRequest& left, + const SetSessionOptionsRequest& right) { + return left.Equals(right); + } + friend bool operator!=(const SetSessionOptionsRequest& left, + const SetSessionOptionsRequest& right) { + return !(left == right); + } + + /// \brief Serialize this message to its wire-format representation. + arrow::Result<std::string> SerializeToString() const; + + /// \brief Deserialize this message from its wire-format representation. + static arrow::Result<SetSessionOptionsRequest> Deserialize(std::string_view serialized); +}; + +/// \brief The result(s) of setting session option(s). +struct ARROW_FLIGHT_EXPORT SetSessionOptionsResult { + struct Result { + SetSessionOptionStatus status; + + bool Equals(const Result& other) const { + if (status != other.status) { + return false; + } + return true; Review Comment: ```suggestion return status == other.status; ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { Review Comment: ```suggestion std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& status) { ``` ########## cpp/src/arrow/flight/types.h: ########## @@ -750,6 +754,190 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest { static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized); }; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +using SessionOptionValue = std::variant<std::string, bool, int32_t, int64_t, float, + double, std::vector<std::string>>; + +/// \brief The result of setting a session option. 
+enum class SetSessionOptionStatus : int8_t { + /// \brief The status of setting the option is unknown. + /// + /// Servers should avoid using this value (send a NOT_FOUND error if the requested + /// query is not known). Clients can retry the request. + kUnspecified, + // The session option setting completed successfully. + kOk, + /// \brief The given session option name was an alias for another option name. + kOkMapped, + /// \brief The given session option name is invalid. + kInvalidKey, + /// \brief The session option value is invalid. + kInvalidValue, + /// \brief The session option cannot be set. + kError +}; +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r); + +/// \brief The result of closing a session. +enum class CloseSessionStatus : int8_t { kUnspecified, kClosed, kClosing, kNotClosable }; +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r); Review Comment: ```suggestion std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status); ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r) { + os << CloseSessionStatusNames[static_cast<int>(r)]; + return os; +} + +// Helpers for stringifying maps containing various types +std::ostream& operator<<(std::ostream& os, std::vector<std::string> v) { + os << '['; + std::string sep = ""; + for (const auto& x : v) { + os << sep << '"' << x << '"'; Review Comment: If we want to quote string values in `std::vector<std::string>` with `"`, we should also quote values in the plain `std::string` case. If we don't quote, we may be able to use `JoinStrings()` in `cpp/src/arrow/util/string.h`. 
########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r) { Review Comment: ```suggestion std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status) { ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -643,6 +957,21 @@ const ActionType ActionType::kRenewFlightEndpoint = "Extend expiration time of the given FlightEndpoint.\n" "Request Message: RenewFlightEndpointRequest\n" "Response Message: Renewed FlightEndpoint"}; +const ActionType ActionType::kSetSessionOptions = + ActionType{"SetSessionOptions", + "Set client session options by key/value pairs.\n" Review Comment: ```suggestion "Set client session options by name/value pairs.\n" ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r) { + os << CloseSessionStatusNames[static_cast<int>(r)]; + return os; +} + +// Helpers for stringifying maps containing various types +std::ostream& operator<<(std::ostream& os, std::vector<std::string> v) { + os << '['; + std::string sep = ""; + for (const auto& x : v) { + os << sep << '"' << x << '"'; + sep = ", "; + } + os << ']'; + + return os; +} + +std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v) { + std::visit([&](const auto& x) { os << x; }, v); + return os; +} + +std::ostream& operator<<(std::ostream& os, const SetSessionOptionsResult::Result& r) { + os << '{' 
<< r.status << '}'; + return os; +} + +template <typename T> +std::ostream& operator<<(std::ostream& os, std::map<std::string, T> m) { + os << '{'; + std::string sep = ""; + for (const auto& [k, v] : m) { + os << sep << '[' << k << "]: '" << v; + sep = ", "; + } + os << '}'; + + return os; +} + +namespace { +static bool CompareSessionOptionMaps(const std::map<std::string, SessionOptionValue>& a, + const std::map<std::string, SessionOptionValue>& b) { + if (a.size() != b.size()) { + return false; + } + for (const auto& [k, v] : a) { + if (const auto it = b.find(k); it == b.end()) { + return false; + } else { + const auto& b_v = it->second; + if (v.index() != b_v.index()) { + return false; + } + if (v != b_v) { + return false; + } + } + } + return true; +} +} // namespace + +// SetSessionOptionsRequest + +std::string SetSessionOptionsRequest::ToString() const { + std::stringstream ss; + + ss << "<SetSessionOptionsRequest session_options=" << session_options << '>'; + + return ss.str(); +} + +bool SetSessionOptionsRequest::Equals(const SetSessionOptionsRequest& other) const { + return CompareSessionOptionMaps(session_options, other.session_options); +} + +arrow::Result<std::string> SetSessionOptionsRequest::SerializeToString() const { + pb::SetSessionOptionsRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; + if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized SetSessionOptionsRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<SetSessionOptionsRequest> SetSessionOptionsRequest::Deserialize( + std::string_view serialized) { + // TODO these & SerializeToString should all be factored out to a superclass + pb::SetSessionOptionsRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized SetSessionOptionsRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream 
input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid SetSessionOptionsRequest"); + } + SetSessionOptionsRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// SetSessionOptionsResult + +std::string SetSessionOptionsResult::ToString() const { + std::stringstream ss; + + ss << "<SetSessionOptionsResult results=" << results << '>'; + + return ss.str(); +} + +bool SetSessionOptionsResult::Equals(const SetSessionOptionsResult& other) const { + if (results != other.results) { + return false; + } + return true; +} + +arrow::Result<std::string> SetSessionOptionsResult::SerializeToString() const { + pb::SetSessionOptionsResult pb_result; + RETURN_NOT_OK(internal::ToProto(*this, &pb_result)); + + std::string out; + if (!pb_result.SerializeToString(&out)) { + return Status::IOError("Serialized SetSessionOptionsResult exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<SetSessionOptionsResult> SetSessionOptionsResult::Deserialize( + std::string_view serialized) { + pb::SetSessionOptionsResult pb_result; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized SetSessionOptionsResult size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_result.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid SetSessionOptionsResult"); + } + SetSessionOptionsResult out; + RETURN_NOT_OK(internal::FromProto(pb_result, &out)); + return out; +} + +// GetSessionOptionsRequest + +std::string GetSessionOptionsRequest::ToString() const { + return "<GetSessionOptionsRequest>"; +} + +bool GetSessionOptionsRequest::Equals(const GetSessionOptionsRequest& other) const { + return true; +} + +arrow::Result<std::string> GetSessionOptionsRequest::SerializeToString() const { + 
pb::GetSessionOptionsRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; + if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized GetSessionOptionsRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<GetSessionOptionsRequest> GetSessionOptionsRequest::Deserialize( + std::string_view serialized) { + pb::GetSessionOptionsRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized GetSessionOptionsRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid GetSessionOptionsRequest"); + } + GetSessionOptionsRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// GetSessionOptionsResult + +std::string GetSessionOptionsResult::ToString() const { + std::stringstream ss; + + ss << "<GetSessionOptionsResult session_options=" << session_options << '>'; + + return ss.str(); +} + +bool GetSessionOptionsResult::Equals(const GetSessionOptionsResult& other) const { + return CompareSessionOptionMaps(session_options, other.session_options); +} + +arrow::Result<std::string> GetSessionOptionsResult::SerializeToString() const { + pb::GetSessionOptionsResult pb_result; + RETURN_NOT_OK(internal::ToProto(*this, &pb_result)); + + std::string out; + if (!pb_result.SerializeToString(&out)) { + return Status::IOError("Serialized GetSessionOptionsResult exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<GetSessionOptionsResult> GetSessionOptionsResult::Deserialize( + std::string_view serialized) { + pb::GetSessionOptionsResult pb_result; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized GetSessionOptionsResult size should not exceed 2 
GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_result.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid GetSessionOptionsResult"); + } + GetSessionOptionsResult out; + RETURN_NOT_OK(internal::FromProto(pb_result, &out)); + return out; +} + +// CloseSessionRequest + +std::string CloseSessionRequest::ToString() const { return "<CloseSessionRequest>"; } + +bool CloseSessionRequest::Equals(const CloseSessionRequest& other) const { return true; } + +arrow::Result<std::string> CloseSessionRequest::SerializeToString() const { + pb::CloseSessionRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; + if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized CloseSessionRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<CloseSessionRequest> CloseSessionRequest::Deserialize( + std::string_view serialized) { + pb::CloseSessionRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid("Serialized CloseSessionRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid CloseSessionRequest"); + } + CloseSessionRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// CloseSessionResult + +std::string CloseSessionResult::ToString() const { + std::stringstream ss; + + ss << "<CloseSessionResult result=" << status << '>'; Review Comment: ```suggestion ss << "<CloseSessionResult status=" << status << '>'; ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. + +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); + /// \brief Set session option by key to given value + void SetSessionOption(const std::string& key, const SessionOptionValue v); + /// \brief Idempotently remove key from this call's Session, if Session & key exist + void EraseSessionOption(const std::string& key); Review Comment: ```suggestion /// \brief Idempotently remove name from this call's session, if session & name exist void EraseSessionOption(const std::string& name); ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. + +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); Review Comment: ```suggestion /// \brief Get session option by name std::optional<SessionOptionValue> GetSessionOption(const std::string& name); ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, 
const CloseSessionStatus& r) { + os << CloseSessionStatusNames[static_cast<int>(r)]; + return os; +} + +// Helpers for stringifying maps containing various types +std::ostream& operator<<(std::ostream& os, std::vector<std::string> v) { + os << '['; + std::string sep = ""; + for (const auto& x : v) { + os << sep << '"' << x << '"'; + sep = ", "; + } + os << ']'; + + return os; +} + +std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v) { + std::visit([&](const auto& x) { os << x; }, v); + return os; +} + +std::ostream& operator<<(std::ostream& os, const SetSessionOptionsResult::Result& r) { + os << '{' << r.status << '}'; + return os; +} + +template <typename T> +std::ostream& operator<<(std::ostream& os, std::map<std::string, T> m) { + os << '{'; + std::string sep = ""; + for (const auto& [k, v] : m) { + os << sep << '[' << k << "]: '" << v; + sep = ", "; + } + os << '}'; + + return os; +} + +namespace { +static bool CompareSessionOptionMaps(const std::map<std::string, SessionOptionValue>& a, + const std::map<std::string, SessionOptionValue>& b) { + if (a.size() != b.size()) { + return false; + } + for (const auto& [k, v] : a) { + if (const auto it = b.find(k); it == b.end()) { + return false; + } else { + const auto& b_v = it->second; + if (v.index() != b_v.index()) { + return false; + } + if (v != b_v) { + return false; + } + } + } + return true; +} +} // namespace + +// SetSessionOptionsRequest + +std::string SetSessionOptionsRequest::ToString() const { + std::stringstream ss; + + ss << "<SetSessionOptionsRequest session_options=" << session_options << '>'; + + return ss.str(); +} + +bool SetSessionOptionsRequest::Equals(const SetSessionOptionsRequest& other) const { + return CompareSessionOptionMaps(session_options, other.session_options); +} + +arrow::Result<std::string> SetSessionOptionsRequest::SerializeToString() const { + pb::SetSessionOptionsRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; 
+ if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized SetSessionOptionsRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<SetSessionOptionsRequest> SetSessionOptionsRequest::Deserialize( + std::string_view serialized) { + // TODO these & SerializeToString should all be factored out to a superclass + pb::SetSessionOptionsRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized SetSessionOptionsRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid SetSessionOptionsRequest"); + } + SetSessionOptionsRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// SetSessionOptionsResult + +std::string SetSessionOptionsResult::ToString() const { + std::stringstream ss; + + ss << "<SetSessionOptionsResult results=" << results << '>'; + + return ss.str(); +} + +bool SetSessionOptionsResult::Equals(const SetSessionOptionsResult& other) const { + if (results != other.results) { + return false; + } + return true; +} + +arrow::Result<std::string> SetSessionOptionsResult::SerializeToString() const { + pb::SetSessionOptionsResult pb_result; + RETURN_NOT_OK(internal::ToProto(*this, &pb_result)); + + std::string out; + if (!pb_result.SerializeToString(&out)) { + return Status::IOError("Serialized SetSessionOptionsResult exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<SetSessionOptionsResult> SetSessionOptionsResult::Deserialize( + std::string_view serialized) { + pb::SetSessionOptionsResult pb_result; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized SetSessionOptionsResult size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream 
input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_result.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid SetSessionOptionsResult"); + } + SetSessionOptionsResult out; + RETURN_NOT_OK(internal::FromProto(pb_result, &out)); + return out; +} + +// GetSessionOptionsRequest + +std::string GetSessionOptionsRequest::ToString() const { + return "<GetSessionOptionsRequest>"; +} + +bool GetSessionOptionsRequest::Equals(const GetSessionOptionsRequest& other) const { + return true; +} + +arrow::Result<std::string> GetSessionOptionsRequest::SerializeToString() const { + pb::GetSessionOptionsRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; + if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized GetSessionOptionsRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<GetSessionOptionsRequest> GetSessionOptionsRequest::Deserialize( + std::string_view serialized) { + pb::GetSessionOptionsRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized GetSessionOptionsRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid GetSessionOptionsRequest"); + } + GetSessionOptionsRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// GetSessionOptionsResult + +std::string GetSessionOptionsResult::ToString() const { + std::stringstream ss; + + ss << "<GetSessionOptionsResult session_options=" << session_options << '>'; + + return ss.str(); +} + +bool GetSessionOptionsResult::Equals(const GetSessionOptionsResult& other) const { + return CompareSessionOptionMaps(session_options, other.session_options); +} + +arrow::Result<std::string> 
GetSessionOptionsResult::SerializeToString() const { + pb::GetSessionOptionsResult pb_result; + RETURN_NOT_OK(internal::ToProto(*this, &pb_result)); + + std::string out; + if (!pb_result.SerializeToString(&out)) { + return Status::IOError("Serialized GetSessionOptionsResult exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<GetSessionOptionsResult> GetSessionOptionsResult::Deserialize( + std::string_view serialized) { + pb::GetSessionOptionsResult pb_result; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid( + "Serialized GetSessionOptionsResult size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if (!pb_result.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid GetSessionOptionsResult"); + } + GetSessionOptionsResult out; + RETURN_NOT_OK(internal::FromProto(pb_result, &out)); + return out; +} + +// CloseSessionRequest + +std::string CloseSessionRequest::ToString() const { return "<CloseSessionRequest>"; } + +bool CloseSessionRequest::Equals(const CloseSessionRequest& other) const { return true; } + +arrow::Result<std::string> CloseSessionRequest::SerializeToString() const { + pb::CloseSessionRequest pb_request; + RETURN_NOT_OK(internal::ToProto(*this, &pb_request)); + + std::string out; + if (!pb_request.SerializeToString(&out)) { + return Status::IOError("Serialized CloseSessionRequest exceeded 2GiB limit"); + } + return out; +} + +arrow::Result<CloseSessionRequest> CloseSessionRequest::Deserialize( + std::string_view serialized) { + pb::CloseSessionRequest pb_request; + if (serialized.size() > static_cast<size_t>(std::numeric_limits<int>::max())) { + return Status::Invalid("Serialized CloseSessionRequest size should not exceed 2 GiB"); + } + google::protobuf::io::ArrayInputStream input(serialized.data(), + static_cast<int>(serialized.size())); + if 
(!pb_request.ParseFromZeroCopyStream(&input)) { + return Status::Invalid("Not a valid CloseSessionRequest"); + } + CloseSessionRequest out; + RETURN_NOT_OK(internal::FromProto(pb_request, &out)); + return out; +} + +// CloseSessionResult + +std::string CloseSessionResult::ToString() const { + std::stringstream ss; + + ss << "<CloseSessionResult result=" << status << '>'; + + return ss.str(); +} + +bool CloseSessionResult::Equals(const CloseSessionResult& other) const { + if (status != other.status) { + return false; + } + return true; Review Comment: ```suggestion return status == other.status; ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. 
+class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); Review Comment: How about `Create` rather than `Get`, since `Create` is more descriptive? ```suggestion std::pair<std::string, std::shared_ptr<FlightSqlSession>> CreateNewSession(); ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. 
+ +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); + /// \brief Set session option by key to given value + void SetSessionOption(const std::string& key, const SessionOptionValue v); Review Comment: ```suggestion /// \brief Set session option by name to given value void SetSessionOption(const std::string& name, const SessionOptionValue value); ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. +class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; Review Comment: I think that `mutex_` is better here because this is not `std::shared_lock`: ```suggestion std::shared_mutex mutex_; ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. + +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); + /// \brief Set session option by key to given value + void SetSessionOption(const std::string& key, const SessionOptionValue v); + /// \brief Idempotently remove key from this call's Session, if Session & key exist + void EraseSessionOption(const std::string& key); +}; + +/// \brief A middleware to handle Session option persistence and related *Cookie headers. Review Comment: ```suggestion /// \brief A middleware to handle session option persistence and related cookie headers. ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. + +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); + /// \brief Set session option by key to given value + void SetSessionOption(const std::string& key, const SessionOptionValue v); + /// \brief Idempotently remove key from this call's Session, if Session & key exist + void EraseSessionOption(const std::string& key); +}; + +/// \brief A middleware to handle Session option persistence and related *Cookie headers. 
+class ARROW_FLIGHT_SQL_EXPORT ServerSessionMiddleware : public ServerMiddleware { + public: + static constexpr char const kMiddlewareName[] = + "arrow::flight::sql::ServerSessionMiddleware"; + + std::string name() const override { return kMiddlewareName; } + + /// \brief Is there an existing session (either existing or new) + virtual bool HasSession() const = 0; + /// \brief Get existing or new call-associated session + virtual std::shared_ptr<FlightSqlSession> GetSession() = 0; + /// \brief Get request headers, in lieu of a provided or created session. + virtual const CallHeaders& GetCallHeaders() const = 0; +}; + +/// \brief Returns a ServerMiddlewareFactory that handles Session option storage. +/// \param[in] id_gen A generator function for unique session id strings. Review Comment: ```suggestion /// \param[in] id_gen A generator function for unique session ID strings. ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. +class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; + + public: + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers) + : factory_(factory), headers_(headers), existing_session(false) {} + + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers, + std::shared_ptr<FlightSqlSession> session, + std::string session_id) + : factory_(factory), + headers_(headers), + session_(std::move(session)), + session_id_(std::move(session_id)), + existing_session(true) {} + + void SendingHeaders(AddCallHeaders* add_call_headers) override { + if (!existing_session && session_) { + add_call_headers->AddHeader( + "set-cookie", static_cast<std::string>(kSessionCookieName) + "=" + session_id_); + } + } + + void 
CallCompleted(const Status&) override {} + + bool HasSession() const override { return static_cast<bool>(session_); } + + std::shared_ptr<FlightSqlSession> GetSession() override { + const std::shared_lock<std::shared_mutex> l(lock_); Review Comment: Is `shared_lock` safe here? It seems that we need to use `lock_guard` instead here because `session_` may be updated. ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. 
+class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; + + public: + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers) + : factory_(factory), headers_(headers), existing_session(false) {} + + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers, + std::shared_ptr<FlightSqlSession> session, + std::string session_id) + : factory_(factory), + headers_(headers), + session_(std::move(session)), + session_id_(std::move(session_id)), + existing_session(true) {} + + void SendingHeaders(AddCallHeaders* add_call_headers) override { + if (!existing_session && session_) { + add_call_headers->AddHeader( + "set-cookie", static_cast<std::string>(kSessionCookieName) + "=" + session_id_); + } + } + + void CallCompleted(const Status&) override {} + + bool HasSession() const override { return static_cast<bool>(session_); } + + std::shared_ptr<FlightSqlSession> GetSession() override { + const std::shared_lock<std::shared_mutex> 
l(lock_); + if (!session_) { + auto [id, s] = factory_->GetNewSession(); + session_ = std::move(s); + session_id_ = std::move(id); + } + return session_; + } + + const CallHeaders& GetCallHeaders() const override { return headers_; } +}; + +std::vector<std::pair<std::string, std::string>> +ServerSessionMiddlewareFactory::ParseCookieString(const std::string_view& s) { + const std::string list_sep = "; "; + const std::string pair_sep = "="; + const size_t pair_sep_len = pair_sep.length(); Review Comment: ```suggestion const auto pair_sep_len = pair_sep.length(); ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. 
+class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; + + public: + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers) + : factory_(factory), headers_(headers), existing_session(false) {} + + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers, + std::shared_ptr<FlightSqlSession> session, + std::string session_id) + : factory_(factory), + headers_(headers), + session_(std::move(session)), + session_id_(std::move(session_id)), + existing_session(true) {} + + void SendingHeaders(AddCallHeaders* add_call_headers) override { + if (!existing_session && session_) { + add_call_headers->AddHeader( + "set-cookie", static_cast<std::string>(kSessionCookieName) + "=" + session_id_); + } + } + + void CallCompleted(const Status&) override {} + + bool HasSession() const override { return static_cast<bool>(session_); } + + std::shared_ptr<FlightSqlSession> GetSession() override { + const std::shared_lock<std::shared_mutex> 
l(lock_); + if (!session_) { + auto [id, s] = factory_->GetNewSession(); + session_ = std::move(s); + session_id_ = std::move(id); + } + return session_; + } + + const CallHeaders& GetCallHeaders() const override { return headers_; } +}; + +std::vector<std::pair<std::string, std::string>> +ServerSessionMiddlewareFactory::ParseCookieString(const std::string_view& s) { + const std::string list_sep = "; "; + const std::string pair_sep = "="; + const size_t pair_sep_len = pair_sep.length(); + + std::vector<std::pair<std::string, std::string>> result; + + size_t cur = 0; + while (cur < s.length()) { + const size_t end = s.find(list_sep, cur); + size_t len; + if (end == std::string::npos) { + // No (further) list delimiters + len = std::string::npos; + cur = s.length(); + } else { + len = end - cur; + cur = end; + } + const std::string_view tok = s.substr(cur, len); + + const size_t val_pos = tok.find(pair_sep); + if (val_pos == std::string::npos) { + // The cookie header is somewhat malformed; ignore the key and continue parsing + continue; + } + result.emplace_back(tok.substr(0, val_pos), + tok.substr(val_pos + pair_sep_len, std::string::npos)); + } Review Comment: Can we use `arrow::flight::internal::Cookie` instead of implementing this? ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. +class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. 
+ std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; + + public: + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers) + : factory_(factory), headers_(headers), existing_session(false) {} + + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers, + std::shared_ptr<FlightSqlSession> session, + std::string session_id) + : factory_(factory), + headers_(headers), + session_(std::move(session)), + session_id_(std::move(session_id)), + existing_session(true) {} + + void SendingHeaders(AddCallHeaders* add_call_headers) override { + if (!existing_session && session_) { + add_call_headers->AddHeader( + "set-cookie", static_cast<std::string>(kSessionCookieName) + "=" + session_id_); + } + } + + void CallCompleted(const Status&) override {} + + bool HasSession() const override { return static_cast<bool>(session_); } + + std::shared_ptr<FlightSqlSession> GetSession() override { + const std::shared_lock<std::shared_mutex> l(lock_); + if (!session_) { + auto [id, s] = factory_->GetNewSession(); + session_ = std::move(s); + session_id_ = std::move(id); + } + return session_; + } + + const CallHeaders& GetCallHeaders() const override { return headers_; } +}; + +std::vector<std::pair<std::string, std::string>> +ServerSessionMiddlewareFactory::ParseCookieString(const std::string_view& s) { + const std::string list_sep = "; "; + const std::string pair_sep = "="; + const size_t pair_sep_len = pair_sep.length(); + + std::vector<std::pair<std::string, std::string>> result; + + size_t cur = 0; + while (cur < s.length()) { + const size_t end = s.find(list_sep, cur); + size_t len; + if 
(end == std::string::npos) { + // No (further) list delimiters + len = std::string::npos; + cur = s.length(); + } else { + len = end - cur; + cur = end; + } + const std::string_view tok = s.substr(cur, len); + + const size_t val_pos = tok.find(pair_sep); + if (val_pos == std::string::npos) { + // The cookie header is somewhat malformed; ignore the key and continue parsing + continue; + } + result.emplace_back(tok.substr(0, val_pos), + tok.substr(val_pos + pair_sep_len, std::string::npos)); + } + + return result; +} + +Status ServerSessionMiddlewareFactory::StartCall( + const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware) { + std::string session_id; + + const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& + headers_it_pr = incoming_headers.equal_range("cookie"); + for (auto itr = headers_it_pr.first; itr != headers_it_pr.second; ++itr) { + const std::string_view& cookie_header = itr->second; + const std::vector<std::pair<std::string, std::string>> cookies = + ParseCookieString(cookie_header); + for (const std::pair<std::string, std::string>& cookie : cookies) { + if (cookie.first == kSessionCookieName) { + if (cookie.second.empty()) + return Status::Invalid("Empty ", kSessionCookieName, " cookie value."); + session_id = std::move(cookie.second); + } + } + if (!session_id.empty()) break; + } + + if (session_id.empty()) { + // No cookie was found + *middleware = std::make_shared<ServerSessionMiddlewareImpl>(this, incoming_headers); + } else { + const std::shared_lock<std::shared_mutex> l(session_store_lock_); + if (auto it = session_store_.find(session_id); it == session_store_.end()) { + return Status::Invalid("Invalid or expired ", kSessionCookieName, " cookie."); + } else { + auto session = it->second; + *middleware = std::make_shared<ServerSessionMiddlewareImpl>( + this, incoming_headers, std::move(session), session_id); + } + } + + return Status::OK(); +} + +/// \brief Get a new, empty 
session option map and its id key. +std::pair<std::string, std::shared_ptr<FlightSqlSession>> +ServerSessionMiddlewareFactory::GetNewSession() { + std::string new_id = id_generator_(); + auto session = std::make_shared<FlightSqlSession>(); + + const std::unique_lock<std::shared_mutex> l(session_store_lock_); + session_store_[new_id] = session; Review Comment: Should we re-generate a new ID when the generated ID is already used? ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. 
+class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. + std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; + + public: + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers) + : factory_(factory), headers_(headers), existing_session(false) {} + + ServerSessionMiddlewareImpl(ServerSessionMiddlewareFactory* factory, + const CallHeaders& headers, + std::shared_ptr<FlightSqlSession> session, + std::string session_id) + : factory_(factory), + headers_(headers), + session_(std::move(session)), + session_id_(std::move(session_id)), + existing_session(true) {} + + void SendingHeaders(AddCallHeaders* add_call_headers) override { + if (!existing_session && session_) { + add_call_headers->AddHeader( + "set-cookie", static_cast<std::string>(kSessionCookieName) + "=" + session_id_); + } + } + + void CallCompleted(const Status&) override {} + + bool HasSession() const override { return static_cast<bool>(session_); } + + std::shared_ptr<FlightSqlSession> GetSession() override { + const std::shared_lock<std::shared_mutex> 
l(lock_); + if (!session_) { + auto [id, s] = factory_->GetNewSession(); + session_ = std::move(s); + session_id_ = std::move(id); + } + return session_; + } + + const CallHeaders& GetCallHeaders() const override { return headers_; } +}; + +std::vector<std::pair<std::string, std::string>> +ServerSessionMiddlewareFactory::ParseCookieString(const std::string_view& s) { + const std::string list_sep = "; "; + const std::string pair_sep = "="; + const size_t pair_sep_len = pair_sep.length(); + + std::vector<std::pair<std::string, std::string>> result; + + size_t cur = 0; + while (cur < s.length()) { + const size_t end = s.find(list_sep, cur); + size_t len; + if (end == std::string::npos) { + // No (further) list delimiters + len = std::string::npos; + cur = s.length(); + } else { + len = end - cur; + cur = end; + } + const std::string_view tok = s.substr(cur, len); + + const size_t val_pos = tok.find(pair_sep); + if (val_pos == std::string::npos) { + // The cookie header is somewhat malformed; ignore the key and continue parsing + continue; + } + result.emplace_back(tok.substr(0, val_pos), + tok.substr(val_pos + pair_sep_len, std::string::npos)); + } + + return result; +} + +Status ServerSessionMiddlewareFactory::StartCall( + const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware) { + std::string session_id; + + const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& + headers_it_pr = incoming_headers.equal_range("cookie"); + for (auto itr = headers_it_pr.first; itr != headers_it_pr.second; ++itr) { + const std::string_view& cookie_header = itr->second; + const std::vector<std::pair<std::string, std::string>> cookies = + ParseCookieString(cookie_header); + for (const std::pair<std::string, std::string>& cookie : cookies) { + if (cookie.first == kSessionCookieName) { + if (cookie.second.empty()) + return Status::Invalid("Empty ", kSessionCookieName, " cookie value."); + session_id = 
std::move(cookie.second); + } + } + if (!session_id.empty()) break; + } + + if (session_id.empty()) { + // No cookie was found + *middleware = std::make_shared<ServerSessionMiddlewareImpl>(this, incoming_headers); + } else { + const std::shared_lock<std::shared_mutex> l(session_store_lock_); + if (auto it = session_store_.find(session_id); it == session_store_.end()) { + return Status::Invalid("Invalid or expired ", kSessionCookieName, " cookie."); + } else { + auto session = it->second; + *middleware = std::make_shared<ServerSessionMiddlewareImpl>( + this, incoming_headers, std::move(session), session_id); + } + } + + return Status::OK(); +} + +/// \brief Get a new, empty session option map and its id key. +std::pair<std::string, std::shared_ptr<FlightSqlSession>> +ServerSessionMiddlewareFactory::GetNewSession() { + std::string new_id = id_generator_(); Review Comment: ```suggestion auto new_id = id_generator_(); ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; } +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r) { + os << CloseSessionStatusNames[static_cast<int>(r)]; + return os; +} + +// Helpers for stringifying maps containing various types +std::ostream& operator<<(std::ostream& os, std::vector<std::string> v) { + os << '['; + std::string sep = ""; + for (const auto& x : v) { + os << sep << '"' << x << '"'; Review Comment: ```suggestion std::ostream& operator<<(std::ostream& os, std::vector<std::string> values) { os << '['; std::string sep = ""; for (const auto& v : values) { os << sep << '"' << v << '"'; ``` ########## cpp/src/arrow/flight/types.cc: ########## @@ -468,6 +468,320 @@ arrow::Result<CancelFlightInfoRequest> CancelFlightInfoRequest::Deserialize( return out; 
} +std::ostream& operator<<(std::ostream& os, const SetSessionOptionStatus& r) { + os << SetSessionOptionStatusNames[static_cast<int>(r)]; + return os; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& r) { + os << CloseSessionStatusNames[static_cast<int>(r)]; + return os; +} + +// Helpers for stringifying maps containing various types +std::ostream& operator<<(std::ostream& os, std::vector<std::string> v) { + os << '['; + std::string sep = ""; + for (const auto& x : v) { + os << sep << '"' << x << '"'; + sep = ", "; + } + os << ']'; + + return os; +} + +std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v) { + std::visit([&](const auto& x) { os << x; }, v); + return os; +} + +std::ostream& operator<<(std::ostream& os, const SetSessionOptionsResult::Result& r) { + os << '{' << r.status << '}'; + return os; +} + +template <typename T> +std::ostream& operator<<(std::ostream& os, std::map<std::string, T> m) { + os << '{'; + std::string sep = ""; + for (const auto& [k, v] : m) { + os << sep << '[' << k << "]: '" << v; Review Comment: Is the `'` quote balanced? ########## cpp/src/arrow/flight/sql/server_session_middleware.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// Middleware for handling Flight SQL Sessions including session cookie handling. +// Currently experimental. + +#pragma once + +#include <functional> +#include <optional> +#include <shared_mutex> +#include <string_view> + +#include "arrow/flight/server_middleware.h" +#include "arrow/flight/sql/types.h" + +namespace arrow { +namespace flight { +namespace sql { + +static constexpr char const kSessionCookieName[] = "arrow_flight_session_id"; + +class ARROW_FLIGHT_SQL_EXPORT FlightSqlSession { + protected: + std::map<std::string, SessionOptionValue> map_; + std::shared_mutex map_lock_; + + public: + /// \brief Get session option by key + std::optional<SessionOptionValue> GetSessionOption(const std::string&); + /// \brief Set session option by key to given value + void SetSessionOption(const std::string& key, const SessionOptionValue v); + /// \brief Idempotently remove key from this call's Session, if Session & key exist + void EraseSessionOption(const std::string& key); +}; + +/// \brief A middleware to handle Session option persistence and related *Cookie headers. +class ARROW_FLIGHT_SQL_EXPORT ServerSessionMiddleware : public ServerMiddleware { + public: + static constexpr char const kMiddlewareName[] = + "arrow::flight::sql::ServerSessionMiddleware"; + + std::string name() const override { return kMiddlewareName; } + + /// \brief Is there an existing session (either existing or new) + virtual bool HasSession() const = 0; + /// \brief Get existing or new call-associated session + virtual std::shared_ptr<FlightSqlSession> GetSession() = 0; + /// \brief Get request headers, in lieu of a provided or created session. + virtual const CallHeaders& GetCallHeaders() const = 0; +}; + +/// \brief Returns a ServerMiddlewareFactory that handles Session option storage. 
Review Comment: ```suggestion /// \brief Returns a ServerMiddlewareFactory that handles session option storage. ``` ########## cpp/src/arrow/flight/sql/server_session_middleware.cc: ########## @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <mutex> + +#include "arrow/flight/sql/server_session_middleware.h" + +namespace arrow { +namespace flight { +namespace sql { + +/// \brief A factory for ServerSessionMiddleware, itself storing session data. +class ServerSessionMiddlewareFactory : public ServerMiddlewareFactory { + protected: + std::map<std::string, std::shared_ptr<FlightSqlSession>> session_store_; + std::shared_mutex session_store_lock_; + std::function<std::string()> id_generator_; + + std::vector<std::pair<std::string, std::string>> ParseCookieString( + const std::string_view& s); + + public: + explicit ServerSessionMiddlewareFactory(std::function<std::string()> id_gen) + : id_generator_(id_gen) {} + Status StartCall(const CallInfo&, const CallHeaders& incoming_headers, + std::shared_ptr<ServerMiddleware>* middleware); + + /// \brief Get a new, empty session option map and its id key. 
+ std::pair<std::string, std::shared_ptr<FlightSqlSession>> GetNewSession(); +}; + +class ServerSessionMiddlewareImpl : public ServerSessionMiddleware { + protected: + std::shared_mutex lock_; + ServerSessionMiddlewareFactory* factory_; + const CallHeaders& headers_; + std::shared_ptr<FlightSqlSession> session_; + std::string session_id_; + const bool existing_session; Review Comment: ```suggestion const bool existing_session_; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org