wgtmac commented on code in PR #45459:
URL: https://github.com/apache/arrow/pull/45459#discussion_r2002351474


##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {

Review Comment:
   If all members are public, what about using a `struct`?



##########
cpp/src/parquet/types.cc:
##########
@@ -1619,6 +1658,193 @@ class LogicalType::Impl::Float16 final : public 
LogicalType::Impl::Incompatible,
 
 GENERATE_MAKE(Float16)
 
+class LogicalType::Impl::Geometry final : public 
LogicalType::Impl::Incompatible,
+                                          public 
LogicalType::Impl::SimpleApplicable {
+ public:
+  friend class GeometryLogicalType;
+
+  std::string ToString() const override;
+  std::string ToJSON() const override;
+  format::LogicalType ToThrift() const override;
+  bool Equals(const LogicalType& other) const override;
+
+  const std::string& crs() const { return crs_; }
+
+ private:
+  explicit Geometry(std::string crs)
+      : LogicalType::Impl(LogicalType::Type::GEOMETRY, SortOrder::UNKNOWN),
+        LogicalType::Impl::SimpleApplicable(parquet::Type::BYTE_ARRAY),
+        crs_(std::move(crs)) {}
+
+  std::string crs_;
+};
+
+std::string LogicalType::Impl::Geometry::ToString() const {
+  std::stringstream type;
+  type << "Geometry(crs=" << crs_ << ")";
+  return type.str();
+}
+
+std::string LogicalType::Impl::Geometry::ToJSON() const {
+  std::stringstream json;
+  json << R"({"Type": "Geometry")";
+
+  if (!crs_.empty()) {
+    // TODO(paleolimbot): For documented cases the CRS shouldn't contain 
quotes,
+    // but we should probably escape the value of crs_ for backslash and quotes
+    // to be safe.
+    json << R"(, "crs": ")" << crs_ << R"(")";
+  }
+
+  json << "}";
+  return json.str();
+}
+
+format::LogicalType LogicalType::Impl::Geometry::ToThrift() const {
+  format::LogicalType type;
+  format::GeometryType geometry_type;
+
+  // Canonially export crs of "" as an unset CRS
+  if (!crs_.empty()) {
+    geometry_type.__set_crs(crs_);
+  }
+
+  type.__set_GEOMETRY(geometry_type);
+  return type;
+}
+
+bool LogicalType::Impl::Geometry::Equals(const LogicalType& other) const {
+  if (other.is_geometry()) {
+    const auto& other_geometry = checked_cast<const 
GeometryLogicalType&>(other);
+    return crs() == other_geometry.crs();
+  } else {
+    return false;
+  }
+}
+
+const std::string& GeometryLogicalType::crs() const {
+  return (dynamic_cast<const LogicalType::Impl::Geometry&>(*impl_)).crs();
+}
+
+std::shared_ptr<const LogicalType> GeometryLogicalType::Make(std::string crs) {
+  auto* logical_type = new GeometryLogicalType();
+  logical_type->impl_.reset(new LogicalType::Impl::Geometry(std::move(crs)));
+  return std::shared_ptr<const LogicalType>(logical_type);

Review Comment:
   ```suggestion
     auto logical_type = std::shared_ptr<GeometryLogicalType>(new 
GeometryLogicalType());
     logical_type->impl_.reset(new LogicalType::Impl::Geometry(std::move(crs)));
     return logical_type;
   ```
   
   I know it is for consistency. Is it better to construct it as above?



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();

Review Comment:
   ```suggestion
       return !geospatial_types.empty() || (has_x() && has_y());
   ```
   
   Should it be the above?



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();
+  }
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count);

Review Comment:
   Why do we need a `null_count` here?



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();
+  }
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered, empty encoded statistics are returned
+  /// (such that is_set() returns false and they should not be written).
+  EncodedGeoStatistics Encode() const;
+
+  /// \brief Returns true if all WKB encountered was valid or false otherwise
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief The minimum encountered value in the X dimension, or Inf if no X 
values were
+  /// encountered.
+  ///
+  /// The Parquet definition allows for "wrap around" bounds where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in files written by other readers.

Review Comment:
   ```suggestion
     /// be encountered in files written by other writers.
   ```



##########
cpp/src/parquet/geospatial_util_internal.h:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <limits>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+
+#include "arrow/util/logging.h"
+#include "parquet/platform.h"
+
+namespace parquet::geometry {
+
+/// \brief Infinity, used to define bounds of empty bounding boxes
+constexpr double kInf = std::numeric_limits<double>::infinity();
+
+/// \brief Valid combinations of dimensions allowed by ISO well-known binary
+///
+/// These values correspond to the 0, 1000, 2000, 3000 component of the WKB 
integer
+/// geometry type (i.e., the value of geometry_type // 1000).
+enum class Dimensions {
+  kXY = 0,
+  kXYZ = 1,
+  kXYM = 2,
+  kXYZM = 3,
+  kWKBValueMin = 0,
+  kWKBValueMax = 3
+};
+
+/// \brief The supported set of geometry types allowed by ISO well-known binary
+///
+/// These values correspond to the 1, 2, ..., 7 component of the WKB integer
+/// geometry type (i.e., the value of geometry_type % 1000).
+enum class GeometryType {
+  kPoint = 1,
+  kLinestring = 2,
+  kPolygon = 3,
+  kMultiPoint = 4,
+  kMultiLinestring = 5,
+  kMultiPolygon = 6,
+  kGeometryCollection = 7,
+  kWKBValueMin = 1,
+  kWKBValueMax = 7
+};
+
+/// \brief A collection of intervals representing the encountered ranges of 
values
+/// in each dimension.
+///
+/// The Parquet specification also supports wraparound bounding boxes in the X 
and Y
+/// dimensions; however, this structure assumes min < max always as it is used 
for
+/// the purposes of accumulating this type of bounds.
+struct BoundingBox {
+  using XY = std::array<double, 2>;
+  using XYZ = std::array<double, 3>;
+  using XYM = std::array<double, 3>;
+  using XYZM = std::array<double, 4>;
+
+  BoundingBox(const XYZM& mins, const XYZM& maxes) : min(mins), max(maxes) {}
+  BoundingBox() : min{kInf, kInf, kInf, kInf}, max{-kInf, -kInf, -kInf, -kInf} 
{}
+
+  BoundingBox(const BoundingBox& other) = default;
+  BoundingBox& operator=(const BoundingBox&) = default;
+
+  /// \brief Update the X and Y bounds to ensure these bounds contain coord
+  void UpdateXY(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 2);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Update the X, Y, and Z bounds to ensure these bounds contain coord
+  void UpdateXYZ(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 3);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Update the X, Y, and M bounds to ensure these bounds contain coord
+  void UpdateXYM(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 3);
+    min[0] = std::min(min[0], coord[0]);
+    min[1] = std::min(min[1], coord[1]);
+    min[3] = std::min(min[3], coord[2]);
+    max[0] = std::max(max[0], coord[0]);
+    max[1] = std::max(max[1], coord[1]);
+    max[3] = std::max(max[3], coord[2]);
+  }
+
+  /// \brief Update the X, Y, Z, and M bounds to ensure these bounds contain 
coord
+  void UpdateXYZM(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 4);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Reset these bounds to an empty state such that they contain no 
coordinates
+  void Reset() {
+    for (int i = 0; i < 4; i++) {
+      min[i] = kInf;
+      max[i] = -kInf;
+    }
+  }
+
+  /// \brief Update these bounds such they also contain other
+  void Merge(const BoundingBox& other) {
+    for (int i = 0; i < 4; i++) {
+      min[i] = std::min(min[i], other.min[i]);
+      max[i] = std::max(max[i], other.max[i]);
+    }
+  }
+
+  std::string ToString() const {
+    std::stringstream ss;
+    ss << "BoundingBox" << std::endl;
+    ss << "  x: [" << min[0] << ", " << max[0] << "]" << std::endl;
+    ss << "  y: [" << min[1] << ", " << max[1] << "]" << std::endl;
+    ss << "  z: [" << min[2] << ", " << max[2] << "]" << std::endl;
+    ss << "  m: [" << min[3] << ", " << max[3] << "]" << std::endl;
+
+    return ss.str();
+  }
+
+  XYZM min;
+  XYZM max;
+
+ private:
+  // This works for XY, XYZ, and XYZM
+  template <typename Coord>
+  void UpdateInternal(Coord coord) {
+    for (size_t i = 0; i < coord.size(); i++) {
+      min[i] = std::min(min[i], coord[i]);
+      max[i] = std::max(max[i], coord[i]);
+    }
+  }
+};
+
+inline bool operator==(const BoundingBox& lhs, const BoundingBox& rhs) {
+  return lhs.min == rhs.min && lhs.max == rhs.max;
+}
+
+inline bool operator!=(const BoundingBox& lhs, const BoundingBox& rhs) {
+  return !(lhs == rhs);
+}
+
+class WKBBuffer;
+
+/// \brief Accumulate a BoundingBox and geometry types based on zero or more 
well-known
+/// binary blobs
+///
+/// Note that this class is NOT appropriate for bounding a GEOGRAPHY,
+/// whose bounds are not a function purely of the vertices. Geography bounding
+/// is not yet implemented.
+class PARQUET_EXPORT WKBGeometryBounder {
+ public:
+  /// \brief Accumulate the bounds of a serialized well-known binary geometry
+  ///
+  /// Returns SerializationError for any parse errors encountered. Bounds for
+  /// any encountered coordinates are accumulated and the geometry type of
+  /// the geometry is added to the internal geometry type list.
+  ::arrow::Status ReadGeometry(std::string_view bytes_wkb);

Review Comment:
   Instead of `ReadXXX`, should we call them `MergeXXX|AddXXX` to be explicit?



##########
cpp/src/parquet/geospatial_util_internal.cc:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_util_internal.h"
+
+#include "arrow/result.h"
+#include "arrow/util/endian.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/ubsan.h"
+
+namespace parquet::geometry {
+
+/// \brief Object to keep track of the low-level consumption of a well-known 
binary
+/// geometry
+///
+/// Briefly, ISO well-known binary supported by the Parquet spec is an endian 
byte
+/// (0x01 or 0x00), followed by geometry type + dimensions encoded as a 
(uint32_t),
+/// followed by geometry-specific data. Coordinate sequences are represented 
by a
+/// uint32_t (the number of coordinates) plus a sequence of doubles (number of 
coordinates
+/// multiplied by the number of dimensions).
+class WKBBuffer {
+ public:
+  WKBBuffer() : data_(NULLPTR), size_(0) {}
+  WKBBuffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
+
+  void Init(const uint8_t* data, int64_t size) {

Review Comment:
   Why do we need the default ctor and `Init`? For reuse?



##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1227,10 +1239,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     // Will be null if not using dictionary, but that's ok
     current_dict_encoder_ = 
dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
 
-    if (properties->statistics_enabled(descr_->path()) &&
-        (SortOrder::UNKNOWN != descr_->sort_order())) {
-      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
-      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
+    if (properties->statistics_enabled(descr_->path())) {
+      if (SortOrder::UNKNOWN != descr_->sort_order()) {

Review Comment:
   Since geo types have SortOrder::UNKNOWN, this also disables stats like 
value_count and null_count for them. However, SortOrder only affects the 
lower/upper bounds. Perhaps we should add a comment here noting that this 
could be improved.



##########
cpp/src/parquet/arrow/schema.cc:
##########
@@ -1088,6 +1114,10 @@ Status ToParquetSchema(const ::arrow::Schema* 
arrow_schema,
                        const WriterProperties& properties,
                        const ArrowWriterProperties& arrow_properties,
                        std::shared_ptr<SchemaDescriptor>* out) {
+  // TODO(paleolimbot): I'm wondering if this geo_crs_context is reused when 
testing
+  // on MINGW, where we get some failures indicating non-empty metadata where 
it was
+  // expected
+  arrow_properties.geo_crs_context()->Clear();

Review Comment:
   It is weird to call `Clear()` on a const reference. Why are we doing this?



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -526,9 +526,17 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
 Status GetSchemaMetadata(const ::arrow::Schema& schema, ::arrow::MemoryPool* 
pool,
                          const ArrowWriterProperties& properties,
                          std::shared_ptr<const KeyValueMetadata>* out) {
-  if (!properties.store_schema()) {
+  if (!properties.store_schema() &&
+      !properties.geo_crs_context()->HasProjjsonCrsFields()) {

Review Comment:
   Can we enforce `store_schema` to be true (or throw when it is false) when 
`HasProjjsonCrsFields()` is true? That would make the code easier to maintain.



##########
cpp/src/parquet/types.cc:
##########
@@ -1619,6 +1668,204 @@ class LogicalType::Impl::Float16 final : public 
LogicalType::Impl::Incompatible,
 
 GENERATE_MAKE(Float16)
 
+class LogicalType::Impl::Geometry final : public 
LogicalType::Impl::Incompatible,
+                                          public 
LogicalType::Impl::SimpleApplicable {
+ public:
+  friend class GeometryLogicalType;
+
+  std::string ToString() const override;
+  std::string ToJSON() const override;
+  format::LogicalType ToThrift() const override;
+  bool Equals(const LogicalType& other) const override;
+
+  const std::string& crs() const { return crs_; }
+
+ private:
+  explicit Geometry(std::string crs)
+      : LogicalType::Impl(LogicalType::Type::GEOMETRY, SortOrder::UNKNOWN),
+        LogicalType::Impl::SimpleApplicable(parquet::Type::BYTE_ARRAY),
+        crs_(std::move(crs)) {}
+
+  std::string crs_;
+};
+
+std::string LogicalType::Impl::Geometry::ToString() const {
+  std::stringstream type;
+  type << "Geometry(crs=" << crs_ << ")";
+  return type.str();
+}
+
+std::string LogicalType::Impl::Geometry::ToJSON() const {
+  std::stringstream json;
+  json << R"({"Type": "Geometry")";
+
+  if (!crs_.empty()) {
+    // TODO(paleolimbot): For documented cases the CRS shouldn't contain 
quotes,
+    // but we should probably escape the value of crs_ for backslash and quotes
+    // to be safe.
+    json << R"(, "crs": ")" << crs_ << R"(")";
+  }
+
+  json << "}";
+  return json.str();
+}
+
+format::LogicalType LogicalType::Impl::Geometry::ToThrift() const {
+  format::LogicalType type;
+  format::GeometryType geometry_type;
+
+  // Canonially export crs of "" as an unset CRS
+  if (!crs_.empty()) {
+    geometry_type.__set_crs(crs_);
+  }
+
+  type.__set_GEOMETRY(geometry_type);
+  return type;
+}
+
+bool LogicalType::Impl::Geometry::Equals(const LogicalType& other) const {
+  if (other.is_geometry()) {
+    const auto& other_geometry = checked_cast<const 
GeometryLogicalType&>(other);
+    return crs() == other_geometry.crs();
+  } else {
+    return false;
+  }
+}
+
+const std::string& GeometryLogicalType::crs() const {
+  return (dynamic_cast<const LogicalType::Impl::Geometry&>(*impl_)).crs();

Review Comment:
   I'm inclined to use `checked_cast` here. Perhaps we could create an issue 
for each kind of inconsistency?



##########
cpp/src/parquet/properties.h:
##########
@@ -69,6 +69,45 @@ constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 
1000;
 // PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
 constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
 
+/// \brief Convert Coordinate Reference System values for GEOMETRY and 
GEOGRAPHY logical
+/// types
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoCrsContext {
+ public:
+  virtual ~GeoCrsContext() = default;
+
+  /// \brief Remove any previously saved PROJJSON CRS fields
+  virtual void Clear() = 0;
+
+  /// \brief Given a coordinate reference system value and encoding from 
GeoArrow
+  /// extension metadata, return the value that should be placed in the
+  /// LogicalType::Geography|Geometry(crs=) field
+  ///
+  /// For PROJJSON Crses (the most common way coordinate reference systems 
arrive
+  /// in Arrow), the Parquet specification forces us to write them to the file
+  /// metadata. The default GeoCrsContext will record such values and return 
the required
+  /// string that can be placed into the 'crs' of the Geometry or Geography 
logical
+  /// type.
+  virtual std::string GetParquetCrs(std::string crs_value,
+                                    const std::string& crs_encoding) = 0;
+
+  /// \brief Returns true if any converted CRS values were PROJJSON whose 
values are
+  /// stored in this object and should be written to the file metadata
+  virtual bool HasProjjsonCrsFields() = 0;
+
+  /// \brief Add any stored PROJJSON values to the supplied file 
KeyValueMetadata
+  ///
+  /// The default implementation will add any PROJJSON values it encountered 
to the file
+  /// KeyValueMetadata; however, a custom implementation may choose to store 
these values
+  /// somewhere else and implement this method as a no-op.
+  virtual void AddProjjsonCrsFieldsToFileMetadata(

Review Comment:
   ```suggestion
     virtual void AddProjjsonCrsFieldsToMetadata(
   ```
   
   The `File` in the name is not necessary.



##########
cpp/src/parquet/geospatial_util_internal.h:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <limits>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+
+#include "arrow/util/logging.h"
+#include "parquet/platform.h"
+
+namespace parquet::geometry {
+
+/// \brief Infinity, used to define bounds of empty bounding boxes
+constexpr double kInf = std::numeric_limits<double>::infinity();
+
+/// \brief Valid combinations of dimensions allowed by ISO well-known binary
+///
+/// These values correspond to the 0, 1000, 2000, 3000 component of the WKB 
integer
+/// geometry type (i.e., the value of geometry_type // 1000).
+enum class Dimensions {
+  kXY = 0,
+  kXYZ = 1,
+  kXYM = 2,
+  kXYZM = 3,
+  kWKBValueMin = 0,

Review Comment:
   Why `kWKBValueMin/kWKBValueMax` but not `kDimValueMin/kDimValueMax` or 
simply `kValueMin/kValueMax`?



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();
+  }
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered, empty encoded statistics are returned
+  /// (such that is_set() returns false and they should not be written).
+  EncodedGeoStatistics Encode() const;
+
+  /// \brief Returns true if all WKB encountered was valid or false otherwise
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief The minimum encountered value in the X dimension, or Inf if no X 
values were
+  /// encountered.
+  ///
+  /// The Parquet definition allows for "wrap around" bounds where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in files written by other readers.
+  double get_xmin() const;

Review Comment:
   ```suggestion
     double xmin() const;
   ```
   
   We don't use the `get_xxx()` naming for accessors in Parquet. The same applies to the getters below.



##########
cpp/src/parquet/geospatial_util_internal_json.h:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/util/key_value_metadata.h"
+
+#include "parquet/properties.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Compute a Parquet Logical type (Geometry(...) or Geography(...)) 
from
+/// serialized GeoArrow metadata
+///
+/// Returns the appropriate LogicalType, SerializationError if the metadata 
was invalid,
+/// or NotImplemented if Parquet was not built with ARROW_JSON.
+::arrow::Result<std::shared_ptr<const LogicalType>> 
GeospatialLogicalTypeFromGeoArrowJSON(
+    const std::string& serialized_data, const ArrowWriterProperties& 
arrow_properties);
+
+/// \brief Compute a suitable DataType into which a GEOMETRY or GEOGRAPHY type 
should be
+/// read
+///
+/// The result of this function depends on whether or not "geoarrow.wkb" has 
been
+/// registered: if it has, the result will be the registered ExtensionType; if 
it has not,
+/// the result will be binary().
+::arrow::Result<std::shared_ptr<::arrow::DataType>> MakeGeoArrowGeometryType(
+    const LogicalType& logical_type,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata);
+
+/// \brief The default GeoCrsContext, which writes any PROJJSON coordinate 
reference
+/// system values it encounters to the file metadata
+class PARQUET_EXPORT FileGeoCrsContext : public GeoCrsContext {
+ public:
+  FileGeoCrsContext() { Clear(); }
+
+  void Clear() override {
+    projjson_crs_fields_ = ::arrow::KeyValueMetadata::Make({}, {});
+  }
+
+  std::string GetParquetCrs(std::string crs_value,
+                            const std::string& crs_encoding) override;
+
+  bool HasProjjsonCrsFields() override { return projjson_crs_fields_->size() > 
0; }
+
+  void AddProjjsonCrsFieldsToFileMetadata(::arrow::KeyValueMetadata* metadata) 
override;
+
+ private:
+  std::shared_ptr<::arrow::KeyValueMetadata> projjson_crs_fields_;

Review Comment:
   Why not use a simple `std::map` or `std::unordered_map`?



##########
cpp/src/parquet/geospatial_statistics.cc:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_statistics.h"
+#include <cmath>
+#include <memory>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial_util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    if (is_valid_ != other.is_valid_) {
+      return false;
+    }
+
+    if (!is_valid_ && !other.is_valid_) {
+      return true;
+    }
+
+    if (bounder_.GeometryTypes() != other.bounder_.GeometryTypes()) {
+      return false;
+    }
+
+    if (bounder_.Bounds() != other.bounder_.Bounds()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || is_wraparound_y() || other.is_wraparound_x() ||
+        other.is_wraparound_y()) {
+      throw ParquetException(
+          "Wraparound X or Y is not suppored by GeoStatistics::Merge()");
+    }
+
+    bounder_.ReadBox(other.bounder_.Bounds());
+    std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
+    bounder_.ReadGeometryTypes(other_geometry_types);
+  }
+
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count) 
{
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || is_wraparound_y()) {
+      throw ParquetException(
+          "Wraparound X or Y is not suppored by GeoStatistics::Update()");
+    }
+
+    for (int64_t i = 0; i < num_values; i++) {
+      const ByteArray& item = values[i];
+      ::arrow::Status status =
+          bounder_.ReadGeometry({reinterpret_cast<const char*>(item.ptr), 
item.len});
+      if (!status.ok()) {
+        is_valid_ = false;
+        return;
+      }
+    }
+  }
+
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values, int64_t null_count) {

Review Comment:
   `null_count` is unused



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();
+  }
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered, empty encoded statistics are returned
+  /// (such that is_set() returns false and they should not be written).
+  EncodedGeoStatistics Encode() const;
+
+  /// \brief Returns true if all WKB encountered was valid or false otherwise
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief The minimum encountered value in the X dimension, or Inf if no X 
values were
+  /// encountered.
+  ///
+  /// The Parquet definition allows for "wrap around" bounds where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in files written by other readers.
+  double get_xmin() const;
+
+  /// \brief The maximum encountered value in the X dimension, or -Inf if no X 
values were

Review Comment:
   Will `xmax` return `Inf`? How do we handle `NaN`? It would be good to 
document these behaviors as well.



##########
cpp/src/parquet/geospatial_util_internal.cc:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_util_internal.h"
+
+#include "arrow/result.h"
+#include "arrow/util/endian.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/ubsan.h"
+
+namespace parquet::geometry {
+
+/// \brief Object to keep track of the low-level consumption of a well-known 
binary
+/// geometry
+///
+/// Briefly, ISO well-known binary supported by the Parquet spec is an endian 
byte
+/// (0x01 or 0x00), followed by geometry type + dimensions encoded as a 
(uint32_t),
+/// followed by geometry-specific data. Coordinate sequences are represented 
by a
+/// uint32_t (the number of coordinates) plus a sequence of doubles (number of 
coordinates
+/// multiplied by the number of dimensions).
+class WKBBuffer {
+ public:
+  WKBBuffer() : data_(NULLPTR), size_(0) {}
+  WKBBuffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
+
+  void Init(const uint8_t* data, int64_t size) {
+    data_ = data;
+    size_ = size;
+  }
+
+  ::arrow::Result<uint8_t> ReadUInt8() { return ReadChecked<uint8_t>(); }
+
+  ::arrow::Result<uint32_t> ReadUInt32(bool swap) {
+    ARROW_ASSIGN_OR_RAISE(auto value, ReadChecked<uint32_t>());
+    if (ARROW_PREDICT_FALSE(swap)) {
+      return ByteSwap(value);
+    } else {
+      return value;
+    }
+  }
+
+  template <typename Coord, typename Visit>
+  ::arrow::Status ReadDoubles(uint32_t n_coords, bool swap, Visit&& visit) {
+    size_t total_bytes = n_coords * sizeof(Coord);
+    if (size_ < total_bytes) {
+      return ::arrow::Status::SerializationError(
+          "Can't coordinate sequence of ", total_bytes, " bytes from WKBBuffer 
with ",
+          size_, " remaining");
+    }
+
+    if (ARROW_PREDICT_FALSE(swap)) {
+      Coord coord;
+      for (uint32_t i = 0; i < n_coords; i++) {
+        coord = ReadUnchecked<Coord>();
+        for (uint32_t j = 0; j < coord.size(); j++) {
+          coord[j] = ByteSwap(coord[j]);
+        }
+
+        visit(coord);
+      }
+    } else {
+      for (uint32_t i = 0; i < n_coords; i++) {
+        visit(ReadUnchecked<Coord>());
+      }
+    }
+
+    return ::arrow::Status::OK();
+  }
+
+  size_t size() { return size_; }
+
+ private:
+  const uint8_t* data_;
+  size_t size_;
+
+  template <typename T>
+  ::arrow::Result<T> ReadChecked() {
+    if (size_ < sizeof(T)) {
+      return ::arrow::Status::SerializationError(
+          "Can't read ", sizeof(T), " bytes from WKBBuffer with ", size_, " 
remaining");
+    }
+
+    return ReadUnchecked<T>();
+  }
+
+  template <typename T>
+  T ReadUnchecked() {
+    T out = ::arrow::util::SafeLoadAs<T>(data_);
+    data_ += sizeof(T);
+    size_ -= sizeof(T);
+    return out;
+  }
+
+  template <typename T>
+  T ByteSwap(T value) {
+    return ::arrow::bit_util::ByteSwap(value);
+  }
+};
+
+using GeometryTypeAndDimensions = std::pair<GeometryType, Dimensions>;
+
+namespace {
+
+::arrow::Result<GeometryTypeAndDimensions> ParseGeometryType(uint32_t 
wkb_geometry_type) {
+  // The number 1000 can be used because WKB geometry types are constructed
+  // on purpose such that this relationship is true (e.g., LINESTRING ZM maps
+  // to 3002).
+  uint32_t geometry_type_component = wkb_geometry_type % 1000;
+  uint32_t dimensions_component = wkb_geometry_type / 1000;
+
+  auto min_geometry_type_value = 
static_cast<uint32_t>(GeometryType::kWKBValueMin);
+  auto max_geometry_type_value = 
static_cast<uint32_t>(GeometryType::kWKBValueMax);
+  auto min_dimension_value = static_cast<uint32_t>(Dimensions::kWKBValueMin);
+  auto max_dimension_value = static_cast<uint32_t>(Dimensions::kWKBValueMax);
+
+  if (geometry_type_component < min_geometry_type_value ||
+      geometry_type_component > max_geometry_type_value ||
+      dimensions_component < min_dimension_value ||
+      dimensions_component > max_dimension_value) {
+    return ::arrow::Status::SerializationError("Invalid WKB geometry type: ",
+                                               wkb_geometry_type);
+  }
+
+  GeometryTypeAndDimensions 
out{static_cast<GeometryType>(geometry_type_component),
+                                static_cast<Dimensions>(dimensions_component)};
+  return out;
+}
+
+}  // namespace
+
+std::vector<int32_t> WKBGeometryBounder::GeometryTypes() const {
+  std::vector<int32_t> out(geospatial_types_.begin(), geospatial_types_.end());
+  std::sort(out.begin(), out.end());
+  return out;
+}
+
+::arrow::Status WKBGeometryBounder::ReadGeometry(std::string_view bytes_wkb) {
+  WKBBuffer src{reinterpret_cast<const uint8_t*>(bytes_wkb.data()),
+                static_cast<int64_t>(bytes_wkb.size())};
+  ARROW_RETURN_NOT_OK(ReadGeometryInternal(&src, /*record_wkb_type=*/true));
+  if (src.size() != 0) {
+    return ::arrow::Status::SerializationError(
+        "Exepcted zero bytes after consuming WKB but got ", src.size());
+  }
+
+  return ::arrow::Status::OK();
+}
+
+::arrow::Status WKBGeometryBounder::ReadGeometryInternal(WKBBuffer* src,
+                                                         bool record_wkb_type) 
{
+  ARROW_ASSIGN_OR_RAISE(uint8_t endian, src->ReadUInt8());
+#if defined(ARROW_LITTLE_ENDIAN)
+  bool swap = endian != 0x01;
+#else
+  bool swap = endian != 0x00;
+#endif
+
+  ARROW_ASSIGN_OR_RAISE(uint32_t wkb_geometry_type, src->ReadUInt32(swap));
+  ARROW_ASSIGN_OR_RAISE(auto geometry_type_and_dimensions,
+                        ParseGeometryType(wkb_geometry_type));
+
+  // Keep track of geometry types encountered if at the top level
+  if (record_wkb_type) {
+    geospatial_types_.insert(static_cast<int32_t>(wkb_geometry_type));
+  }
+
+  switch (geometry_type_and_dimensions.first) {
+    case GeometryType::kPoint:
+      ARROW_RETURN_NOT_OK(
+          ReadSequence(src, geometry_type_and_dimensions.second, 1, swap));
+      break;
+
+    case GeometryType::kLinestring: {
+      ARROW_ASSIGN_OR_RAISE(uint32_t n_coords, src->ReadUInt32(swap));
+      ARROW_RETURN_NOT_OK(
+          ReadSequence(src, geometry_type_and_dimensions.second, n_coords, 
swap));
+      break;
+    }
+    case GeometryType::kPolygon: {
+      ARROW_ASSIGN_OR_RAISE(uint32_t n_parts, src->ReadUInt32(swap));
+      for (uint32_t i = 0; i < n_parts; i++) {
+        ARROW_ASSIGN_OR_RAISE(uint32_t n_coords, src->ReadUInt32(swap));
+        ARROW_RETURN_NOT_OK(
+            ReadSequence(src, geometry_type_and_dimensions.second, n_coords, 
swap));
+      }
+      break;
+    }
+
+    // These are all encoded the same in WKB, even though this encoding would
+    // allow for parts to be of a different geometry type or different 
dimensions.
+    // For the purposes of bounding, this does not cause us problems.
+    case GeometryType::kMultiPoint:
+    case GeometryType::kMultiLinestring:
+    case GeometryType::kMultiPolygon:
+    case GeometryType::kGeometryCollection: {
+      ARROW_ASSIGN_OR_RAISE(uint32_t n_parts, src->ReadUInt32(swap));
+      for (uint32_t i = 0; i < n_parts; i++) {
+        ARROW_RETURN_NOT_OK(ReadGeometryInternal(src, /*record_wkb_type*/ 
false));

Review Comment:
   If `record_wkb_type` is false, we lose track of the geometry types 
encountered in the nested parts. Does this violate the spec, which requests 
that the list be cleared in this case? Why not manually add the GeometryType?



##########
cpp/src/parquet/geospatial_statistics.cc:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_statistics.h"
+#include <cmath>
+#include <memory>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial_util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {

Review Comment:
   Why not `return is_valid_ == other.is_valid_ && bounder_.GeometryTypes() == 
other.bounder_.GeometryTypes() && bounder_.Bounds() == 
other.bounder_.Bounds();`?



##########
cpp/src/parquet/geospatial_statistics.cc:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_statistics.h"
+#include <cmath>
+#include <memory>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial_util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    if (is_valid_ != other.is_valid_) {
+      return false;
+    }
+
+    if (!is_valid_ && !other.is_valid_) {
+      return true;
+    }
+
+    if (bounder_.GeometryTypes() != other.bounder_.GeometryTypes()) {
+      return false;
+    }
+
+    if (bounder_.Bounds() != other.bounder_.Bounds()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || is_wraparound_y() || other.is_wraparound_x() ||
+        other.is_wraparound_y()) {
+      throw ParquetException(
+          "Wraparound X or Y is not suppored by GeoStatistics::Merge()");
+    }
+
+    bounder_.ReadBox(other.bounder_.Bounds());
+    std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
+    bounder_.ReadGeometryTypes(other_geometry_types);
+  }
+
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count) 
{

Review Comment:
   `null_count` is unused here.



##########
cpp/src/parquet/geospatial_statistics_test.cc:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/array/builder_binary.h"
+#include "arrow/compute/api.h"
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/geospatial_statistics.h"
+#include "parquet/test_util.h"
+
+namespace parquet::geometry {
+
+TEST(TestGeoStatistics, TestDefaults) {
+  GeoStatistics stats;
+  EXPECT_EQ(stats.get_geometry_types().size(), 0);
+  EXPECT_TRUE(stats.is_valid());
+  EXPECT_TRUE(stats.is_empty());
+  EXPECT_FALSE(stats.has_z());
+  EXPECT_FALSE(stats.has_m());
+  EXPECT_EQ(stats.get_xmax() - stats.get_xmin(), -kInf);
+  EXPECT_EQ(stats.get_ymax() - stats.get_ymin(), -kInf);
+  EXPECT_EQ(stats.get_zmax() - stats.get_zmin(), -kInf);
+  EXPECT_EQ(stats.get_mmax() - stats.get_mmin(), -kInf);
+  EXPECT_TRUE(stats.Equals(GeoStatistics()));
+
+  EXPECT_EQ(stats.ToString(),
+            "GeoStatistics \n  x: [inf, -inf]\n  y: [inf, -inf]\n  
geometry_types:\n");
+
+  auto encoded = stats.Encode();
+  EXPECT_FALSE(encoded.is_set());
+  EXPECT_FALSE(encoded.has_z());
+  EXPECT_FALSE(encoded.has_m());
+  EXPECT_TRUE(GeoStatistics(encoded).Equals(stats));
+
+  stats.Merge(GeoStatistics());
+  EXPECT_TRUE(GeoStatistics(encoded).Equals(stats));
+}
+
+TEST(TestGeoStatistics, TestUpdateByteArray) {
+  GeoStatistics stats;
+
+  // Make items with all dimensions to ensure all dimensions are updated
+  // and returned properly (POINT XYZM has an integer code of 3001)
+  std::string xyzm0 = test::MakeWKBPoint({10, 11, 12, 13}, true, true);
+  ByteArray item0{xyzm0};
+
+  stats.Update(&item0, /*num_values=*/1, /*null_count=*/0);
+  EXPECT_TRUE(stats.is_valid());
+  EXPECT_THAT(stats.get_lower_bound(), ::testing::ElementsAre(10, 11, 12, 13));
+  EXPECT_THAT(stats.get_upper_bound(), ::testing::ElementsAre(10, 11, 12, 13));
+  EXPECT_THAT(stats.get_geometry_types(), ::testing::ElementsAre(3001));
+
+  std::string xyzm1 = test::MakeWKBPoint({20, 21, 22, 23}, true, true);
+  ByteArray item1{xyzm1};
+
+  stats.Update(&item1, /*num_values=*/1, /*null_count=*/0);
+  EXPECT_TRUE(stats.is_valid());
+  EXPECT_THAT(stats.get_lower_bound(), ::testing::ElementsAre(10, 11, 12, 13));
+  EXPECT_THAT(stats.get_upper_bound(), ::testing::ElementsAre(20, 21, 22, 23));
+  EXPECT_THAT(stats.get_geometry_types(), ::testing::ElementsAre(3001));
+
+  // Check recreating the statistics with actual values
+  auto encoded = stats.Encode();
+  EXPECT_TRUE(GeoStatistics(encoded).Equals(stats));
+  EXPECT_EQ(stats.ToString(),
+            "GeoStatistics \n  x: [10, 20]\n  y: [11, 21]\n  z: [12, 22]\n  m: 
"
+            "[13, 23]\n  geometry_types: 3001\n");
+
+  // Check resetting to the original state
+  stats.Reset();
+  EXPECT_TRUE(stats.Equals(GeoStatistics()));
+
+  // Check UpdateSpaced()
+
+  // A null value that should be skipped
+  std::string xyzm2 = test::MakeWKBPoint({-30, -31, -32, -33}, true, true);
+  ByteArray item2{xyzm2};
+
+  // A non-null value that shouldn't be skipped
+  std::string xyzm3 = test::MakeWKBPoint({30, 31, 32, 33}, true, true);
+  ByteArray item3{xyzm3};
+
+  ByteArray items[] = {item0, item1, item2, item3};
+  // Validity bitmap with an extra bit on the front to check non-zero bits 
offset
+  uint8_t validity = 0b00010111;
+  GeoStatistics stats_spaced;
+  stats_spaced.UpdateSpaced(items, &validity, 1, 4, 4, 1);
+
+  EXPECT_TRUE(stats.is_valid());
+  EXPECT_THAT(stats_spaced.get_lower_bound(), ::testing::ElementsAre(10, 11, 
12, 13));
+  EXPECT_THAT(stats_spaced.get_upper_bound(), ::testing::ElementsAre(30, 31, 
32, 33));
+  EXPECT_THAT(stats_spaced.get_geometry_types(), ::testing::ElementsAre(3001));

Review Comment:
   It would be good to verify `has_z()` and `has_m()`. BTW, is it possible to 
add cases for `xyz` and `xym`?



##########
cpp/src/parquet/geospatial_statistics.cc:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_statistics.h"
+#include <cmath>
+#include <memory>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial_util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    if (is_valid_ != other.is_valid_) {
+      return false;
+    }
+
+    if (!is_valid_ && !other.is_valid_) {
+      return true;
+    }
+
+    if (bounder_.GeometryTypes() != other.bounder_.GeometryTypes()) {
+      return false;
+    }
+
+    if (bounder_.Bounds() != other.bounder_.Bounds()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || is_wraparound_y() || other.is_wraparound_x() ||
+        other.is_wraparound_y()) {
+      throw ParquetException(
+          "Wraparound X or Y is not suppored by GeoStatistics::Merge()");

Review Comment:
   ```suggestion
             "Wraparound X or Y is not supported by GeoStatistics::Merge()");
   ```



##########
cpp/src/parquet/properties.h:
##########
@@ -1112,6 +1152,13 @@ class PARQUET_EXPORT ArrowWriterProperties {
       return this;
     }
 
+    /// \brief Set the GeoCrsContext with which coordinate reference systems 
should be
+    /// interpreted for GEOMETRY and GEOGRAPHY types
+    Builder* set_geospatial_crs_context(std::shared_ptr<GeoCrsContext> 
geo_crs_context) {
+      geo_crs_context_ = geo_crs_context;

Review Comment:
   ```suggestion
         geo_crs_context_ = std::move(geo_crs_context);
   ```



##########
cpp/src/parquet/geospatial_statistics.h:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+class PARQUET_EXPORT EncodedGeoStatistics {
+ public:
+  static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+  double xmin{kInf};
+  double xmax{-kInf};
+  double ymin{kInf};
+  double ymax{-kInf};
+  double zmin{kInf};
+  double zmax{-kInf};
+  double mmin{kInf};
+  double mmax{-kInf};
+  std::vector<int32_t> geospatial_types;
+
+  bool has_x() const { return !std::isinf(xmin - xmax); }
+  bool has_y() const { return !std::isinf(ymin - ymax); }
+  bool has_z() const { return !std::isinf(zmin - zmax); }
+  bool has_m() const { return !std::isinf(mmin - mmax); }
+
+  bool is_set() const {
+    return !geospatial_types.empty() || has_x() || has_y() || has_z() || 
has_m();
+  }
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values, int64_t null_count);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered, empty encoded statistics are returned
+  /// (such that is_set() returns false and they should not be written).
+  EncodedGeoStatistics Encode() const;
+
+  /// \brief Returns true if all WKB encountered was valid or false otherwise
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief The minimum encountered value in the X dimension, or Inf if no X 
values were
+  /// encountered.
+  ///
+  /// The Parquet definition allows for "wrap around" bounds where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in files written by other readers.
+  double get_xmin() const;
+
+  /// \brief The maximum encountered value in the X dimension, or -Inf if no X 
values were
+  /// encountered, subject to "wrap around" bounds (see GetXMin()).
+  double get_xmax() const;
+
+  /// \brief The minimum encountered value in the Y dimension, or Inf if no Y 
values were
+  /// encountered.
+  ///
+  /// The Parquet definition allows for "wrap around" bounds where ymin > 
ymax. In this

Review Comment:
   Do we support wraparound for Y? Isn't Y enforced to be latitude?



##########
cpp/src/parquet/metadata.cc:
##########
@@ -309,6 +321,14 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
                                                  descr_->sort_order());
   }
 
+  inline bool is_geo_stats_set() const {
+    if (possible_geo_stats_ == nullptr &&
+        column_metadata_->__isset.geospatial_statistics) {
+      possible_geo_stats_ = MakeColumnGeometryStats(*column_metadata_, descr_);

Review Comment:
   I would prefer calling this proactively in the constructor rather than 
initializing it lazily. The lazy initialization is not thread-safe, which makes 
it harder to cache and share the FileMetaData.



##########
cpp/src/parquet/CMakeLists.txt:
##########
@@ -259,6 +262,12 @@ endif()
 if(NOT PARQUET_MINIMAL_DEPENDENCY)
   list(APPEND PARQUET_SHARED_LINK_LIBS arrow_shared)
 
+  # TODO(paleolimbot): Make sure this is OK or remove if not!
+  if(ARROW_JSON)

Review Comment:
   Would it be better to add a new option like `PARQUET_SUPPORT_GEOARROW`, 
similar to `PARQUET_REQUIRE_ENCRYPTION`, to make it explicit?



##########
cpp/src/parquet/properties.h:
##########
@@ -69,6 +69,45 @@ constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 
1000;
 // PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
 constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
 
+/// \brief Convert Coordinate Reference System values for GEOMETRY and 
GEOGRAPHY logical
+/// types
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoCrsContext {
+ public:
+  virtual ~GeoCrsContext() = default;
+
+  /// \brief Remove any previously saved PROJJSON CRS fields
+  virtual void Clear() = 0;
+
+  /// \brief Given a coordinate reference system value and encoding from 
GeoArrow
+  /// extension metadata, return the value that should be placed in the
+  /// LogicalType::Geography|Geometry(crs=) field
+  ///
+  /// For PROJJSON Crses (the most common way coordinate reference systems 
arrive
+  /// in Arrow), the Parquet specification forces us to write them to the file
+  /// metadata. The default GeoCrsContext will record such values and return 
the required
+  /// string that can be placed into the 'crs' of the Geometry or Geography 
logical
+  /// type.
+  virtual std::string GetParquetCrs(std::string crs_value,

Review Comment:
   nit: add documentation for accepted values of `crs_encoding`?



##########
cpp/src/parquet/geospatial_util_internal.cc:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_util_internal.h"
+
+#include "arrow/result.h"
+#include "arrow/util/endian.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/ubsan.h"
+
+namespace parquet::geometry {
+
+/// \brief Object to keep track of the low-level consumption of a well-known 
binary
+/// geometry
+///
+/// Briefly, ISO well-known binary supported by the Parquet spec is an endian 
byte
+/// (0x01 or 0x00), followed by geometry type + dimensions encoded as a 
(uint32_t),
+/// followed by geometry-specific data. Coordinate sequences are represented 
by a
+/// uint32_t (the number of coordinates) plus a sequence of doubles (number of 
coordinates
+/// multiplied by the number of dimensions).
+class WKBBuffer {
+ public:
+  WKBBuffer() : data_(NULLPTR), size_(0) {}
+  WKBBuffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
+
+  void Init(const uint8_t* data, int64_t size) {
+    data_ = data;
+    size_ = size;
+  }
+
+  ::arrow::Result<uint8_t> ReadUInt8() { return ReadChecked<uint8_t>(); }
+
+  ::arrow::Result<uint32_t> ReadUInt32(bool swap) {
+    ARROW_ASSIGN_OR_RAISE(auto value, ReadChecked<uint32_t>());
+    if (ARROW_PREDICT_FALSE(swap)) {
+      return ByteSwap(value);
+    } else {
+      return value;
+    }
+  }
+
+  template <typename Coord, typename Visit>
+  ::arrow::Status ReadDoubles(uint32_t n_coords, bool swap, Visit&& visit) {
+    size_t total_bytes = n_coords * sizeof(Coord);
+    if (size_ < total_bytes) {
+      return ::arrow::Status::SerializationError(
+          "Can't coordinate sequence of ", total_bytes, " bytes from WKBBuffer 
with ",
+          size_, " remaining");
+    }
+
+    if (ARROW_PREDICT_FALSE(swap)) {
+      Coord coord;
+      for (uint32_t i = 0; i < n_coords; i++) {
+        coord = ReadUnchecked<Coord>();
+        for (uint32_t j = 0; j < coord.size(); j++) {
+          coord[j] = ByteSwap(coord[j]);
+        }
+
+        visit(coord);
+      }
+    } else {
+      for (uint32_t i = 0; i < n_coords; i++) {
+        visit(ReadUnchecked<Coord>());
+      }
+    }
+
+    return ::arrow::Status::OK();
+  }
+
+  size_t size() { return size_; }
+
+ private:
+  const uint8_t* data_;
+  size_t size_;
+
+  template <typename T>
+  ::arrow::Result<T> ReadChecked() {
+    if (size_ < sizeof(T)) {
+      return ::arrow::Status::SerializationError(
+          "Can't read ", sizeof(T), " bytes from WKBBuffer with ", size_, " 
remaining");
+    }
+
+    return ReadUnchecked<T>();
+  }
+
+  template <typename T>
+  T ReadUnchecked() {
+    T out = ::arrow::util::SafeLoadAs<T>(data_);
+    data_ += sizeof(T);
+    size_ -= sizeof(T);
+    return out;
+  }
+
+  template <typename T>
+  T ByteSwap(T value) {
+    return ::arrow::bit_util::ByteSwap(value);
+  }
+};
+
+using GeometryTypeAndDimensions = std::pair<GeometryType, Dimensions>;
+
+namespace {
+
+::arrow::Result<GeometryTypeAndDimensions> ParseGeometryType(uint32_t 
wkb_geometry_type) {
+  // The number 1000 can be used because WKB geometry types are constructed
+  // on purpose such that this relationship is true (e.g., LINESTRING ZM maps
+  // to 3002).
+  uint32_t geometry_type_component = wkb_geometry_type % 1000;
+  uint32_t dimensions_component = wkb_geometry_type / 1000;
+
+  auto min_geometry_type_value = 
static_cast<uint32_t>(GeometryType::kWKBValueMin);
+  auto max_geometry_type_value = 
static_cast<uint32_t>(GeometryType::kWKBValueMax);
+  auto min_dimension_value = static_cast<uint32_t>(Dimensions::kWKBValueMin);
+  auto max_dimension_value = static_cast<uint32_t>(Dimensions::kWKBValueMax);
+
+  if (geometry_type_component < min_geometry_type_value ||
+      geometry_type_component > max_geometry_type_value ||
+      dimensions_component < min_dimension_value ||
+      dimensions_component > max_dimension_value) {
+    return ::arrow::Status::SerializationError("Invalid WKB geometry type: ",
+                                               wkb_geometry_type);
+  }
+
+  GeometryTypeAndDimensions 
out{static_cast<GeometryType>(geometry_type_component),
+                                static_cast<Dimensions>(dimensions_component)};
+  return out;
+}
+
+}  // namespace
+
+std::vector<int32_t> WKBGeometryBounder::GeometryTypes() const {
+  std::vector<int32_t> out(geospatial_types_.begin(), geospatial_types_.end());
+  std::sort(out.begin(), out.end());
+  return out;
+}
+
+::arrow::Status WKBGeometryBounder::ReadGeometry(std::string_view bytes_wkb) {
+  WKBBuffer src{reinterpret_cast<const uint8_t*>(bytes_wkb.data()),
+                static_cast<int64_t>(bytes_wkb.size())};
+  ARROW_RETURN_NOT_OK(ReadGeometryInternal(&src, /*record_wkb_type=*/true));
+  if (src.size() != 0) {
+    return ::arrow::Status::SerializationError(
+        "Exepcted zero bytes after consuming WKB but got ", src.size());
+  }
+
+  return ::arrow::Status::OK();
+}
+
+::arrow::Status WKBGeometryBounder::ReadGeometryInternal(WKBBuffer* src,
+                                                         bool record_wkb_type) 
{
+  ARROW_ASSIGN_OR_RAISE(uint8_t endian, src->ReadUInt8());
+#if defined(ARROW_LITTLE_ENDIAN)
+  bool swap = endian != 0x01;
+#else
+  bool swap = endian != 0x00;
+#endif
+
+  ARROW_ASSIGN_OR_RAISE(uint32_t wkb_geometry_type, src->ReadUInt32(swap));
+  ARROW_ASSIGN_OR_RAISE(auto geometry_type_and_dimensions,

Review Comment:
   Can we use structured binding to make it more explicit?



##########
cpp/src/parquet/geospatial_util_internal_json.h:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/util/key_value_metadata.h"
+
+#include "parquet/properties.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Compute a Parquet Logical type (Geometry(...) or Geography(...)) 
from
+/// serialized GeoArrow metadata
+///
+/// Returns the appropriate LogicalType, SerializationError if the metadata 
was invalid,
+/// or NotImplemented if Parquet was not built with ARROW_JSON.
+::arrow::Result<std::shared_ptr<const LogicalType>> 
GeospatialLogicalTypeFromGeoArrowJSON(
+    const std::string& serialized_data, const ArrowWriterProperties& 
arrow_properties);
+
+/// \brief Compute a suitable DataType into which a GEOMETRY or GEOGRAPHY type 
should be
+/// read
+///
+/// The result of this function depends on whether or not "geoarrow.wkb" has 
been
+/// registered: if it has, the result will be the registered ExtensionType; if 
it has not,
+/// the result will be binary().
+::arrow::Result<std::shared_ptr<::arrow::DataType>> MakeGeoArrowGeometryType(
+    const LogicalType& logical_type,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata);
+
+/// \brief The default GeoCrsContext, which writes any PROJJSON coordinate 
reference
+/// system values it encounters to the file metadata
+class PARQUET_EXPORT FileGeoCrsContext : public GeoCrsContext {

Review Comment:
   Will we add encodings other than PROJJSON? I'm asking because the naming 
depends on whether it is PROJJSON-specific or GeoArrow-specific.



##########
cpp/src/parquet/arrow/schema.cc:
##########
@@ -428,13 +447,19 @@ Status FieldToNode(const std::string& name, const 
std::shared_ptr<Field>& field,
     }
     case ArrowTypeId::EXTENSION: {
       auto ext_type = 
std::static_pointer_cast<::arrow::ExtensionType>(field->type());
-      // Built-in JSON extension is handled differently.
+      // Built-in JSON extension and GeoArrow are handled differently.

Review Comment:
   I was wondering whether we could add an extension type registry that allows 
users to register an extension name and the function to create the extension 
type and/or parquet type. This is similar to what I've added to avro recently: 
https://github.com/apache/avro/pull/3326/files#diff-00521bd02440832a207504b570a0fc02ccbe0928c6710e419503d48284aeee90R95-R114
   



##########
cpp/src/parquet/arrow/schema.cc:
##########
@@ -243,6 +245,23 @@ static Status GetTimestampMetadata(const 
::arrow::TimestampType& type,
   return Status::OK();
 }
 
+Result<std::shared_ptr<const LogicalType>> GeospatialLogicalTypeFromArrow(
+    const std::string& serialized_data, const ArrowWriterProperties& 
arrow_properties) {
+  // Without a JSON parser, we can still handle a few trivial cases

Review Comment:
   Are these the most common (or default) cases?



##########
cpp/src/parquet/geospatial_statistics.cc:
##########
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial_statistics.h"
+#include <cmath>
+#include <memory>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/logging.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial_util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    if (is_valid_ != other.is_valid_) {
+      return false;
+    }
+
+    if (!is_valid_ && !other.is_valid_) {
+      return true;
+    }
+
+    if (bounder_.GeometryTypes() != other.bounder_.GeometryTypes()) {
+      return false;
+    }
+
+    if (bounder_.Bounds() != other.bounder_.Bounds()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || is_wraparound_y() || other.is_wraparound_x() ||
+        other.is_wraparound_y()) {
+      throw ParquetException(
+          "Wraparound X or Y is not suppored by GeoStatistics::Merge()");

Review Comment:
   BTW, do we want to fail the writer or silently disable bbox?



##########
cpp/src/parquet/arrow/schema.cc:
##########
@@ -243,6 +245,23 @@ static Status GetTimestampMetadata(const 
::arrow::TimestampType& type,
   return Status::OK();
 }
 
+Result<std::shared_ptr<const LogicalType>> GeospatialLogicalTypeFromArrow(
+    const std::string& serialized_data, const ArrowWriterProperties& 
arrow_properties) {
+  // Without a JSON parser, we can still handle a few trivial cases

Review Comment:
   Why not put all of this logic into 
`GeospatialLogicalTypeFromGeoArrowJSON` so that it is not scattered across two 
separate places?



##########
cpp/src/parquet/properties.h:
##########
@@ -1112,6 +1152,13 @@ class PARQUET_EXPORT ArrowWriterProperties {
       return this;
     }
 
+    /// \brief Set the GeoCrsContext with which coordinate reference systems 
should be
+    /// interpreted for GEOMETRY and GEOGRAPHY types
+    Builder* set_geospatial_crs_context(std::shared_ptr<GeoCrsContext> 
geo_crs_context) {
+      geo_crs_context_ = geo_crs_context;

Review Comment:
   Should we make it thread-safe? There are cases where a single 
ReaderProperties is shared by multiple readers (e.g. in the dataset scanner) 
that access it concurrently.



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -546,6 +554,10 @@ Status GetSchemaMetadata(const ::arrow::Schema& schema, 
::arrow::MemoryPool* poo
   std::string schema_as_string = serialized->ToString();
   std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string);
   result->Append(kArrowSchemaKey, std::move(schema_base64));
+
+  // Add any required fields collected by the GeoCrsContext
+  
properties.geo_crs_context()->AddProjjsonCrsFieldsToFileMetadata(result.get());

Review Comment:
   It is weird to mutate a member variable through a const reference. We can 
either add GeoCrsContext as a separate parameter or change 
ArrowWriterProperties to be non-const.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to