pitrou commented on code in PR #45459:
URL: https://github.com/apache/arrow/pull/45459#discussion_r2058332513


##########
cpp/src/parquet/column_writer.cc:
##########
@@ -1826,6 +1870,12 @@ Status 
TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
     page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
     page_statistics_->IncrementNumValues(non_null_count);
     page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+
+    if (chunk_geospatial_statistics_ != nullptr) {
+      throw ParquetException(
+          "Writing dictionary-encoded GEOMETRY or GEOGRAPHY with statistics is 
not "
+          "supported");
+    }

Review Comment:
   Is this something we plan to later support?



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+struct PARQUET_EXPORT EncodedGeoStatistics {
+  bool xy_bounds_present{};
+  double xmin{kNaN};
+  double xmax{kNaN};
+  double ymin{kNaN};
+  double ymax{kNaN};
+
+  bool z_bounds_present{};
+  double zmin{kNaN};
+  double zmax{kNaN};
+
+  bool m_bounds_present{};
+  double mmin{kNaN};
+  double mmax{kNaN};
+
+  bool writer_calculated_geospatial_types() const { return 
!geospatial_types.empty(); }
+  std::vector<int32_t> geospatial_types;
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// Note that NaN values that were encountered within coordinates are omitted; 
however,
+/// NaN values that were obtained via decoding encoded statistics are 
propagated. This
+/// behaviour ensures C++ clients that are inspecting statistics via the 
column metadata
+/// can detect the case where a writer generated NaNs (even though this 
implementation
+/// does not generate them).
+///
+/// The handling of NaN values in coordinates is not well-defined among 
bounding
+/// implementations except for the WKB convention for POINT EMPTY, which is 
consistently
+/// represented as a point whose ordinates are all NaN. Any other geometry 
that contains
+/// NaNs cannot expect defined behaviour here or elsewhere; however, a row 
group that
+/// contains both NaN-containing and normal (completely finite) geometries 
should not be
+/// excluded from predicate pushdown.
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered or if the statistics contain NaN
+  /// for any reason, Encode() will return nullopt to indicate that
+  /// statistics should not be written to thrift.
+  std::optional<EncodedGeoStatistics> Encode() const;
+
+  /// \brief Returns false if invalid WKB was encountered
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief All minimum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be +Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> lower_bound() const;
+
+  /// \brief All maximum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be -Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> upper_bound() const;
+
+  /// \brief Returns true if the dimension is valid and any non-NaN values were
+  /// encountered in the given dimension in XYZM order
+  std::array<bool, kMaxDimensions> dimension_empty() const;

Review Comment:
   Is the docstring correct? How does a method named "dimension_empty" return 
"true if the dimension is valid and any non-NaN values were encountered"?



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+struct PARQUET_EXPORT EncodedGeoStatistics {
+  bool xy_bounds_present{};
+  double xmin{kNaN};
+  double xmax{kNaN};
+  double ymin{kNaN};
+  double ymax{kNaN};
+
+  bool z_bounds_present{};
+  double zmin{kNaN};
+  double zmax{kNaN};
+
+  bool m_bounds_present{};
+  double mmin{kNaN};
+  double mmax{kNaN};
+
+  bool writer_calculated_geospatial_types() const { return 
!geospatial_types.empty(); }
+  std::vector<int32_t> geospatial_types;
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// Note that NaN values that were encountered within coordinates are omitted; 
however,
+/// NaN values that were obtained via decoding encoded statistics are 
propagated. This
+/// behaviour ensures C++ clients that are inspecting statistics via the 
column metadata
+/// can detect the case where a writer generated NaNs (even though this 
implementation
+/// does not generate them).
+///
+/// The handling of NaN values in coordinates is not well-defined among 
bounding
+/// implementations except for the WKB convention for POINT EMPTY, which is 
consistently
+/// represented as a point whose ordinates are all NaN. Any other geometry 
that contains
+/// NaNs cannot expect defined behaviour here or elsewhere; however, a row 
group that
+/// contains both NaN-containing and normal (completely finite) geometries 
should not be
+/// excluded from predicate pushdown.
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered or if the statistics contain NaN
+  /// for any reason, Encode() will return nullopt to indicate that
+  /// statistics should not be written to thrift.
+  std::optional<EncodedGeoStatistics> Encode() const;
+
+  /// \brief Returns false if invalid WKB was encountered
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief All minimum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be +Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> lower_bound() const;
+
+  /// \brief All maximum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be -Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> upper_bound() const;
+
+  /// \brief Returns true if the dimension is valid and any non-NaN values were
+  /// encountered in the given dimension in XYZM order
+  std::array<bool, kMaxDimensions> dimension_empty() const;
+
+  /// \brief Returns false if a bound was explicitly not calculated by a 
writer when
+  /// importing statistics from thrift

Review Comment:
   I don't think I understand what this means. Why would a writer import 
statistics from thrift?



##########
cpp/src/parquet/geospatial/util_internal.h:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <cmath>
+#include <limits>
+#include <string>
+#include <unordered_set>
+
+#include "arrow/util/logging_internal.h"
+#include "parquet/platform.h"
+
+namespace parquet::geospatial {
+
+/// \brief Infinity, used to define bounds of empty bounding boxes
+constexpr double kInf = std::numeric_limits<double>::infinity();
+
+/// \brief Valid combinations of dimensions allowed by ISO well-known binary
+///
+/// These values correspond to the 0, 1000, 2000, 3000 component of the WKB 
integer
+/// geometry type (i.e., the value of geometry_type // 1000).
+enum class Dimensions {
+  kXY = 0,
+  kXYZ = 1,
+  kXYM = 2,
+  kXYZM = 3,
+  kValueMin = 0,
+  kValueMax = 3
+};
+
+/// \brief The supported set of geometry types allowed by ISO well-known binary
+///
+/// These values correspond to the 1, 2, ..., 7 component of the WKB integer
+/// geometry type (i.e., the value of geometry_type % 1000).
+enum class GeometryType {
+  kPoint = 1,
+  kLinestring = 2,
+  kPolygon = 3,
+  kMultiPoint = 4,
+  kMultiLinestring = 5,
+  kMultiPolygon = 6,
+  kGeometryCollection = 7,
+  kValueMin = 1,
+  kValueMax = 7
+};
+
+/// \brief A collection of intervals representing the encountered ranges of 
values
+/// in each dimension.
+///
+/// The Parquet specification also supports wraparound bounding boxes in the X
+/// dimension; however, this structure assumes min < max always as it is used 
for
+/// the purposes of accumulating this type of bounds.
+///
+/// This class will ignore any NaN values it visits via UpdateXY[Z[M]](). This 
is
+/// consistent with GEOS and ensures that ranges are accumulated in the same 
way that
+/// other statistics in Parquet are accumulated (e.g., statistics of a double 
column will
+/// return a min/max of all non-NaN values). In WKB specifically, POINT EMPTY 
is
+/// represented by convention as all ordinate values filled with NaN, so this 
behaviour
+/// allows for no special-casing of POINT EMPTY in the WKB reader.
+///
+/// This class will propagate any NaN values (per dimension) that were 
explicitly
+/// specified via setting mins/maxes directly or by merging another 
BoundingBox that
+/// contained NaN values. This definition ensures that NaN bounds obtained via
+/// EncodedGeoStatistics (which may have been written by some other writer 
that generated
+/// NaNs, either on purpose or by accident) are not silently overwritten.
+struct PARQUET_EXPORT BoundingBox {
+  using XY = std::array<double, 2>;
+  using XYZ = std::array<double, 3>;
+  using XYM = std::array<double, 3>;
+  using XYZM = std::array<double, 4>;
+
+  BoundingBox(const XYZM& mins, const XYZM& maxes) : min(mins), max(maxes) {}
+  BoundingBox() : min{kInf, kInf, kInf, kInf}, max{-kInf, -kInf, -kInf, -kInf} 
{}
+
+  BoundingBox(const BoundingBox& other) = default;
+  BoundingBox& operator=(const BoundingBox&) = default;
+
+  /// \brief Update the X and Y bounds to ensure these bounds contain coord
+  void UpdateXY(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 2);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Update the X, Y, and Z bounds to ensure these bounds contain coord
+  void UpdateXYZ(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 3);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Update the X, Y, and M bounds to ensure these bounds contain coord
+  void UpdateXYM(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 3);
+    min[0] = std::min(min[0], coord[0]);
+    min[1] = std::min(min[1], coord[1]);
+    min[3] = std::min(min[3], coord[2]);
+    max[0] = std::max(max[0], coord[0]);
+    max[1] = std::max(max[1], coord[1]);
+    max[3] = std::max(max[3], coord[2]);
+  }
+
+  /// \brief Update the X, Y, Z, and M bounds to ensure these bounds contain 
coord
+  void UpdateXYZM(::arrow::util::span<const double> coord) {
+    DCHECK_EQ(coord.size(), 4);
+    UpdateInternal(coord);
+  }
+
+  /// \brief Reset these bounds to an empty state such that they contain no 
coordinates
+  void Reset() {
+    for (int i = 0; i < 4; i++) {
+      min[i] = kInf;
+      max[i] = -kInf;
+    }
+  }
+
+  /// \brief Update these bounds such they also contain other
+  void Merge(const BoundingBox& other) {
+    for (int i = 0; i < 4; i++) {
+      if (std::isnan(min[i]) || std::isnan(max[i]) || std::isnan(other.min[i]) 
||
+          std::isnan(other.max[i])) {
+        min[i] = std::numeric_limits<double>::quiet_NaN();
+        max[i] = std::numeric_limits<double>::quiet_NaN();
+      } else {
+        min[i] = std::min(min[i], other.min[i]);
+        max[i] = std::max(max[i], other.max[i]);
+      }
+    }
+  }
+
+  std::string ToString() const;
+
+  XYZM min;
+  XYZM max;
+
+ private:
+  // This works for XY, XYZ, and XYZM
+  template <typename Coord>
+  void UpdateInternal(Coord coord) {
+    for (size_t i = 0; i < coord.size(); i++) {
+      min[i] = std::min(min[i], coord[i]);
+      max[i] = std::max(max[i], coord[i]);
+    }
+  }
+};
+
+inline bool operator==(const BoundingBox& lhs, const BoundingBox& rhs) {
+  return lhs.min == rhs.min && lhs.max == rhs.max;
+}
+
+inline bool operator!=(const BoundingBox& lhs, const BoundingBox& rhs) {
+  return !(lhs == rhs);
+}
+
+class WKBBuffer;
+
+/// \brief Accumulate a BoundingBox and geometry types based on zero or more 
well-known
+/// binary blobs
+///
+/// Note that this class is NOT appropriate for bounding a GEOGRAPHY,
+/// whose bounds are not a function purely of the vertices. Geography bounding
+/// is not yet implemented.
+class PARQUET_EXPORT WKBGeometryBounder {
+ public:
+  /// \brief Accumulate the bounds of a serialized well-known binary geometry
+  ///
+  /// Throws ParquetException for any parse errors encountered. Bounds for
+  /// any encountered coordinates are accumulated and the geometry type of
+  /// the geometry is added to the internal geometry type list.
+  void MergeGeometry(std::string_view bytes_wkb);

Review Comment:
   Can also expose an overload taking a `util:span<const uint8_t>` for 
convenience.



##########
cpp/src/parquet/geospatial/util_json_internal.cc:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_json_internal.h"
+
+#include <string>
+
+#include "arrow/extension_type.h"
+#include "arrow/json/rapidjson_defs.h"  // IWYU pragma: keep
+#include "arrow/result.h"
+#include "arrow/util/string.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+::arrow::Result<std::string> GeospatialGeoArrowCrsToParquetCrs(
+    const ::arrow::rapidjson::Document& document) {
+  namespace rj = ::arrow::rapidjson;
+
+  if (!document.HasMember("crs") || document["crs"].IsNull()) {
+    // Parquet GEOMETRY/GEOGRAPHY do not have a concept of a null/missing
+    // CRS, but an omitted one is more likely to have meant "lon/lat" than
+    // a truly unspecified one (i.e., Engineering CRS with arbitrary XY units)
+    return "";
+  }
+
+  const auto& json_crs = document["crs"];
+  if (json_crs.IsString() && (json_crs == "EPSG:4326" || json_crs == 
"OGC:CRS84")) {
+    // crs can be left empty because these cases both correspond to
+    // longitude/latitude in WGS84 according to the Parquet specification
+    return "";
+  } else if (json_crs.IsObject()) {
+    // Attempt to detect common PROJJSON representations of longitude/latitude 
and return
+    // an empty crs to maximize compatibility with readers that do not 
implement CRS
+    // support. PROJJSON stores this in the "id" member like:
+    // {..., "id": {"authority": "...", "code": "..."}}
+    if (json_crs.HasMember("id")) {
+      const auto& identifier = json_crs["id"];
+      if (identifier.HasMember("authority") && identifier.HasMember("code")) {
+        if (identifier["authority"] == "OGC" && identifier["code"] == "CRS84") 
{
+          return "";
+        } else if (identifier["authority"] == "EPSG" && identifier["code"] == 
4326) {
+          return "";
+        }
+      }
+    }
+  }
+
+  // If we could not detect a longitude/latitude CRS, just write the string to 
the
+  // LogicalType crs (being sure to unescape a JSON string into a regular 
string)
+  if (json_crs.IsString()) {
+    return json_crs.GetString();
+  } else {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    json_crs.Accept(writer);
+    return buffer.GetString();
+  }
+}
+
+::arrow::Result<std::string> MakeGeoArrowCrsMetadata(
+    const std::string& crs,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {

Review Comment:
   Can pass a string view for flexibility and to avoid internal copies when 
calling `substr`
   ```suggestion
   ::arrow::Result<std::string> MakeGeoArrowCrsMetadata(
       std::string_view crs,
       const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
   ```



##########
python/pyarrow/_parquet.pxd:
##########
@@ -205,6 +224,18 @@ cdef extern from "parquet/api/schema.h" namespace 
"parquet" nogil:
         ParquetCipher algorithm
         AadMetadata aad
 
+
+# Specific array<> types needed for GeoStatistics
+cdef extern from "<array>" namespace "std" nogil:

Review Comment:
   You don't have to do it obviously, but for the record it would be relatively 
easy to upstream `std::array` declarations in Cython, here is an example 
related PR: https://github.com/cython/cython/pull/6652



##########
cpp/src/parquet/api/reader.h:
##########
@@ -22,6 +22,7 @@
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
 #include "parquet/file_reader.h"
+#include "parquet/geospatial/statistics.h"

Review Comment:
   I'm not sure we want to further expand the `api.h` header. Every additional 
inclusion here can increase compile times for users, we should encourage them 
to include the individual headers as-needed instead.



##########
cpp/src/parquet/reader_test.cc:
##########
@@ -1857,4 +1863,174 @@ TEST(PageIndexReaderTest, ReadFileWithoutPageIndex) {
   ASSERT_EQ(nullptr, row_group_index_reader);
 }
 
+class TestGeometryLogicalType : public ::testing::Test {
+ public:
+  const int kNumRows = 1000;
+
+  void WriteTestData(std::shared_ptr<const LogicalType> type, bool 
write_arrow) {
+    // Make schema
+    schema::NodeVector fields;
+    fields.push_back(
+        PrimitiveNode::Make("g", Repetition::REQUIRED, type, 
Type::BYTE_ARRAY));
+    auto schema = std::static_pointer_cast<GroupNode>(
+        GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+    // Write small batches and small data pages
+    auto writer_props_builder = WriterProperties::Builder();
+    writer_props_builder.write_batch_size(64);
+
+    std::shared_ptr<WriterProperties> writer_props = 
writer_props_builder.build();
+
+    ASSERT_OK_AND_ASSIGN(auto out_file, 
::arrow::io::BufferOutputStream::Create());
+    std::shared_ptr<ParquetFileWriter> file_writer =
+        ParquetFileWriter::Open(out_file, schema, writer_props);
+    RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+    // write WKB points to columns
+    auto* writer =
+        
::arrow::internal::checked_cast<ByteArrayWriter*>(rg_writer->NextColumn());
+    if (!write_arrow) {
+      WriteTestDataUsingWriteBatch(writer);
+    } else {
+      WriteTestDataUsingWriteArrow(writer);
+    }
+
+    rg_writer->Close();
+    file_writer->Close();
+
+    ASSERT_OK_AND_ASSIGN(file_buf, out_file->Finish());
+  }
+
+  void WriteTestDataUsingWriteBatch(ByteArrayWriter* writer) {
+    std::vector<uint8_t> buffer(test::kWkbPointXYSize * kNumRows);
+    uint8_t* ptr = buffer.data();
+    std::vector<ByteArray> values(kNumRows);
+    for (int k = 0; k < kNumRows; k++) {
+      std::string item = test::MakeWKBPoint(
+          {static_cast<double>(k), static_cast<double>(k + 1)}, false, false);
+      std::memcpy(ptr, item.data(), item.size());
+      values[k].len = test::kWkbPointXYSize;
+      values[k].ptr = ptr;
+      ptr += test::kWkbPointXYSize;
+    }
+    writer->WriteBatch(kNumRows, nullptr, nullptr, values.data());
+  }
+
+  void WriteTestDataUsingWriteArrow(ByteArrayWriter* writer) {
+    ::arrow::BinaryBuilder builder;
+    for (int k = 0; k < kNumRows; k++) {
+      std::string item = test::MakeWKBPoint(
+          {static_cast<double>(k), static_cast<double>(k + 1)}, false, false);
+
+      ASSERT_OK(builder.Append(item));
+    }
+    std::shared_ptr<::arrow::BinaryArray> array;
+    ASSERT_OK(builder.Finish(&array));
+
+    std::shared_ptr<ArrowWriterProperties> properties =
+        ArrowWriterProperties::Builder().build();
+    MemoryPool* pool = ::arrow::default_memory_pool();
+    auto ctx = std::make_unique<ArrowWriteContext>(pool, properties.get());
+    ASSERT_OK(writer->WriteArrow(nullptr, nullptr, kNumRows, *array, ctx.get(),
+                                 /*leaf_field_nullable=*/true));
+  }
+
+  void TestWriteAndRead(std::shared_ptr<const LogicalType> type, bool 
write_arrow) {
+    ASSERT_NO_FATAL_FAILURE(WriteTestData(type, write_arrow));
+
+    auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+    ReaderProperties reader_props;
+    reader_props.enable_buffered_stream();
+    reader_props.set_buffer_size(64);
+    auto file_reader = ParquetFileReader::Open(in_file, reader_props);
+
+    // Check that the geometry statistics are correctly written and read
+    auto metadata = file_reader->metadata();
+    ASSERT_TRUE(type->Equals(*metadata->schema()->Column(0)->logical_type()));
+
+    auto page_index_reader = file_reader->GetPageIndexReader();
+    int num_row_groups = metadata->num_row_groups();
+    int64_t start_index = 0;
+    for (int i = 0; i < num_row_groups; i++) {
+      auto row_group_metadata = metadata->RowGroup(i);
+      auto column_chunk_metadata = row_group_metadata->ColumnChunk(0);
+      auto geo_stats = column_chunk_metadata->geo_statistics();
+      ASSERT_NO_FATAL_FAILURE(CheckGeoStatistics(type, geo_stats, start_index,
+                                                 
row_group_metadata->num_rows()));
+      start_index += row_group_metadata->num_rows();
+    }
+
+    // Check the geometry values
+    int64_t total_values_read = 0;
+    for (int i = 0; i < num_row_groups; i++) {
+      auto row_group = file_reader->RowGroup(i);
+      std::shared_ptr<ByteArrayReader> reader =
+          std::static_pointer_cast<ByteArrayReader>(row_group->Column(0));
+      while (reader->HasNext()) {
+        std::vector<ByteArray> out(kNumRows);
+        int64_t values_read = 0;
+        int64_t levels_read =
+            reader->ReadBatch(kNumRows, nullptr, nullptr, out.data(), 
&values_read);
+        ASSERT_GE(levels_read, 1);
+        ASSERT_GE(values_read, 1);
+
+        // Check the batch
+        for (int64_t i = 0; i < values_read; i++) {
+          const ByteArray& value = out[i];
+          auto xy = test::GetWKBPointCoordinateXY(value);
+          EXPECT_TRUE(xy.has_value());
+          auto expected_x = static_cast<double>(i + total_values_read);
+          auto expected_y = static_cast<double>(i + 1 + total_values_read);
+          EXPECT_EQ(*xy, (std::pair<double, double>(expected_x, expected_y)));
+        }
+
+        total_values_read += values_read;
+      }
+    }
+    EXPECT_EQ(kNumRows, total_values_read);
+  }
+
+  void CheckGeoStatistics(std::shared_ptr<const LogicalType> type,
+                          std::shared_ptr<geospatial::GeoStatistics> 
geom_stats,
+                          int64_t start_index, int64_t num_rows) {
+    // We don't yet generate statistics for Geography
+    if (type->is_geography()) {
+      ASSERT_EQ(geom_stats, nullptr);
+      return;
+    }
+
+    ASSERT_NE(geom_stats, nullptr);
+    // We wrote exactly one geometry type (POINT, which has code 1)
+    EXPECT_THAT(*geom_stats->geometry_types(), ::testing::ElementsAre(1));
+
+    double expected_xmin = static_cast<double>(start_index);
+    double expected_xmax = expected_xmin + num_rows - 1;
+    double expected_ymin = expected_xmin + 1;
+    double expected_ymax = expected_xmax + 1;
+
+    EXPECT_EQ(geom_stats->lower_bound()[0], expected_xmin);
+    EXPECT_EQ(geom_stats->upper_bound()[0], expected_xmax);
+    EXPECT_EQ(geom_stats->lower_bound()[1], expected_ymin);
+    EXPECT_EQ(geom_stats->upper_bound()[1], expected_ymax);
+    EXPECT_THAT(geom_stats->dimension_valid(),
+                ::testing::ElementsAre(true, true, false, false));
+  }
+
+ protected:
+  std::shared_ptr<Buffer> file_buf;
+};
+
+TEST_F(TestGeometryLogicalType, TestWriteGeometry) {
+  TestWriteAndRead(GeometryLogicalType::Make("srid:1234"), 
/*write_arrow=*/false);
+}
+
+TEST_F(TestGeometryLogicalType, TestWriteArrowAndReadGeometry) {
+  TestWriteAndRead(GeometryLogicalType::Make("srid:1234"), 
/*write_arrow=*/true);
+}
+
+TEST_F(TestGeometryLogicalType, TestWriteGeography) {
+  TestWriteAndRead(GeographyLogicalType::Make("srid:1234"), 
/*write_arrow=*/false);

Review Comment:
   Why not also test with `write_arrow=true`?



##########
cpp/src/parquet/geospatial/statistics.cc:
##########
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/statistics.h"
+
+#include <cmath>
+#include <memory>
+#include <optional>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial/util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet::geospatial {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    return is_valid_ == other.is_valid_ &&
+           bounder_.GeometryTypes() == other.bounder_.GeometryTypes() &&
+           bounder_.Bounds() == other.bounder_.Bounds();
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || other.is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not supported by 
GeoStatistics::Merge()");
+    }
+
+    bounder_.MergeBox(other.bounder_.Bounds());
+    std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
+    bounder_.MergeGeometryTypes(other_geometry_types);
+  }
+
+  void Update(const ByteArray* values, int64_t num_values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    for (int64_t i = 0; i < num_values; i++) {
+      const ByteArray& item = values[i];
+      try {
+        bounder_.MergeGeometry({reinterpret_cast<const char*>(item.ptr), 
item.len});
+      } catch (ParquetException&) {
+        is_valid_ = false;
+        return;
+      }
+    }
+  }
+
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values) {
+    DCHECK_GT(num_spaced_values, 0);
+
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    ::arrow::Status status = ::arrow::internal::VisitSetBitRuns(
+        valid_bits, valid_bits_offset, num_spaced_values,
+        [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            ByteArray item = SafeLoad(values + i + position);
+            PARQUET_CATCH_NOT_OK(bounder_.MergeGeometry(
+                {reinterpret_cast<const char*>(item.ptr), item.len}));
+          }
+
+          return ::arrow::Status::OK();
+        });
+
+    if (!status.ok()) {
+      is_valid_ = false;
+    }
+  }
+
+  void Update(const ::arrow::Array& values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    // Note that ::arrow::Type::EXTENSION seems to be handled before this is 
called
+    switch (values.type_id()) {
+      case ::arrow::Type::BINARY:
+        UpdateArrayImpl<::arrow::BinaryArray>(values);
+        break;
+      case ::arrow::Type::LARGE_BINARY:
+        UpdateArrayImpl<::arrow::LargeBinaryArray>(values);
+        break;
+      // This does not currently handle run-end encoded, dictionary encodings, 
or views
+      default:
+        throw ParquetException("Unsupported Array type in 
GeoStatistics::Update(Array): ",
+                               values.type()->ToString());
+    }
+  }
+
+  void Reset() {
+    bounder_.Reset();
+    is_valid_ = true;
+  }
+
+  EncodedGeoStatistics Encode() const {
+    if (!is_valid_) {
+      return {};
+    }
+
+    const geospatial::BoundingBox::XYZM& mins = bounder_.Bounds().min;
+    const geospatial::BoundingBox::XYZM& maxes = bounder_.Bounds().max;
+
+    EncodedGeoStatistics out;
+
+    if (has_geometry_types_) {
+      out.geospatial_types = bounder_.GeometryTypes();
+    }
+
+    bool write_x = !bound_empty(0) && bound_valid(0);
+    bool write_y = !bound_empty(1) && bound_valid(1);
+    bool write_z = write_x && write_y && !bound_empty(2) && bound_valid(2);
+    bool write_m = write_x && write_y && !bound_empty(3) && bound_valid(3);
+
+    if (write_x && write_y) {
+      out.xmin = mins[0];
+      out.xmax = maxes[0];
+      out.ymin = mins[1];
+      out.ymax = maxes[1];
+      out.xy_bounds_present = true;
+    }
+
+    if (write_z) {
+      out.zmin = mins[2];
+      out.zmax = maxes[2];
+      out.z_bounds_present = true;
+    }
+
+    if (write_m) {
+      out.mmin = mins[3];
+      out.mmax = maxes[3];
+      out.m_bounds_present = true;
+    }
+
+    return out;
+  }
+
+  void Update(const EncodedGeoStatistics& encoded) {
+    if (!is_valid_) {
+      return;
+    }
+
+    // We can create GeoStatistics from a wraparound bounding box, but we can't
+    // update an existing one because the merge logic is not yet implemented.
+    if (!bounds_empty() &&
+        (is_wraparound_x() || is_wraparound(encoded.xmin, encoded.xmax))) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    geospatial::BoundingBox box;
+
+    if (encoded.xy_bounds_present) {
+      box.min[0] = encoded.xmin;
+      box.max[0] = encoded.xmax;
+      box.min[1] = encoded.ymin;
+      box.max[1] = encoded.ymax;
+    } else {
+      box.min[0] = kNaN;
+      box.max[0] = kNaN;
+      box.min[1] = kNaN;
+      box.max[1] = kNaN;
+    }
+
+    if (encoded.z_bounds_present) {
+      box.min[2] = encoded.zmin;
+      box.max[2] = encoded.zmax;
+    } else {
+      box.min[2] = kNaN;
+      box.max[2] = kNaN;
+    }
+
+    if (encoded.m_bounds_present) {
+      box.min[3] = encoded.mmin;
+      box.max[3] = encoded.mmax;
+    } else {
+      box.min[3] = kNaN;
+      box.max[3] = kNaN;
+    }
+
+    bounder_.MergeBox(box);
+    bounder_.MergeGeometryTypes(encoded.geospatial_types);
+    has_geometry_types_ = has_geometry_types_ && 
encoded.geospatial_types_present();
+  }
+
+  bool is_wraparound_x() const {
+    return is_wraparound(lower_bound()[0], upper_bound()[0]);
+  }
+
+  bool is_valid() const { return is_valid_; }
+
+  bool bounds_empty() const {
+    for (int i = 0; i < kMaxDimensions; i++) {
+      if (!bound_empty(i)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  bool bound_empty(int i) const {
+    return std::isinf(bounder_.Bounds().min[i] - bounder_.Bounds().max[i]);
+  }
+
+  bool bound_valid(int i) const {
+    return !std::isnan(bounder_.Bounds().min[i]) && 
!std::isnan(bounder_.Bounds().max[i]);
+  }
+
+  const std::array<double, kMaxDimensions>& lower_bound() const {
+    return bounder_.Bounds().min;
+  }
+
+  const std::array<double, kMaxDimensions>& upper_bound() const {
+    return bounder_.Bounds().max;
+  }
+
+  std::optional<std::vector<int32_t>> geometry_types() const {
+    if (has_geometry_types_) {
+      return bounder_.GeometryTypes();
+    } else {
+      return std::nullopt;
+    }
+  }
+
+ private:
+  geospatial::WKBGeometryBounder bounder_;
+  bool is_valid_ = true;
+  bool has_geometry_types_ = true;
+
+  template <typename ArrayType>
+  void UpdateArrayImpl(const ::arrow::Array& values) {
+    const auto& binary_array = static_cast<const ArrayType&>(values);
+    for (int64_t i = 0; i < binary_array.length(); ++i) {
+      if (!binary_array.IsNull(i)) {
+        try {
+          bounder_.MergeGeometry(binary_array.GetView(i));
+        } catch (ParquetException&) {
+          is_valid_ = false;
+          return;
+        }
+      }
+    }
+  }
+
+  static bool is_wraparound(double dmin, double dmax) {
+    return !std::isinf(dmin - dmax) && dmin > dmax;
+  }
+};
+
+GeoStatistics::GeoStatistics() : impl_(std::make_unique<GeoStatisticsImpl>()) 
{}
+
+GeoStatistics::GeoStatistics(const EncodedGeoStatistics& encoded) : 
GeoStatistics() {
+  Decode(encoded);
+}
+
+GeoStatistics::~GeoStatistics() = default;
+
+bool GeoStatistics::Equals(const GeoStatistics& other) const {
+  return impl_->Equals(*other.impl_);
+}
+
+void GeoStatistics::Merge(const GeoStatistics& other) { 
impl_->Merge(*other.impl_); }
+
+void GeoStatistics::Update(const ByteArray* values, int64_t num_values) {
+  impl_->Update(values, num_values);
+}
+
+void GeoStatistics::UpdateSpaced(const ByteArray* values, const uint8_t* 
valid_bits,
+                                 int64_t valid_bits_offset, int64_t 
num_spaced_values,
+                                 int64_t num_values) {
+  impl_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values,
+                      num_values);
+}
+
+void GeoStatistics::Update(const ::arrow::Array& values) { 
impl_->Update(values); }
+
+void GeoStatistics::Reset() { impl_->Reset(); }
+
+bool GeoStatistics::is_valid() const { return impl_->is_valid(); }
+
+std::optional<EncodedGeoStatistics> GeoStatistics::Encode() const {
+  if (is_valid()) {
+    return impl_->Encode();
+  } else {
+    return std::nullopt;
+  }
+}
+
+void GeoStatistics::Decode(const EncodedGeoStatistics& encoded) {
+  impl_->Reset();
+  impl_->Update(encoded);
+}
+
+std::array<double, 4> GeoStatistics::lower_bound() const { return 
impl_->lower_bound(); }
+
+std::array<double, 4> GeoStatistics::upper_bound() const { return 
impl_->upper_bound(); }
+
+std::array<bool, 4> GeoStatistics::dimension_empty() const {
+  return {impl_->bound_empty(0), impl_->bound_empty(1), impl_->bound_empty(2),
+          impl_->bound_empty(3)};
+}
+
+std::array<bool, 4> GeoStatistics::dimension_valid() const {
+  return {impl_->bound_valid(0), impl_->bound_valid(1), impl_->bound_valid(2),
+          impl_->bound_valid(3)};
+}
+
+std::optional<std::vector<int32_t>> GeoStatistics::geometry_types() const {
+  return impl_->geometry_types();
+}
+
+std::string GeoStatistics::ToString() const {
+  if (!is_valid()) {
+    return "GeoStatistics <invalid>\n";
+  }
+
+  std::stringstream ss;
+  ss << "GeoStatistics " << std::endl;

Review Comment:
   Multi-line string representations are not very easy to insert into logs, 
etc. 
   Perhaps instead have something like `GeoStatistics<x: ..., y: ..., m: ...>`?



##########
cpp/src/parquet/geospatial/util_internal_test.cc:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <cmath>
+#include <cstring>
+
+#include "parquet/geospatial/util_internal.h"
+#include "parquet/test_util.h"
+
+namespace parquet::geospatial {
+
+TEST(TestGeometryUtil, TestBoundingBox) {
+  BoundingBox box;
+  EXPECT_EQ(box, BoundingBox({kInf, kInf, kInf, kInf}, {-kInf, -kInf, -kInf, 
-kInf}));
+  EXPECT_EQ(box.ToString(),
+            "BoundingBox\n  x: [inf, -inf]\n  y: [inf, -inf]\n  z: [inf, 
-inf]\n  m: "
+            "[inf, -inf]\n");
+
+  BoundingBox box_xyzm({-1, -2, -3, -4}, {1, 2, 3, 4});
+  BoundingBox box_xy({-10, -20, kInf, kInf}, {10, 20, -kInf, -kInf});
+  BoundingBox box_xyz({kInf, kInf, -30, kInf}, {-kInf, -kInf, 30, -kInf});
+  BoundingBox box_xym({kInf, kInf, kInf, -40}, {-kInf, -kInf, -kInf, 40});
+
+  box_xyzm.Merge(box_xy);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -3, -4}, {10, 20, 3, 4}));
+  EXPECT_EQ(box_xyzm.ToString(),
+            "BoundingBox\n  x: [-10, 10]\n  y: [-20, 20]\n  z: [-3, 3]\n  m: "
+            "[-4, 4]\n");
+
+  box_xyzm.Merge(box_xyz);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -30, -4}, {10, 20, 30, 4}));
+
+  box_xyzm.Merge(box_xym);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -30, -40}, {10, 20, 30, 40}));
+
+  double nan_dbl = std::numeric_limits<double>::quiet_NaN();
+  BoundingBox box_nan({nan_dbl, nan_dbl, nan_dbl, nan_dbl},
+                      {nan_dbl, nan_dbl, nan_dbl, nan_dbl});
+  box_xyzm.Merge(box_nan);
+  for (int i = 0; i < 4; i++) {
+    EXPECT_TRUE(std::isnan(box_xyzm.min[i]));
+    EXPECT_TRUE(std::isnan(box_xyzm.max[i]));
+  }
+
+  box_xyzm.Reset();
+  EXPECT_EQ(box_xyzm, BoundingBox());
+}
+
+struct WKBTestCase {
+  WKBTestCase() = default;
+  WKBTestCase(GeometryType geometry_type, Dimensions dimensions,
+              const std::vector<uint8_t>& wkb, const std::vector<double>& 
box_values = {})
+      : geometry_type(geometry_type), dimensions(dimensions), wkb(wkb) {
+    std::array<double, 4> mins = {kInf, kInf, kInf, kInf};
+    std::array<double, 4> maxes{-kInf, -kInf, -kInf, -kInf};
+
+    if (dimensions == Dimensions::kXYM) {
+      mins = {box_values[0], box_values[1], kInf, box_values[2]};
+      maxes = {box_values[3], box_values[4], -kInf, box_values[5]};
+    } else {
+      size_t coord_size = box_values.size() / 2;
+      for (uint32_t i = 0; i < coord_size; i++) {
+        mins[i] = box_values[i];
+        maxes[i] = box_values[coord_size + i];
+      }
+    }
+
+    box = BoundingBox(mins, maxes);
+  }
+  WKBTestCase(const WKBTestCase& other) = default;
+
+  GeometryType geometry_type;
+  Dimensions dimensions;
+  std::vector<uint8_t> wkb;
+  BoundingBox box;
+};
+
+std::ostream& operator<<(std::ostream& os, const WKBTestCase& obj) {
+  uint32_t iso_wkb_geometry_type =
+      static_cast<int>(obj.dimensions) * 1000 + 
static_cast<int>(obj.geometry_type);
+  os << "WKBTestCase<" << iso_wkb_geometry_type << ">";
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, const BoundingBox& obj) {

Review Comment:
   Why not put this in `util_internal.h`?



##########
python/pyarrow/_parquet.pyx:
##########
@@ -319,6 +319,136 @@ cdef _box_flba(ParquetFLBA val, uint32_t len):
     return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
 
 
+cdef class GeoStatistics(_Weakrefable):
+    """Statistics for columns with geospatial data types (experimental)"""
+
+    def __init__(self):
+        raise TypeError(f"Do not call {self.__class__.__name__}'s constructor 
directly")
+
+    def __cinit__(self):
+        pass
+
+    def __repr__(self):
+        return f"""{object.__repr__(self)}
+  geospatial_types: {self.geospatial_types}
+  xmin: {self.xmin}, xmax: {self.xmax}
+  ymin: {self.ymin}, ymax: {self.ymax}
+  zmin: {self.zmin}, zmax: {self.zmax}
+  mmin: {self.mmin}, mmax: {self.mmax}"""
+
+    def to_dict(self):
+        out = {
+            "geospatial_types": self.geospatial_types,
+            "xmin": self.xmin,
+            "xmax": self.xmax,
+            "ymin": self.ymin,
+            "ymax": self.ymax,
+            "zmin": self.zmin,
+            "zmax": self.zmax,
+            "mmin": self.mmin,
+            "mmax": self.mmax
+        }
+
+        return out
+
+    @property
+    def geospatial_types(self):

Review Comment:
   Can you add docstrings for all these properties?



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();

Review Comment:
   Can make these `inline` IMHO and remove the `static`.
   ```suggestion
   /// \brief The maximum number of dimensions represented by a geospatial type
   /// (i.e., X, Y, Z, and M)
   inline constexpr int kMaxDimensions = 4;
   
   /// \brief NaN, used to represent bounds for which predicate pushdown cannnot
   /// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
   inline constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
   ```
   
   See https://stackoverflow.com/a/54467111



##########
cpp/src/parquet/geospatial/util_json_internal.cc:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_json_internal.h"
+
+#include <string>
+
+#include "arrow/extension_type.h"
+#include "arrow/json/rapidjson_defs.h"  // IWYU pragma: keep
+#include "arrow/result.h"
+#include "arrow/util/string.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+::arrow::Result<std::string> GeospatialGeoArrowCrsToParquetCrs(
+    const ::arrow::rapidjson::Document& document) {
+  namespace rj = ::arrow::rapidjson;
+
+  if (!document.HasMember("crs") || document["crs"].IsNull()) {
+    // Parquet GEOMETRY/GEOGRAPHY do not have a concept of a null/missing
+    // CRS, but an omitted one is more likely to have meant "lon/lat" than
+    // a truly unspecified one (i.e., Engineering CRS with arbitrary XY units)
+    return "";
+  }
+
+  const auto& json_crs = document["crs"];
+  if (json_crs.IsString() && (json_crs == "EPSG:4326" || json_crs == 
"OGC:CRS84")) {
+    // crs can be left empty because these cases both correspond to
+    // longitude/latitude in WGS84 according to the Parquet specification
+    return "";
+  } else if (json_crs.IsObject()) {
+    // Attempt to detect common PROJJSON representations of longitude/latitude 
and return
+    // an empty crs to maximize compatibility with readers that do not 
implement CRS
+    // support. PROJJSON stores this in the "id" member like:
+    // {..., "id": {"authority": "...", "code": "..."}}
+    if (json_crs.HasMember("id")) {
+      const auto& identifier = json_crs["id"];
+      if (identifier.HasMember("authority") && identifier.HasMember("code")) {
+        if (identifier["authority"] == "OGC" && identifier["code"] == "CRS84") 
{
+          return "";
+        } else if (identifier["authority"] == "EPSG" && identifier["code"] == 
4326) {
+          return "";
+        }
+      }
+    }
+  }
+
+  // If we could not detect a longitude/latitude CRS, just write the string to 
the
+  // LogicalType crs (being sure to unescape a JSON string into a regular 
string)
+  if (json_crs.IsString()) {
+    return json_crs.GetString();
+  } else {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    json_crs.Accept(writer);
+    return buffer.GetString();
+  }
+}
+
+::arrow::Result<std::string> MakeGeoArrowCrsMetadata(
+    const std::string& crs,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  const std::string kSridPrefix{"srid:"};
+  const std::string kProjjsonPrefix{"projjson:"};
+
+  // Two reccomendataions are explicitly mentioned in the Parquet format for 
the

Review Comment:
   ```suggestion
     // Two recommendations are explicitly mentioned in the Parquet format for 
the
   ```
   



##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1849,8 +1867,158 @@ TEST_F(TestValuesWriterInt32Type, 
AvoidCompressedInDataPageV2) {
     verify_only_one_uncompressed_page(/*total_num_values=*/1);
   }
 }
-
 #endif
 
+// Test writing and reading geometry columns
+class TestGeometryValuesWriter : public TestPrimitiveWriter<ByteArrayType> {
+ public:
+  void SetUpSchema(Repetition::type repetition, int num_columns) override {
+    std::vector<schema::NodePtr> fields;
+
+    for (int i = 0; i < num_columns; ++i) {
+      std::string name = TestColumnName(i);
+      std::shared_ptr<const LogicalType> logical_type =
+          GeometryLogicalType::Make("srid:1234");
+      fields.push_back(schema::PrimitiveNode::Make(name, repetition, 
logical_type,
+                                                   ByteArrayType::type_num));
+    }
+    node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+    schema_.Init(node_);
+  }
+
+  void GenerateData(int64_t num_values, uint32_t seed = 0) {
+    values_.resize(num_values);
+
+    buffer_.resize(num_values * kWkbPointXYSize);
+    uint8_t* ptr = buffer_.data();
+    for (int k = 0; k < num_values; k++) {
+      std::string item = test::MakeWKBPoint(
+          {static_cast<double>(k), static_cast<double>(k + 1)}, false, false);
+      std::memcpy(ptr, item.data(), item.size());
+      values_[k].len = kWkbPointXYSize;
+      values_[k].ptr = ptr;
+      ptr += kWkbPointXYSize;
+    }
+
+    values_ptr_ = values_.data();
+  }
+};
+
+TEST_F(TestGeometryValuesWriter, TestWriteAndRead) {
+  this->SetUpSchema(Repetition::REQUIRED, 1);
+  this->GenerateData(SMALL_SIZE);
+  size_t num_values = this->values_.size();
+  auto writer = this->BuildWriter(num_values, ColumnProperties());
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, 
this->values_.data());
+
+  writer->Close();
+  this->ReadColumn();
+  for (size_t i = 0; i < num_values; i++) {
+    const ByteArray& value = this->values_out_[i];
+    auto xy = GetWKBPointCoordinateXY(value);
+    EXPECT_TRUE(xy.has_value());
+    auto expected_x = static_cast<double>(i);
+    auto expected_y = static_cast<double>(i + 1);
+    EXPECT_EQ(*xy, (std::pair<double, double>(expected_x, expected_y)));

Review Comment:
   Nit: I think you can simply write
   ```suggestion
       EXPECT_EQ(*xy, std::pair{expected_x, expected_y});
   ```
   or
   ```suggestion
       EXPECT_EQ(*xy, std::make_pair(expected_x, expected_y));
   ```



##########
cpp/src/parquet/types.cc:
##########
@@ -479,6 +480,27 @@ std::shared_ptr<const LogicalType> LogicalType::FromThrift(
     return UUIDLogicalType::Make();
   } else if (type.__isset.FLOAT16) {
     return Float16LogicalType::Make();
+  } else if (type.__isset.GEOMETRY) {
+    std::string crs;
+    if (type.GEOMETRY.__isset.crs) {
+      crs = type.GEOMETRY.crs;
+    }
+
+    return GeometryLogicalType::Make(crs);
+  } else if (type.__isset.GEOGRAPHY) {
+    std::string crs;
+    if (type.GEOGRAPHY.__isset.crs) {
+      crs = type.GEOGRAPHY.crs;
+    }
+
+    LogicalType::EdgeInterpolationAlgorithm algorithm;
+    if (!type.GEOGRAPHY.__isset.algorithm) {
+      algorithm = LogicalType::EdgeInterpolationAlgorithm::SPHERICAL;
+    } else {
+      algorithm = ::parquet::FromThrift(type.GEOGRAPHY.algorithm);
+    }
+
+    return GeographyLogicalType::Make(crs, algorithm);

Review Comment:
   ```suggestion
       return GeographyLogicalType::Make(std::move(crs), algorithm);
   ```



##########
cpp/src/parquet/geospatial/util_internal.cc:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_internal.h"
+
+#include <sstream>
+
+#include "arrow/util/endian.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/ubsan.h"
+#include "parquet/exception.h"
+
+namespace parquet::geospatial {
+
+std::string BoundingBox::ToString() const {
+  std::stringstream ss;
+  ss << "BoundingBox" << std::endl;
+  ss << "  x: [" << min[0] << ", " << max[0] << "]" << std::endl;
+  ss << "  y: [" << min[1] << ", " << max[1] << "]" << std::endl;
+  ss << "  z: [" << min[2] << ", " << max[2] << "]" << std::endl;
+  ss << "  m: [" << min[3] << ", " << max[3] << "]" << std::endl;
+
+  return ss.str();
+}
+
+/// \brief Object to keep track of the low-level consumption of a well-known 
binary
+/// geometry
+///
+/// Briefly, ISO well-known binary supported by the Parquet spec is an endian 
byte
+/// (0x01 or 0x00), followed by geometry type + dimensions encoded as a 
(uint32_t),
+/// followed by geometry-specific data. Coordinate sequences are represented 
by a
+/// uint32_t (the number of coordinates) plus a sequence of doubles (number of 
coordinates
+/// multiplied by the number of dimensions).
+class WKBBuffer {
+ public:
+  WKBBuffer() : data_(nullptr), size_(0) {}
+  WKBBuffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
+
+  uint8_t ReadUInt8() { return ReadChecked<uint8_t>(); }
+
+  uint32_t ReadUInt32(bool swap) {
+    auto value = ReadChecked<uint32_t>();
+    if (swap) {
+      return ::arrow::bit_util::ByteSwap(value);
+    } else {
+      return value;
+    }
+  }
+
+  template <typename Coord, typename Visit>
+  void ReadDoubles(uint32_t n_coords, bool swap, Visit&& visit) {

Review Comment:
   Why is it called `ReadDoubles`? `ReadCoords` perhaps?



##########
python/pyarrow/_parquet.pyx:
##########
@@ -450,6 +590,24 @@ cdef class ColumnChunkMetaData(_Weakrefable):
         statistics.init(self.metadata.statistics(), self)
         return statistics
 
+    @property
+    def is_geo_stats_set(self):
+        """Whether or not geometry statistics are present in metadata 
(bool)."""
+        return self.metadata.is_geo_stats_set()
+
+    @property
+    def geo_statistics(self):
+        """Statistics for column chunk (:class:`GeoStatistics`)."""
+        if not self.metadata.is_geo_stats_set():
+            return None
+
+        if not self.metadata.geo_statistics().get().is_valid():
+            return None
+
+        cdef GeoStatistics geo_statistics = 
GeoStatistics.__new__(GeoStatistics)
+        geo_statistics.init(self.metadata.geo_statistics(), self)
+        return geo_statistics

Review Comment:
   This is a bit verbose and doing redundant work. How about something like:
   ```suggestion
           c_geo_statistics = self.metadata.geo_statistics()
           if not c_geo_statistics or not c_geo_statistics.get().is_valid():
               return None
           cdef GeoStatistics geo_statistics = 
GeoStatistics.__new__(GeoStatistics)
           geo_statistics.init(c_geo_statistics, self)
           return geo_statistics
   ```



##########
cpp/src/parquet/geospatial/util_internal.cc:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_internal.h"
+
+#include <sstream>
+
+#include "arrow/util/endian.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/ubsan.h"
+#include "parquet/exception.h"
+
+namespace parquet::geospatial {
+
+std::string BoundingBox::ToString() const {
+  std::stringstream ss;
+  ss << "BoundingBox" << std::endl;
+  ss << "  x: [" << min[0] << ", " << max[0] << "]" << std::endl;
+  ss << "  y: [" << min[1] << ", " << max[1] << "]" << std::endl;
+  ss << "  z: [" << min[2] << ", " << max[2] << "]" << std::endl;
+  ss << "  m: [" << min[3] << ", " << max[3] << "]" << std::endl;
+
+  return ss.str();
+}
+
+/// \brief Object to keep track of the low-level consumption of a well-known 
binary
+/// geometry
+///
+/// Briefly, ISO well-known binary supported by the Parquet spec is an endian 
byte
+/// (0x01 or 0x00), followed by geometry type + dimensions encoded as a 
(uint32_t),
+/// followed by geometry-specific data. Coordinate sequences are represented 
by a
+/// uint32_t (the number of coordinates) plus a sequence of doubles (number of 
coordinates
+/// multiplied by the number of dimensions).
+class WKBBuffer {
+ public:
+  WKBBuffer() : data_(nullptr), size_(0) {}
+  WKBBuffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
+
+  uint8_t ReadUInt8() { return ReadChecked<uint8_t>(); }
+
+  uint32_t ReadUInt32(bool swap) {
+    auto value = ReadChecked<uint32_t>();
+    if (swap) {
+      return ::arrow::bit_util::ByteSwap(value);
+    } else {
+      return value;
+    }
+  }
+
+  template <typename Coord, typename Visit>
+  void ReadDoubles(uint32_t n_coords, bool swap, Visit&& visit) {
+    size_t total_bytes = n_coords * sizeof(Coord);
+    if (size_ < total_bytes) {
+      throw ParquetException("Can't read coordinate sequence of ", total_bytes,
+                             " bytes from WKBBuffer with ", size_, " 
remaining");
+    }
+
+    if (swap) {
+      Coord coord;
+      for (uint32_t i = 0; i < n_coords; i++) {
+        coord = ReadUnchecked<Coord>();
+        for (auto& c : coord) {
+          c = ::arrow::bit_util::ByteSwap(c);
+        }
+
+        visit(coord);
+      }
+    } else {
+      for (uint32_t i = 0; i < n_coords; i++) {
+        visit(ReadUnchecked<Coord>());
+      }
+    }
+  }
+
+  size_t size() { return size_; }
+
+ private:
+  const uint8_t* data_;
+  size_t size_;

Review Comment:
   Why not use a `util::span<const uint8_t>`?



##########
cpp/src/parquet/geospatial/util_internal_test.cc:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <cmath>
+#include <cstring>
+
+#include "parquet/geospatial/util_internal.h"
+#include "parquet/test_util.h"
+
+namespace parquet::geospatial {
+
+TEST(TestGeometryUtil, TestBoundingBox) {
+  BoundingBox box;
+  EXPECT_EQ(box, BoundingBox({kInf, kInf, kInf, kInf}, {-kInf, -kInf, -kInf, 
-kInf}));
+  EXPECT_EQ(box.ToString(),
+            "BoundingBox\n  x: [inf, -inf]\n  y: [inf, -inf]\n  z: [inf, 
-inf]\n  m: "
+            "[inf, -inf]\n");
+
+  BoundingBox box_xyzm({-1, -2, -3, -4}, {1, 2, 3, 4});
+  BoundingBox box_xy({-10, -20, kInf, kInf}, {10, 20, -kInf, -kInf});
+  BoundingBox box_xyz({kInf, kInf, -30, kInf}, {-kInf, -kInf, 30, -kInf});
+  BoundingBox box_xym({kInf, kInf, kInf, -40}, {-kInf, -kInf, -kInf, 40});
+
+  box_xyzm.Merge(box_xy);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -3, -4}, {10, 20, 3, 4}));
+  EXPECT_EQ(box_xyzm.ToString(),
+            "BoundingBox\n  x: [-10, 10]\n  y: [-20, 20]\n  z: [-3, 3]\n  m: "
+            "[-4, 4]\n");

Review Comment:
   If you don't check that some bounding boxes are unequal, then you may miss 
bugs in `operator==` or `operator!=`.



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+struct PARQUET_EXPORT EncodedGeoStatistics {
+  bool xy_bounds_present{};
+  double xmin{kNaN};
+  double xmax{kNaN};
+  double ymin{kNaN};
+  double ymax{kNaN};
+
+  bool z_bounds_present{};
+  double zmin{kNaN};
+  double zmax{kNaN};
+
+  bool m_bounds_present{};
+  double mmin{kNaN};
+  double mmax{kNaN};
+
+  bool writer_calculated_geospatial_types() const { return 
!geospatial_types.empty(); }
+  std::vector<int32_t> geospatial_types;
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// Note that NaN values that were encountered within coordinates are omitted; 
however,
+/// NaN values that were obtained via decoding encoded statistics are 
propagated. This
+/// behaviour ensures C++ clients that are inspecting statistics via the 
column metadata
+/// can detect the case where a writer generated NaNs (even though this 
implementation
+/// does not generate them).
+///
+/// The handling of NaN values in coordinates is not well-defined among 
bounding
+/// implementations except for the WKB convention for POINT EMPTY, which is 
consistently
+/// represented as a point whose ordinates are all NaN. Any other geometry 
that contains
+/// NaNs cannot expect defined behaviour here or elsewhere; however, a row 
group that
+/// contains both NaN-containing and normal (completely finite) geometries 
should not be
+/// excluded from predicate pushdown.
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered or if the statistics contain NaN
+  /// for any reason, Encode() will return nullopt to indicate that
+  /// statistics should not be written to thrift.
+  std::optional<EncodedGeoStatistics> Encode() const;
+
+  /// \brief Returns false if invalid WKB was encountered
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief All minimum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be +Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> lower_bound() const;
+
+  /// \brief All maximum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be -Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> upper_bound() const;
+
+  /// \brief Returns true if the dimension is valid and any non-NaN values were
+  /// encountered in the given dimension in XYZM order
+  std::array<bool, kMaxDimensions> dimension_empty() const;
+
+  /// \brief Returns false if a bound was explicitly not calculated by a 
writer when
+  /// importing statistics from thrift
+  ///
+  /// When calculating statistics, this will always be true because this 
implementation
+  /// calculates statistics for all dimensions.
+  std::array<bool, kMaxDimensions> dimension_valid() const;

Review Comment:
   Can you also explain the relationship between `dimension_empty` and 
`dimension_valid`? It's not clear to me.



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+struct PARQUET_EXPORT EncodedGeoStatistics {
+  bool xy_bounds_present{};

Review Comment:
   Nit: make this more readable by making the initialization value more 
explicit?
   ```suggestion
     bool xy_bounds_present{false};
   ```



##########
cpp/src/parquet/geospatial/statistics.h:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "parquet/platform.h"
+#include "parquet/types.h"
+
+namespace parquet::geospatial {
+
+/// \brief The maximum number of dimensions represented by a geospatial type
+/// (i.e., X, Y, Z, and M)
+static constexpr int kMaxDimensions = 4;
+
+/// \brief NaN, used to represent bounds for which predicate pushdown cannnot
+/// be applied (e.g., because a writer did not provide bounds for a given 
dimension)
+constexpr double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+/// \brief Structure represented encoded statistics to be written to and read 
from Parquet
+/// serialized metadata.
+///
+/// See the Parquet Thrift definition and GeoStatistics for the specific 
definition
+/// of field values.
+struct PARQUET_EXPORT EncodedGeoStatistics {
+  bool xy_bounds_present{};
+  double xmin{kNaN};
+  double xmax{kNaN};
+  double ymin{kNaN};
+  double ymax{kNaN};
+
+  bool z_bounds_present{};
+  double zmin{kNaN};
+  double zmax{kNaN};
+
+  bool m_bounds_present{};
+  double mmin{kNaN};
+  double mmax{kNaN};
+
+  bool writer_calculated_geospatial_types() const { return 
!geospatial_types.empty(); }
+  std::vector<int32_t> geospatial_types;
+};
+
+class GeoStatisticsImpl;
+
+/// \brief Base type for computing geospatial column statistics while writing 
a file
+/// or representing them when reading a file
+///
+/// Note that NaN values that were encountered within coordinates are omitted; 
however,
+/// NaN values that were obtained via decoding encoded statistics are 
propagated. This
+/// behaviour ensures C++ clients that are inspecting statistics via the 
column metadata
+/// can detect the case where a writer generated NaNs (even though this 
implementation
+/// does not generate them).
+///
+/// The handling of NaN values in coordinates is not well-defined among 
bounding
+/// implementations except for the WKB convention for POINT EMPTY, which is 
consistently
+/// represented as a point whose ordinates are all NaN. Any other geometry 
that contains
+/// NaNs cannot expect defined behaviour here or elsewhere; however, a row 
group that
+/// contains both NaN-containing and normal (completely finite) geometries 
should not be
+/// excluded from predicate pushdown.
+///
+/// EXPERIMENTAL
+class PARQUET_EXPORT GeoStatistics {
+ public:
+  GeoStatistics();
+  explicit GeoStatistics(const EncodedGeoStatistics& encoded);
+
+  ~GeoStatistics();
+
+  /// \brief Return true if bounds, geometry types, and validity are identical
+  bool Equals(const GeoStatistics& other) const;
+
+  /// \brief Update these statistics based on previously calculated or decoded 
statistics
+  void Merge(const GeoStatistics& other);
+
+  /// \brief Update these statistics based on values
+  void Update(const ByteArray* values, int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values);
+
+  /// \brief Update these statistics based on the non-null elements of values
+  ///
+  /// Currently, BinaryArray and LargeBinaryArray input is supported.
+  void Update(const ::arrow::Array& values);
+
+  /// \brief Return these statistics to an empty state
+  void Reset();
+
+  /// \brief Encode the statistics for serializing to Thrift
+  ///
+  /// If invalid WKB was encountered or if the statistics contain NaN
+  /// for any reason, Encode() will return nullopt to indicate that
+  /// statistics should not be written to thrift.
+  std::optional<EncodedGeoStatistics> Encode() const;
+
+  /// \brief Returns false if invalid WKB was encountered
+  bool is_valid() const;
+
+  /// \brief Reset existing statistics and populate them from 
previously-encoded ones
+  void Decode(const EncodedGeoStatistics& encoded);
+
+  /// \brief All minimum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be +Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> lower_bound() const;
+
+  /// \brief All maximum values in XYZM order
+  ///
+  /// For dimensions where dimension_valid() is false, the value will be NaN. 
For
+  /// dimensions where dimension_empty() is true, the value will be -Inf.
+  ///
+  /// For the first dimension (X) only, wraparound bounds apply where xmin > 
xmax. In this
+  /// case, these bounds represent the union of the intervals [xmax, Inf] and 
[-Inf,
+  /// xmin]. This implementation does not yet generate these types of bounds 
but they may
+  /// be encountered in statistics imported from Thrift.
+  std::array<double, kMaxDimensions> upper_bound() const;
+
+  /// \brief Returns true if the dimension is valid and any non-NaN values were
+  /// encountered in the given dimension in XYZM order
+  std::array<bool, kMaxDimensions> dimension_empty() const;
+
+  /// \brief Returns false if a bound was explicitly not calculated by a 
writer when
+  /// importing statistics from thrift
+  ///
+  /// When calculating statistics, this will always be true because this 
implementation
+  /// calculates statistics for all dimensions.
+  std::array<bool, kMaxDimensions> dimension_valid() const;
+
+  /// \brief Return the geometry type codes from the well-known binary 
encountered
+  ///
+  /// This implementation always returns sorted output with no duplicates. 
Returns
+  /// std::nullopt if the writer did not calculate geometry types for 
statistics
+  /// imported from Thrift.

Review Comment:
   Same question: what does the writer have to do with importing statistics 
from Thrift?



##########
cpp/src/parquet/geospatial/util_json_internal.cc:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_json_internal.h"
+
+#include <string>
+
+#include "arrow/extension_type.h"
+#include "arrow/json/rapidjson_defs.h"  // IWYU pragma: keep
+#include "arrow/result.h"
+#include "arrow/util/string.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+::arrow::Result<std::string> GeospatialGeoArrowCrsToParquetCrs(
+    const ::arrow::rapidjson::Document& document) {
+  namespace rj = ::arrow::rapidjson;
+
+  if (!document.HasMember("crs") || document["crs"].IsNull()) {
+    // Parquet GEOMETRY/GEOGRAPHY do not have a concept of a null/missing
+    // CRS, but an omitted one is more likely to have meant "lon/lat" than
+    // a truly unspecified one (i.e., Engineering CRS with arbitrary XY units)
+    return "";
+  }
+
+  const auto& json_crs = document["crs"];
+  if (json_crs.IsString() && (json_crs == "EPSG:4326" || json_crs == 
"OGC:CRS84")) {
+    // crs can be left empty because these cases both correspond to
+    // longitude/latitude in WGS84 according to the Parquet specification
+    return "";
+  } else if (json_crs.IsObject()) {
+    // Attempt to detect common PROJJSON representations of longitude/latitude 
and return
+    // an empty crs to maximize compatibility with readers that do not 
implement CRS
+    // support. PROJJSON stores this in the "id" member like:
+    // {..., "id": {"authority": "...", "code": "..."}}
+    if (json_crs.HasMember("id")) {
+      const auto& identifier = json_crs["id"];
+      if (identifier.HasMember("authority") && identifier.HasMember("code")) {
+        if (identifier["authority"] == "OGC" && identifier["code"] == "CRS84") 
{
+          return "";
+        } else if (identifier["authority"] == "EPSG" && identifier["code"] == 
4326) {
+          return "";
+        }
+      }
+    }
+  }
+
+  // If we could not detect a longitude/latitude CRS, just write the string to 
the
+  // LogicalType crs (being sure to unescape a JSON string into a regular 
string)
+  if (json_crs.IsString()) {
+    return json_crs.GetString();
+  } else {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    json_crs.Accept(writer);
+    return buffer.GetString();
+  }
+}
+
+::arrow::Result<std::string> MakeGeoArrowCrsMetadata(
+    const std::string& crs,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  const std::string kSridPrefix{"srid:"};
+  const std::string kProjjsonPrefix{"projjson:"};
+
+  // Two reccomendataions are explicitly mentioned in the Parquet format for 
the
+  // LogicalType crs:
+  //
+  // - "srid:XXXX" as a way to encode an application-specific integer 
identifier
+  // - "projjson:some_field_name" as a way to avoid repeating PROJJSON strings
+  //   unnecessarily (with a suggestion to place them in the file metadata)
+  //
+  // While we don't currently generate those values to reduce the complexity
+  // of the writer, we do interpret these values according to the suggestion in
+  // the format and pass on this information to GeoArrow.
+  if (crs.empty()) {
+    return R"("crs": "OGC:CRS84", "crs_type": "authority_code")";
+  } else if (::arrow::internal::StartsWith(crs, kSridPrefix)) {
+    return R"("crs": ")" + crs.substr(kSridPrefix.size()) + R"(", "crs_type": 
"srid")";
+  } else if (::arrow::internal::StartsWith(crs, kProjjsonPrefix)) {
+    std::string metadata_field = crs.substr(kProjjsonPrefix.size());
+    if (metadata && metadata->Contains(metadata_field)) {
+      ARROW_ASSIGN_OR_RAISE(std::string projjson_value, 
metadata->Get(metadata_field));
+      return R"("crs": )" + projjson_value + R"(, "crs_type": "projjson")";

Review Comment:
   Do we want to ensure that `projjson_value` is actually valid JSON?



##########
cpp/src/parquet/geospatial/util_json_internal.h:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string_view>
+
+#include "arrow/util/key_value_metadata.h"
+
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// \brief Compute a Parquet Logical type (Geometry(...) or Geography(...)) 
from
+/// GeoArrow `ARROW:extension:metadata` (JSON-encoded extension type metadata)
+///
+/// Returns the appropriate LogicalType, Invalid if the metadata was invalid,
+/// or NotImplemented if Parquet was not built with ARROW_JSON.

Review Comment:
   Let's remove the ARROW_JSON mention?



##########
cpp/src/parquet/geospatial/util_internal_test.cc:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <cmath>
+#include <cstring>
+
+#include "parquet/geospatial/util_internal.h"
+#include "parquet/test_util.h"
+
+namespace parquet::geospatial {
+
+TEST(TestGeometryUtil, TestBoundingBox) {
+  BoundingBox box;
+  EXPECT_EQ(box, BoundingBox({kInf, kInf, kInf, kInf}, {-kInf, -kInf, -kInf, 
-kInf}));
+  EXPECT_EQ(box.ToString(),
+            "BoundingBox\n  x: [inf, -inf]\n  y: [inf, -inf]\n  z: [inf, 
-inf]\n  m: "
+            "[inf, -inf]\n");
+
+  BoundingBox box_xyzm({-1, -2, -3, -4}, {1, 2, 3, 4});
+  BoundingBox box_xy({-10, -20, kInf, kInf}, {10, 20, -kInf, -kInf});
+  BoundingBox box_xyz({kInf, kInf, -30, kInf}, {-kInf, -kInf, 30, -kInf});
+  BoundingBox box_xym({kInf, kInf, kInf, -40}, {-kInf, -kInf, -kInf, 40});
+
+  box_xyzm.Merge(box_xy);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -3, -4}, {10, 20, 3, 4}));
+  EXPECT_EQ(box_xyzm.ToString(),
+            "BoundingBox\n  x: [-10, 10]\n  y: [-20, 20]\n  z: [-3, 3]\n  m: "
+            "[-4, 4]\n");
+
+  box_xyzm.Merge(box_xyz);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -30, -4}, {10, 20, 30, 4}));
+
+  box_xyzm.Merge(box_xym);
+  EXPECT_EQ(box_xyzm, BoundingBox({-10, -20, -30, -40}, {10, 20, 30, 40}));
+
+  double nan_dbl = std::numeric_limits<double>::quiet_NaN();
+  BoundingBox box_nan({nan_dbl, nan_dbl, nan_dbl, nan_dbl},
+                      {nan_dbl, nan_dbl, nan_dbl, nan_dbl});
+  box_xyzm.Merge(box_nan);
+  for (int i = 0; i < 4; i++) {
+    EXPECT_TRUE(std::isnan(box_xyzm.min[i]));
+    EXPECT_TRUE(std::isnan(box_xyzm.max[i]));
+  }
+
+  box_xyzm.Reset();
+  EXPECT_EQ(box_xyzm, BoundingBox());
+}
+
+struct WKBTestCase {
+  WKBTestCase() = default;
+  WKBTestCase(GeometryType geometry_type, Dimensions dimensions,
+              const std::vector<uint8_t>& wkb, const std::vector<double>& 
box_values = {})
+      : geometry_type(geometry_type), dimensions(dimensions), wkb(wkb) {
+    std::array<double, 4> mins = {kInf, kInf, kInf, kInf};
+    std::array<double, 4> maxes{-kInf, -kInf, -kInf, -kInf};
+
+    if (dimensions == Dimensions::kXYM) {
+      mins = {box_values[0], box_values[1], kInf, box_values[2]};
+      maxes = {box_values[3], box_values[4], -kInf, box_values[5]};
+    } else {
+      size_t coord_size = box_values.size() / 2;
+      for (uint32_t i = 0; i < coord_size; i++) {
+        mins[i] = box_values[i];
+        maxes[i] = box_values[coord_size + i];
+      }
+    }
+
+    box = BoundingBox(mins, maxes);
+  }
+  WKBTestCase(const WKBTestCase& other) = default;
+
+  GeometryType geometry_type;
+  Dimensions dimensions;
+  std::vector<uint8_t> wkb;
+  BoundingBox box;
+};
+
+std::ostream& operator<<(std::ostream& os, const WKBTestCase& obj) {
+  uint32_t iso_wkb_geometry_type =
+      static_cast<int>(obj.dimensions) * 1000 + 
static_cast<int>(obj.geometry_type);
+  os << "WKBTestCase<" << iso_wkb_geometry_type << ">";
+  return os;
+}
+
+std::ostream& operator<<(std::ostream& os, const BoundingBox& obj) {
+  os << obj.ToString();
+  return os;
+}
+
+class WKBTestFixture : public ::testing::TestWithParam<WKBTestCase> {
+ protected:
+  WKBTestCase test_case;
+};
+
+TEST_P(WKBTestFixture, TestWKBBounderBounds) {
+  auto item = GetParam();
+
+  WKBGeometryBounder bounder;
+  EXPECT_EQ(bounder.Bounds(), BoundingBox());
+
+  ASSERT_NO_THROW(bounder.MergeGeometry(
+      {reinterpret_cast<const char*>(item.wkb.data()), item.wkb.size()}));
+
+  EXPECT_EQ(bounder.Bounds(), item.box);
+  uint32_t wkb_type =
+      static_cast<int>(item.dimensions) * 1000 + 
static_cast<int>(item.geometry_type);
+  EXPECT_THAT(bounder.GeometryTypes(), 
::testing::ElementsAre(::testing::Eq(wkb_type)));
+
+  bounder.Reset();
+  EXPECT_EQ(bounder.Bounds(), BoundingBox());
+  EXPECT_TRUE(bounder.GeometryTypes().empty());
+}
+
+TEST_P(WKBTestFixture, TestWKBBounderErrorForTruncatedInput) {
+  auto item = GetParam();
+  WKBGeometryBounder bounder;
+
+  // Make sure an error occurs for any version of truncated input to the 
bounder

Review Comment:
   Apparently we should also check that a too large input should throw as well?



##########
cpp/src/parquet/arrow/arrow_schema_test.cc:
##########
@@ -1090,6 +1094,49 @@ TEST_F(TestConvertParquetSchema, 
ParquetSchemaArrowUuidExtension) {
   }
 }
 
+TEST_F(TestConvertParquetSchema, ParquetSchemaGeoArrowExtensions) {
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               LogicalType::Geometry(),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               LogicalType::Geography(),
+                                               ParquetType::BYTE_ARRAY));
+
+  {
+    // Parquet file does not contain Arrow schema.
+    // By default, both fields should be treated as binary() fields in Arrow.
+    auto arrow_schema = ::arrow::schema({::arrow::field("geometry", BINARY, 
true),
+                                         ::arrow::field("geography", BINARY, 
true)});
+    std::shared_ptr<KeyValueMetadata> metadata{};
+    ASSERT_OK(ConvertSchema(parquet_fields, metadata));
+    CheckFlatSchema(arrow_schema);
+  }
+
+  {
+    // Parquet file does not contain Arrow schema.
+    // If Arrow extensions are enabled and extensions are registered,
+    // fields will be interpreted as geoarrow_wkb(binary()) extension fields.
+    ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+    ArrowReaderProperties props;
+    props.set_arrow_extensions_enabled(true);
+    auto arrow_schema = ::arrow::schema(
+        {::arrow::field(
+             "geometry",
+             test::geoarrow_wkb(R"({"crs": "OGC:CRS84", "crs_type": 
"authority_code"})"),
+             true),
+         ::arrow::field(
+             "geography",
+             test::geoarrow_wkb(
+                 R"({"crs": "OGC:CRS84", "crs_type": "authority_code", 
"edges": "spherical"})"),
+             true)});
+    std::shared_ptr<KeyValueMetadata> metadata{};
+    ASSERT_OK(ConvertSchema(parquet_fields, metadata, props));
+    CheckFlatSchema(arrow_schema);
+  }

Review Comment:
   Can you also add a test for when Arrow extensions are enabled but the 
extension type isn't registered?



##########
cpp/src/parquet/arrow/arrow_schema_test.cc:
##########
@@ -1343,6 +1390,96 @@ TEST_F(TestConvertArrowSchema, 
ParquetFlatPrimitivesAsDictionaries) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsLonLat) {
+  // All the Arrow Schemas below should convert to the type defaults for 
GEOMETRY
+  // and GEOGRAPHY when GeoArrow extension types are registered and the 
appropriate
+  // writer option is set.
+  ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               LogicalType::Geometry(),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               LogicalType::Geography(),
+                                               ParquetType::BYTE_ARRAY));
+
+  // There are several ways that longitude/latitude could be specified when 
coming from
+  // GeoArrow, which allows null, missing, arbitrary strings (e.g., 
Authority:Code), and
+  // PROJJSON.
+  std::vector<std::string> geoarrow_lonlat = {
+      "null", R"("OGC:CRS84")", R"("EPSG:4326")",
+      // Purely the parts of the PROJJSON that we inspect to check the lon/lat 
case
+      R"({"id": {"authority": "OGC", "code": "CRS84"}})",
+      R"({"id": {"authority": "EPSG", "code": 4326}})"};
+
+  std::string geoarrow_lonlatish_crs = geoarrow_lonlat[0];

Review Comment:
   This line is stale, isn't it?



##########
cpp/src/parquet/geospatial/util_json_internal.cc:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/util_json_internal.h"
+
+#include <string>
+
+#include "arrow/extension_type.h"
+#include "arrow/json/rapidjson_defs.h"  // IWYU pragma: keep
+#include "arrow/result.h"
+#include "arrow/util/string.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+::arrow::Result<std::string> GeospatialGeoArrowCrsToParquetCrs(
+    const ::arrow::rapidjson::Document& document) {
+  namespace rj = ::arrow::rapidjson;
+
+  if (!document.HasMember("crs") || document["crs"].IsNull()) {
+    // Parquet GEOMETRY/GEOGRAPHY do not have a concept of a null/missing
+    // CRS, but an omitted one is more likely to have meant "lon/lat" than
+    // a truly unspecified one (i.e., Engineering CRS with arbitrary XY units)
+    return "";
+  }
+
+  const auto& json_crs = document["crs"];
+  if (json_crs.IsString() && (json_crs == "EPSG:4326" || json_crs == 
"OGC:CRS84")) {
+    // crs can be left empty because these cases both correspond to
+    // longitude/latitude in WGS84 according to the Parquet specification
+    return "";
+  } else if (json_crs.IsObject()) {
+    // Attempt to detect common PROJJSON representations of longitude/latitude 
and return
+    // an empty crs to maximize compatibility with readers that do not 
implement CRS
+    // support. PROJJSON stores this in the "id" member like:
+    // {..., "id": {"authority": "...", "code": "..."}}
+    if (json_crs.HasMember("id")) {
+      const auto& identifier = json_crs["id"];
+      if (identifier.HasMember("authority") && identifier.HasMember("code")) {
+        if (identifier["authority"] == "OGC" && identifier["code"] == "CRS84") 
{
+          return "";
+        } else if (identifier["authority"] == "EPSG" && identifier["code"] == 
4326) {
+          return "";
+        }
+      }
+    }
+  }
+
+  // If we could not detect a longitude/latitude CRS, just write the string to 
the
+  // LogicalType crs (being sure to unescape a JSON string into a regular 
string)
+  if (json_crs.IsString()) {
+    return json_crs.GetString();
+  } else {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    json_crs.Accept(writer);
+    return buffer.GetString();
+  }
+}
+
+::arrow::Result<std::string> MakeGeoArrowCrsMetadata(
+    const std::string& crs,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  const std::string kSridPrefix{"srid:"};
+  const std::string kProjjsonPrefix{"projjson:"};
+
+  // Two reccomendataions are explicitly mentioned in the Parquet format for 
the
+  // LogicalType crs:
+  //
+  // - "srid:XXXX" as a way to encode an application-specific integer 
identifier
+  // - "projjson:some_field_name" as a way to avoid repeating PROJJSON strings
+  //   unnecessarily (with a suggestion to place them in the file metadata)
+  //
+  // While we don't currently generate those values to reduce the complexity
+  // of the writer, we do interpret these values according to the suggestion in
+  // the format and pass on this information to GeoArrow.
+  if (crs.empty()) {
+    return R"("crs": "OGC:CRS84", "crs_type": "authority_code")";
+  } else if (::arrow::internal::StartsWith(crs, kSridPrefix)) {
+    return R"("crs": ")" + crs.substr(kSridPrefix.size()) + R"(", "crs_type": 
"srid")";
+  } else if (::arrow::internal::StartsWith(crs, kProjjsonPrefix)) {
+    std::string metadata_field = crs.substr(kProjjsonPrefix.size());
+    if (metadata && metadata->Contains(metadata_field)) {
+      ARROW_ASSIGN_OR_RAISE(std::string projjson_value, 
metadata->Get(metadata_field));
+      return R"("crs": )" + projjson_value + R"(, "crs_type": "projjson")";
+    }
+  }
+
+  // Pass on the string directly to GeoArrow. If the string is already valid 
JSON,
+  // insert it directly into GeoArrow's "crs" field. Otherwise, escape it and 
pass it as a
+  // string value.
+  namespace rj = ::arrow::rapidjson;
+  rj::Document document;
+  if (document.Parse(crs.data(), crs.length()).HasParseError()) {
+    rj::StringBuffer buffer;
+    rj::Writer<rj::StringBuffer> writer(buffer);
+    writer.String(crs);
+    return R"("crs": )" + std::string(buffer.GetString());
+  } else {
+    return R"("crs": )" + crs;
+  }
+}
+
+}  // namespace
+
+::arrow::Result<std::shared_ptr<const LogicalType>> 
LogicalTypeFromGeoArrowMetadata(
+    std::string_view serialized_data) {
+  // Parquet has no way to interpret a null or missing CRS, so we choose the 
most likely
+  // intent here (that the user meant to use the default Parquet CRS)
+  if (serialized_data.empty() || serialized_data == "{}") {
+    return LogicalType::Geometry();
+  }
+
+  namespace rj = ::arrow::rapidjson;
+  rj::Document document;
+  if (document.Parse(serialized_data.data(), 
serialized_data.length()).HasParseError()) {
+    return ::arrow::Status::Invalid("Invalid serialized JSON data: ", 
serialized_data);
+  }
+
+  ARROW_ASSIGN_OR_RAISE(std::string crs, 
GeospatialGeoArrowCrsToParquetCrs(document));
+
+  if (document.HasMember("edges") && document["edges"] == "planar") {
+    return LogicalType::Geometry(crs);
+  } else if (document.HasMember("edges") && document["edges"] == "spherical") {
+    return LogicalType::Geography(crs,
+                                  
LogicalType::EdgeInterpolationAlgorithm::SPHERICAL);
+  } else if (document.HasMember("edges")) {
+    return ::arrow::Status::Invalid("Unsupported GeoArrow edge type: ", 
serialized_data);
+  }
+
+  return LogicalType::Geometry(crs);
+}
+
+::arrow::Result<std::shared_ptr<::arrow::DataType>> 
GeoArrowTypeFromLogicalType(
+    const LogicalType& logical_type,
+    const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) {
+  // Check if we have a registered GeoArrow type to read into
+  std::shared_ptr<::arrow::ExtensionType> maybe_geoarrow_wkb =
+      ::arrow::GetExtensionType("geoarrow.wkb");

Review Comment:
   GeoArrow uses the same extension type for both geometry and geography?



##########
cpp/src/parquet/arrow/arrow_schema_test.cc:
##########
@@ -1343,6 +1390,96 @@ TEST_F(TestConvertArrowSchema, 
ParquetFlatPrimitivesAsDictionaries) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsLonLat) {
+  // All the Arrow Schemas below should convert to the type defaults for 
GEOMETRY
+  // and GEOGRAPHY when GeoArrow extension types are registered and the 
appropriate
+  // writer option is set.
+  ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               LogicalType::Geometry(),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               LogicalType::Geography(),
+                                               ParquetType::BYTE_ARRAY));
+
+  // There are several ways that longitude/latitude could be specified when 
coming from
+  // GeoArrow, which allows null, missing, arbitrary strings (e.g., 
Authority:Code), and
+  // PROJJSON.
+  std::vector<std::string> geoarrow_lonlat = {
+      "null", R"("OGC:CRS84")", R"("EPSG:4326")",
+      // Purely the parts of the PROJJSON that we inspect to check the lon/lat 
case
+      R"({"id": {"authority": "OGC", "code": "CRS84"}})",
+      R"({"id": {"authority": "EPSG", "code": 4326}})"};
+
+  std::string geoarrow_lonlatish_crs = geoarrow_lonlat[0];
+  for (const auto& geoarrow_lonlatish_crs : geoarrow_lonlat) {
+    SCOPED_TRACE(geoarrow_lonlatish_crs);
+    std::vector<std::shared_ptr<Field>> arrow_fields = {
+        ::arrow::field("geometry",
+                       test::geoarrow_wkb(R"({"crs": )" + 
geoarrow_lonlatish_crs + "}"),
+                       true),
+        ::arrow::field("geography",
+                       test::geoarrow_wkb(R"({"crs": )" + 
geoarrow_lonlatish_crs +
+                                          R"(, "edges": "spherical"})"),
+                       true)};
+
+    ASSERT_OK(ConvertSchema(arrow_fields));
+    ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
+  }
+}
+
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsSrid) {
+  // Checks that it is possible to write the srid:xxxx reccomendation from 
GeoArrow
+  ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               
LogicalType::Geometry("srid:1234"),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               
LogicalType::Geography("srid:5678"),
+                                               ParquetType::BYTE_ARRAY));
+
+  std::vector<std::shared_ptr<Field>> arrow_fields = {
+      ::arrow::field("geometry", test::geoarrow_wkb(R"({"crs": 
"srid:1234"})"), true),
+      ::arrow::field("geography",
+                     test::geoarrow_wkb(R"({"crs": "srid:5678", "edges": 
"spherical"})"),
+                     true)};
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
+}
+
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsProjjson) {
+  // Checks the conversion between GeoArrow that contains non-lon/lat PROJJSON
+  // to Parquet. Almost all GeoArrow types that arrive at the Parquet reader
+  // will have their CRS expressed in this way.

Review Comment:
   ```suggestion
     // Checks the conversion from GeoArrow that contains non-lon/lat PROJJSON
     // to Parquet. Almost all GeoArrow types that arrive at the Parquet reader
     // will have their CRS expressed in this way.
   ```



##########
cpp/src/parquet/geospatial/statistics.cc:
##########
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/statistics.h"
+
+#include <cmath>
+#include <memory>
+#include <optional>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial/util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet::geospatial {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    return is_valid_ == other.is_valid_ &&
+           bounder_.GeometryTypes() == other.bounder_.GeometryTypes() &&
+           bounder_.Bounds() == other.bounder_.Bounds();
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || other.is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not supported by 
GeoStatistics::Merge()");
+    }
+
+    bounder_.MergeBox(other.bounder_.Bounds());
+    std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
+    bounder_.MergeGeometryTypes(other_geometry_types);
+  }
+
+  void Update(const ByteArray* values, int64_t num_values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    for (int64_t i = 0; i < num_values; i++) {
+      const ByteArray& item = values[i];
+      try {
+        bounder_.MergeGeometry({reinterpret_cast<const char*>(item.ptr), 
item.len});
+      } catch (ParquetException&) {
+        is_valid_ = false;
+        return;
+      }
+    }
+  }
+
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values) {
+    DCHECK_GT(num_spaced_values, 0);
+
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    ::arrow::Status status = ::arrow::internal::VisitSetBitRuns(
+        valid_bits, valid_bits_offset, num_spaced_values,
+        [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            ByteArray item = SafeLoad(values + i + position);
+            PARQUET_CATCH_NOT_OK(bounder_.MergeGeometry(
+                {reinterpret_cast<const char*>(item.ptr), item.len}));
+          }
+
+          return ::arrow::Status::OK();
+        });
+
+    if (!status.ok()) {
+      is_valid_ = false;
+    }
+  }
+
+  void Update(const ::arrow::Array& values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    // Note that ::arrow::Type::EXTENSION seems to be handled before this is 
called
+    switch (values.type_id()) {
+      case ::arrow::Type::BINARY:
+        UpdateArrayImpl<::arrow::BinaryArray>(values);
+        break;
+      case ::arrow::Type::LARGE_BINARY:
+        UpdateArrayImpl<::arrow::LargeBinaryArray>(values);
+        break;
+      // This does not currently handle run-end encoded, dictionary encodings, 
or views
+      default:
+        throw ParquetException("Unsupported Array type in 
GeoStatistics::Update(Array): ",
+                               values.type()->ToString());
+    }
+  }
+
+  void Reset() {
+    bounder_.Reset();
+    is_valid_ = true;
+  }
+
+  EncodedGeoStatistics Encode() const {
+    if (!is_valid_) {
+      return {};
+    }
+
+    const geospatial::BoundingBox::XYZM& mins = bounder_.Bounds().min;
+    const geospatial::BoundingBox::XYZM& maxes = bounder_.Bounds().max;
+
+    EncodedGeoStatistics out;
+
+    if (has_geometry_types_) {
+      out.geospatial_types = bounder_.GeometryTypes();
+    }
+
+    bool write_x = !bound_empty(0) && bound_valid(0);
+    bool write_y = !bound_empty(1) && bound_valid(1);
+    bool write_z = write_x && write_y && !bound_empty(2) && bound_valid(2);
+    bool write_m = write_x && write_y && !bound_empty(3) && bound_valid(3);
+
+    if (write_x && write_y) {
+      out.xmin = mins[0];
+      out.xmax = maxes[0];
+      out.ymin = mins[1];
+      out.ymax = maxes[1];
+      out.xy_bounds_present = true;
+    }
+
+    if (write_z) {
+      out.zmin = mins[2];
+      out.zmax = maxes[2];
+      out.z_bounds_present = true;
+    }
+
+    if (write_m) {
+      out.mmin = mins[3];
+      out.mmax = maxes[3];
+      out.m_bounds_present = true;
+    }
+
+    return out;
+  }
+
+  void Update(const EncodedGeoStatistics& encoded) {
+    if (!is_valid_) {
+      return;
+    }
+
+    // We can create GeoStatistics from a wraparound bounding box, but we can't
+    // update an existing one because the merge logic is not yet implemented.

Review Comment:
   Would it be hard to implement? It's a bit weird and user-unfriendly to error 
out here.



##########
cpp/src/parquet/geospatial/statistics.cc:
##########
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/geospatial/statistics.h"
+
+#include <cmath>
+#include <memory>
+#include <optional>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/bit_run_reader.h"
+#include "parquet/exception.h"
+#include "parquet/geospatial/util_internal.h"
+
+using arrow::util::SafeLoad;
+
+namespace parquet::geospatial {
+
+class GeoStatisticsImpl {
+ public:
+  bool Equals(const GeoStatisticsImpl& other) const {
+    return is_valid_ == other.is_valid_ &&
+           bounder_.GeometryTypes() == other.bounder_.GeometryTypes() &&
+           bounder_.Bounds() == other.bounder_.Bounds();
+  }
+
+  void Merge(const GeoStatisticsImpl& other) {
+    is_valid_ = is_valid_ && other.is_valid_;
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x() || other.is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not supported by 
GeoStatistics::Merge()");
+    }
+
+    bounder_.MergeBox(other.bounder_.Bounds());
+    std::vector<int32_t> other_geometry_types = other.bounder_.GeometryTypes();
+    bounder_.MergeGeometryTypes(other_geometry_types);
+  }
+
+  void Update(const ByteArray* values, int64_t num_values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    for (int64_t i = 0; i < num_values; i++) {
+      const ByteArray& item = values[i];
+      try {
+        bounder_.MergeGeometry({reinterpret_cast<const char*>(item.ptr), 
item.len});
+      } catch (ParquetException&) {
+        is_valid_ = false;
+        return;
+      }
+    }
+  }
+
+  void UpdateSpaced(const ByteArray* values, const uint8_t* valid_bits,
+                    int64_t valid_bits_offset, int64_t num_spaced_values,
+                    int64_t num_values) {
+    DCHECK_GT(num_spaced_values, 0);
+
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    ::arrow::Status status = ::arrow::internal::VisitSetBitRuns(
+        valid_bits, valid_bits_offset, num_spaced_values,
+        [&](int64_t position, int64_t length) {
+          for (int64_t i = 0; i < length; i++) {
+            ByteArray item = SafeLoad(values + i + position);
+            PARQUET_CATCH_NOT_OK(bounder_.MergeGeometry(
+                {reinterpret_cast<const char*>(item.ptr), item.len}));
+          }
+
+          return ::arrow::Status::OK();
+        });
+
+    if (!status.ok()) {
+      is_valid_ = false;
+    }
+  }
+
+  void Update(const ::arrow::Array& values) {
+    if (!is_valid_) {
+      return;
+    }
+
+    if (is_wraparound_x()) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    // Note that ::arrow::Type::EXTENSION seems to be handled before this is 
called
+    switch (values.type_id()) {
+      case ::arrow::Type::BINARY:
+        UpdateArrayImpl<::arrow::BinaryArray>(values);
+        break;
+      case ::arrow::Type::LARGE_BINARY:
+        UpdateArrayImpl<::arrow::LargeBinaryArray>(values);
+        break;
+      // This does not currently handle run-end encoded, dictionary encodings, 
or views
+      default:
+        throw ParquetException("Unsupported Array type in 
GeoStatistics::Update(Array): ",
+                               values.type()->ToString());
+    }
+  }
+
+  void Reset() {
+    bounder_.Reset();
+    is_valid_ = true;
+  }
+
+  EncodedGeoStatistics Encode() const {
+    if (!is_valid_) {
+      return {};
+    }
+
+    const geospatial::BoundingBox::XYZM& mins = bounder_.Bounds().min;
+    const geospatial::BoundingBox::XYZM& maxes = bounder_.Bounds().max;
+
+    EncodedGeoStatistics out;
+
+    if (has_geometry_types_) {
+      out.geospatial_types = bounder_.GeometryTypes();
+    }
+
+    bool write_x = !bound_empty(0) && bound_valid(0);
+    bool write_y = !bound_empty(1) && bound_valid(1);
+    bool write_z = write_x && write_y && !bound_empty(2) && bound_valid(2);
+    bool write_m = write_x && write_y && !bound_empty(3) && bound_valid(3);
+
+    if (write_x && write_y) {
+      out.xmin = mins[0];
+      out.xmax = maxes[0];
+      out.ymin = mins[1];
+      out.ymax = maxes[1];
+      out.xy_bounds_present = true;
+    }
+
+    if (write_z) {
+      out.zmin = mins[2];
+      out.zmax = maxes[2];
+      out.z_bounds_present = true;
+    }
+
+    if (write_m) {
+      out.mmin = mins[3];
+      out.mmax = maxes[3];
+      out.m_bounds_present = true;
+    }
+
+    return out;
+  }
+
+  void Update(const EncodedGeoStatistics& encoded) {
+    if (!is_valid_) {
+      return;
+    }
+
+    // We can create GeoStatistics from a wraparound bounding box, but we can't
+    // update an existing one because the merge logic is not yet implemented.
+    if (!bounds_empty() &&
+        (is_wraparound_x() || is_wraparound(encoded.xmin, encoded.xmax))) {
+      throw ParquetException("Wraparound X is not suppored by 
GeoStatistics::Update()");
+    }
+
+    geospatial::BoundingBox box;
+
+    if (encoded.xy_bounds_present) {
+      box.min[0] = encoded.xmin;
+      box.max[0] = encoded.xmax;
+      box.min[1] = encoded.ymin;
+      box.max[1] = encoded.ymax;
+    } else {
+      box.min[0] = kNaN;
+      box.max[0] = kNaN;
+      box.min[1] = kNaN;
+      box.max[1] = kNaN;
+    }
+
+    if (encoded.z_bounds_present) {
+      box.min[2] = encoded.zmin;
+      box.max[2] = encoded.zmax;
+    } else {
+      box.min[2] = kNaN;
+      box.max[2] = kNaN;
+    }
+
+    if (encoded.m_bounds_present) {
+      box.min[3] = encoded.mmin;
+      box.max[3] = encoded.mmax;
+    } else {
+      box.min[3] = kNaN;
+      box.max[3] = kNaN;
+    }
+
+    bounder_.MergeBox(box);
+    bounder_.MergeGeometryTypes(encoded.geospatial_types);
+    has_geometry_types_ = has_geometry_types_ && 
encoded.geospatial_types_present();
+  }
+
+  bool is_wraparound_x() const {
+    return is_wraparound(lower_bound()[0], upper_bound()[0]);
+  }
+
+  bool is_valid() const { return is_valid_; }
+
+  bool bounds_empty() const {
+    for (int i = 0; i < kMaxDimensions; i++) {
+      if (!bound_empty(i)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  bool bound_empty(int i) const {
+    return std::isinf(bounder_.Bounds().min[i] - bounder_.Bounds().max[i]);
+  }
+
+  bool bound_valid(int i) const {
+    return !std::isnan(bounder_.Bounds().min[i]) && 
!std::isnan(bounder_.Bounds().max[i]);
+  }
+
+  const std::array<double, kMaxDimensions>& lower_bound() const {
+    return bounder_.Bounds().min;
+  }
+
+  const std::array<double, kMaxDimensions>& upper_bound() const {
+    return bounder_.Bounds().max;
+  }
+
+  std::optional<std::vector<int32_t>> geometry_types() const {
+    if (has_geometry_types_) {
+      return bounder_.GeometryTypes();
+    } else {
+      return std::nullopt;
+    }
+  }
+
+ private:
+  geospatial::WKBGeometryBounder bounder_;
+  bool is_valid_ = true;
+  bool has_geometry_types_ = true;
+
+  template <typename ArrayType>
+  void UpdateArrayImpl(const ::arrow::Array& values) {
+    const auto& binary_array = static_cast<const ArrayType&>(values);
+    for (int64_t i = 0; i < binary_array.length(); ++i) {
+      if (!binary_array.IsNull(i)) {
+        try {
+          bounder_.MergeGeometry(binary_array.GetView(i));
+        } catch (ParquetException&) {
+          is_valid_ = false;
+          return;
+        }
+      }
+    }
+  }
+
+  static bool is_wraparound(double dmin, double dmax) {

Review Comment:
   Is this because of the longitude case where the domain is more of a torus? 
If so, please add a comment explaining this.
   



##########
cpp/src/parquet/geospatial/statistics_test.cc:
##########
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/array/builder_binary.h"
+#include "arrow/compute/api.h"
+#include "arrow/testing/gtest_util.h"
+
+#include "parquet/geospatial/statistics.h"
+#include "parquet/test_util.h"
+
+static constexpr double kInf = std::numeric_limits<double>::infinity();
+
+namespace parquet::geospatial {
+
+TEST(TestGeoStatistics, TestDefaults) {
+  GeoStatistics stats;
+  EXPECT_TRUE(stats.geometry_types().has_value());
+  EXPECT_EQ(stats.geometry_types().value().size(), 0);
+  EXPECT_TRUE(stats.is_valid());
+  EXPECT_THAT(stats.dimension_empty(), ::testing::ElementsAre(true, true, 
true, true));
+  EXPECT_THAT(stats.dimension_valid(), ::testing::ElementsAre(true, true, 
true, true));
+  for (int i = 0; i < kMaxDimensions; i++) {
+    EXPECT_EQ(stats.lower_bound()[i], kInf);
+    EXPECT_EQ(stats.upper_bound()[i], -kInf);
+  }
+
+  EXPECT_TRUE(stats.Equals(GeoStatistics()));

Review Comment:
   We're not testing `Equals` appropriately if we're not testing cases where it 
returns false.



##########
cpp/src/parquet/arrow/arrow_schema_test.cc:
##########
@@ -1343,6 +1390,96 @@ TEST_F(TestConvertArrowSchema, 
ParquetFlatPrimitivesAsDictionaries) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsLonLat) {
+  // All the Arrow Schemas below should convert to the type defaults for 
GEOMETRY
+  // and GEOGRAPHY when GeoArrow extension types are registered and the 
appropriate
+  // writer option is set.
+  ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               LogicalType::Geometry(),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               LogicalType::Geography(),
+                                               ParquetType::BYTE_ARRAY));
+
+  // There are several ways that longitude/latitude could be specified when 
coming from
+  // GeoArrow, which allows null, missing, arbitrary strings (e.g., 
Authority:Code), and
+  // PROJJSON.
+  std::vector<std::string> geoarrow_lonlat = {
+      "null", R"("OGC:CRS84")", R"("EPSG:4326")",
+      // Purely the parts of the PROJJSON that we inspect to check the lon/lat 
case
+      R"({"id": {"authority": "OGC", "code": "CRS84"}})",
+      R"({"id": {"authority": "EPSG", "code": 4326}})"};
+
+  std::string geoarrow_lonlatish_crs = geoarrow_lonlat[0];
+  for (const auto& geoarrow_lonlatish_crs : geoarrow_lonlat) {
+    SCOPED_TRACE(geoarrow_lonlatish_crs);

Review Comment:
   ```suggestion
       ARROW_SCOPED_TRACE("crs = ", geoarrow_lonlatish_crs);
   ```



##########
cpp/src/parquet/arrow/arrow_schema_test.cc:
##########
@@ -1343,6 +1390,96 @@ TEST_F(TestConvertArrowSchema, 
ParquetFlatPrimitivesAsDictionaries) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
+TEST_F(TestConvertArrowSchema, ParquetGeoArrowCrsLonLat) {
+  // All the Arrow Schemas below should convert to the type defaults for 
GEOMETRY
+  // and GEOGRAPHY when GeoArrow extension types are registered and the 
appropriate
+  // writer option is set.
+  ::arrow::ExtensionTypeGuard guard(test::geoarrow_wkb());
+
+  std::vector<NodePtr> parquet_fields;
+  parquet_fields.push_back(PrimitiveNode::Make("geometry", 
Repetition::OPTIONAL,
+                                               LogicalType::Geometry(),
+                                               ParquetType::BYTE_ARRAY));
+  parquet_fields.push_back(PrimitiveNode::Make("geography", 
Repetition::OPTIONAL,
+                                               LogicalType::Geography(),
+                                               ParquetType::BYTE_ARRAY));
+
+  // There are several ways that longitude/latitude could be specified when 
coming from
+  // GeoArrow, which allows null, missing, arbitrary strings (e.g., 
Authority:Code), and
+  // PROJJSON.
+  std::vector<std::string> geoarrow_lonlat = {
+      "null", R"("OGC:CRS84")", R"("EPSG:4326")",
+      // Purely the parts of the PROJJSON that we inspect to check the lon/lat 
case
+      R"({"id": {"authority": "OGC", "code": "CRS84"}})",
+      R"({"id": {"authority": "EPSG", "code": 4326}})"};

Review Comment:
   Is 4326 always serialized as a number or can it also be serialized as a 
string?



##########
cpp/src/parquet/test_util.h:
##########
@@ -830,5 +832,60 @@ inline void GenerateData<FLBA>(int num_values, FLBA* out, 
std::vector<uint8_t>*
   random_fixed_byte_array(num_values, 0, heap->data(), 
kGenerateDataFLBALength, out);
 }
 
+// ----------------------------------------------------------------------
+// Test utility functions for geometry
+
+#if defined(ARROW_LITTLE_ENDIAN)
+static constexpr uint8_t kWkbNativeEndianness = 0x01;
+#else
+static constexpr uint8_t kWkbNativeEndianness = 0x00;
+#endif
+
+/// \brief Number of bytes in a WKB Point with X and Y dimensions (uint8_t 
endian,
+/// uint32_t geometry type, 2 * double ordinates)

Review Comment:
   ```suggestion
   /// \brief Number of bytes in a WKB Point with X and Y dimensions (uint8_t 
endian,
   /// uint32_t geometry type, 2 * double coordinates)
   ```



##########
cpp/src/parquet/thrift_internal.h:
##########
@@ -332,6 +408,35 @@ static inline format::SortingColumn ToThrift(SortingColumn 
sorting_column) {
   return thrift_sorting_column;
 }
 
+static inline format::GeospatialStatistics ToThrift(
+    const geospatial::EncodedGeoStatistics& encoded_geo_stats) {
+  format::GeospatialStatistics geospatial_statistics;
+
+  
geospatial_statistics.__set_geospatial_types(encoded_geo_stats.geospatial_types);
+
+  format::BoundingBox bbox;
+  if (encoded_geo_stats.xy_bounds_present) {
+    bbox.__set_xmin(encoded_geo_stats.xmin);
+    bbox.__set_xmax(encoded_geo_stats.xmax);
+    bbox.__set_ymin(encoded_geo_stats.ymin);
+    bbox.__set_ymax(encoded_geo_stats.ymax);
+
+    if (encoded_geo_stats.z_bounds_present) {
+      bbox.__set_zmin(encoded_geo_stats.zmin);
+      bbox.__set_zmax(encoded_geo_stats.zmax);
+    }
+
+    if (encoded_geo_stats.m_bounds_present) {
+      bbox.__set_mmin(encoded_geo_stats.mmin);
+      bbox.__set_mmax(encoded_geo_stats.mmax);
+    }
+
+    geospatial_statistics.__set_bbox(bbox);

Review Comment:
   ```suggestion
       geospatial_statistics.__set_bbox(std::move(bbox));
   ```



##########
cpp/src/parquet/thrift_internal.h:
##########
@@ -232,6 +233,81 @@ static inline AadMetadata FromThrift(format::AesGcmCtrV1 
aesGcmCtrV1) {
                      aesGcmCtrV1.supply_aad_prefix};
 }
 
+static inline geospatial::EncodedGeoStatistics FromThrift(
+    const format::GeospatialStatistics& geo_stats) {
+  geospatial::EncodedGeoStatistics out;
+
+  out.geospatial_types = geo_stats.geospatial_types;
+
+  if (geo_stats.__isset.bbox) {
+    out.xmin = geo_stats.bbox.xmin;
+    out.xmax = geo_stats.bbox.xmax;
+    out.ymin = geo_stats.bbox.ymin;
+    out.ymax = geo_stats.bbox.ymax;
+    out.xy_bounds_present = true;
+
+    if (geo_stats.bbox.__isset.zmin && geo_stats.bbox.__isset.zmax) {
+      out.zmin = geo_stats.bbox.zmin;
+      out.zmax = geo_stats.bbox.zmax;
+      out.z_bounds_present = true;
+    } else {
+      out.z_bounds_present = false;

Review Comment:
   I don't think the `else` branch is required since the various "present" 
fields are initialized to false?



##########
cpp/src/parquet/thrift_internal.h:
##########
@@ -332,6 +408,35 @@ static inline format::SortingColumn ToThrift(SortingColumn 
sorting_column) {
   return thrift_sorting_column;
 }
 
+static inline format::GeospatialStatistics ToThrift(
+    const geospatial::EncodedGeoStatistics& encoded_geo_stats) {
+  format::GeospatialStatistics geospatial_statistics;
+
+  
geospatial_statistics.__set_geospatial_types(encoded_geo_stats.geospatial_types);
+
+  format::BoundingBox bbox;
+  if (encoded_geo_stats.xy_bounds_present) {

Review Comment:
   Can move the `bbox` instantiation inside the `if`.
   ```suggestion
     if (encoded_geo_stats.xy_bounds_present) {
       format::BoundingBox bbox;
   ```



##########
cpp/src/parquet/types.cc:
##########
@@ -1630,6 +1669,217 @@ class LogicalType::Impl::Float16 final : public 
LogicalType::Impl::Incompatible,
 
 GENERATE_MAKE(Float16)
 
+namespace {
+std::string EscapeControl(char c) {
+  std::stringstream ss;
+  ss << R"(\u00)";
+  ss.flags(ss.hex);
+  ss.width(2);
+  ss.fill('0');
+  ss << static_cast<int>(c);
+  return ss.str();
+}
+
+void WriteCrsKeyAndValue(const std::string& crs, std::ostream& json) {
+  // There is no restriction on the crs value here, and it may contain quotes
+  // or backslashes that would result in invalid JSON if unescaped.
+  json << R"(, "crs": ")";
+  json.flags(json.hex);
+  for (char c : crs) {
+    if (c == '"') {
+      json << R"(\")";
+    } else if (c == '\\') {
+      json << R"(\\)";
+    } else if (c >= 0 && c < 32) {
+      json << EscapeControl(c);
+    } else {
+      json << c;
+    }
+  }
+  json << R"(")";
+}
+}  // namespace
+
+class LogicalType::Impl::Geometry final : public 
LogicalType::Impl::Incompatible,
+                                          public 
LogicalType::Impl::SimpleApplicable {
+ public:
+  friend class GeometryLogicalType;
+
+  std::string ToString() const override;
+  std::string ToJSON() const override;
+  format::LogicalType ToThrift() const override;
+  bool Equals(const LogicalType& other) const override;
+
+  const std::string& crs() const { return crs_; }
+
+ private:
+  explicit Geometry(std::string crs)
+      : LogicalType::Impl(LogicalType::Type::GEOMETRY, SortOrder::UNKNOWN),
+        LogicalType::Impl::SimpleApplicable(parquet::Type::BYTE_ARRAY),
+        crs_(std::move(crs)) {}
+
+  std::string crs_;
+};
+
+std::string LogicalType::Impl::Geometry::ToString() const {
+  std::stringstream type;
+  type << "Geometry(crs=" << crs_ << ")";
+  return type.str();
+}
+
+std::string LogicalType::Impl::Geometry::ToJSON() const {
+  std::stringstream json;
+  json << R"({"Type": "Geometry")";
+
+  if (!crs_.empty()) {
+    WriteCrsKeyAndValue(crs_, json);
+  }
+
+  json << "}";
+  return json.str();
+}
+
+format::LogicalType LogicalType::Impl::Geometry::ToThrift() const {
+  format::LogicalType type;
+  format::GeometryType geometry_type;
+
+  // Canonially export crs of "" as an unset CRS
+  if (!crs_.empty()) {
+    geometry_type.__set_crs(crs_);
+  }
+
+  type.__set_GEOMETRY(geometry_type);
+  return type;
+}
+
+bool LogicalType::Impl::Geometry::Equals(const LogicalType& other) const {
+  if (other.is_geometry()) {
+    const auto& other_geometry = checked_cast<const 
GeometryLogicalType&>(other);
+    return crs() == other_geometry.crs();
+  } else {
+    return false;
+  }
+}
+
+const std::string& GeometryLogicalType::crs() const {
+  return (dynamic_cast<const LogicalType::Impl::Geometry&>(*impl_)).crs();
+}
+
+std::shared_ptr<const LogicalType> GeometryLogicalType::Make(std::string crs) {
+  auto logical_type = std::shared_ptr<GeometryLogicalType>(new 
GeometryLogicalType());

Review Comment:
   `std::make_shared` is more idiomatic and also saves an allocation
   ```suggestion
     auto logical_type = std::make_shared<GeometryLogicalType>();
   ```



##########
cpp/src/parquet/types.cc:
##########
@@ -1630,6 +1669,217 @@ class LogicalType::Impl::Float16 final : public 
LogicalType::Impl::Incompatible,
 
 GENERATE_MAKE(Float16)
 
+namespace {
+std::string EscapeControl(char c) {

Review Comment:
   You're writing the JSON string into a stream anyway, so why not do that here 
as well?
   ```suggestion
   void EscapeControl(char c, std::ostream& json) {
   ```



##########
cpp/src/parquet/types.cc:
##########
@@ -479,6 +480,27 @@ std::shared_ptr<const LogicalType> LogicalType::FromThrift(
     return UUIDLogicalType::Make();
   } else if (type.__isset.FLOAT16) {
     return Float16LogicalType::Make();
+  } else if (type.__isset.GEOMETRY) {
+    std::string crs;
+    if (type.GEOMETRY.__isset.crs) {
+      crs = type.GEOMETRY.crs;
+    }
+
+    return GeometryLogicalType::Make(crs);

Review Comment:
   ```suggestion
       return GeometryLogicalType::Make(std::move(crs));
   ```



##########
python/pyarrow/_parquet.pyx:
##########
@@ -319,6 +319,136 @@ cdef _box_flba(ParquetFLBA val, uint32_t len):
     return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
 
 
+cdef class GeoStatistics(_Weakrefable):
+    """Statistics for columns with geospatial data types (experimental)"""
+
+    def __init__(self):
+        raise TypeError(f"Do not call {self.__class__.__name__}'s constructor 
directly")
+
+    def __cinit__(self):
+        pass
+
+    def __repr__(self):
+        return f"""{object.__repr__(self)}
+  geospatial_types: {self.geospatial_types}
+  xmin: {self.xmin}, xmax: {self.xmax}
+  ymin: {self.ymin}, ymax: {self.ymax}
+  zmin: {self.zmin}, zmax: {self.zmax}
+  mmin: {self.mmin}, mmax: {self.mmax}"""
+
+    def to_dict(self):
+        out = {
+            "geospatial_types": self.geospatial_types,
+            "xmin": self.xmin,
+            "xmax": self.xmax,
+            "ymin": self.ymin,
+            "ymax": self.ymax,
+            "zmin": self.zmin,
+            "zmax": self.zmax,
+            "mmin": self.mmin,
+            "mmax": self.mmax
+        }
+
+        return out
+
+    @property
+    def geospatial_types(self):
+        cdef optional[vector[int32_t]] maybe_geometry_types = \
+            self.statistics.get().geometry_types()
+        if not maybe_geometry_types.has_value():
+            return None
+
+        return list(maybe_geometry_types.value())
+
+    @property
+    def lower_bound(self):
+        return [self.statistics.get().lower_bound()[i] for i in range(4)]
+
+    @property
+    def upper_bound(self):
+        return [self.statistics.get().upper_bound()[i] for i in range(4)]
+
+    @property
+    def dimension_empty(self):
+        return [self.statistics.get().dimension_empty()[i] for i in range(4)]
+
+    @property
+    def dimension_valid(self):
+        return [self.statistics.get().dimension_valid()[i] for i in range(4)]
+
+    @property
+    def has_x(self):
+        return self.dimension_valid[0] and not self.dimension_empty[0]
+
+    @property
+    def has_y(self):
+        return self.dimension_valid[1] and not self.dimension_empty[1]
+
+    @property
+    def has_z(self):
+        return self.dimension_valid[2] and not self.dimension_empty[2]
+
+    @property
+    def has_m(self):
+        return self.dimension_valid[3] and not self.dimension_empty[3]
+
+    @property
+    def xmin(self):
+        if self.has_x:
+            return self.lower_bound[0]
+        else:
+            return None

Review Comment:
   For the record, you can make these snippets shorter if you want:
   ```suggestion
           return self.lower_bound[0] if self.has_x else None
   ```



##########
python/pyarrow/_parquet.pyx:
##########
@@ -319,6 +319,136 @@ cdef _box_flba(ParquetFLBA val, uint32_t len):
     return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
 
 
+cdef class GeoStatistics(_Weakrefable):
+    """Statistics for columns with geospatial data types (experimental)"""
+
+    def __init__(self):
+        raise TypeError(f"Do not call {self.__class__.__name__}'s constructor 
directly")
+
+    def __cinit__(self):
+        pass
+
+    def __repr__(self):
+        return f"""{object.__repr__(self)}
+  geospatial_types: {self.geospatial_types}
+  xmin: {self.xmin}, xmax: {self.xmax}
+  ymin: {self.ymin}, ymax: {self.ymax}
+  zmin: {self.zmin}, zmax: {self.zmax}
+  mmin: {self.mmin}, mmax: {self.mmax}"""
+
+    def to_dict(self):
+        out = {
+            "geospatial_types": self.geospatial_types,
+            "xmin": self.xmin,
+            "xmax": self.xmax,
+            "ymin": self.ymin,
+            "ymax": self.ymax,
+            "zmin": self.zmin,
+            "zmax": self.zmax,
+            "mmin": self.mmin,
+            "mmax": self.mmax
+        }
+
+        return out
+
+    @property
+    def geospatial_types(self):
+        cdef optional[vector[int32_t]] maybe_geometry_types = \
+            self.statistics.get().geometry_types()
+        if not maybe_geometry_types.has_value():
+            return None
+
+        return list(maybe_geometry_types.value())
+
+    @property
+    def lower_bound(self):
+        return [self.statistics.get().lower_bound()[i] for i in range(4)]
+
+    @property
+    def upper_bound(self):
+        return [self.statistics.get().upper_bound()[i] for i in range(4)]
+
+    @property
+    def dimension_empty(self):
+        return [self.statistics.get().dimension_empty()[i] for i in range(4)]
+
+    @property
+    def dimension_valid(self):
+        return [self.statistics.get().dimension_valid()[i] for i in range(4)]
+
+    @property
+    def has_x(self):
+        return self.dimension_valid[0] and not self.dimension_empty[0]
+
+    @property
+    def has_y(self):
+        return self.dimension_valid[1] and not self.dimension_empty[1]
+
+    @property
+    def has_z(self):
+        return self.dimension_valid[2] and not self.dimension_empty[2]
+
+    @property
+    def has_m(self):
+        return self.dimension_valid[3] and not self.dimension_empty[3]
+
+    @property
+    def xmin(self):
+        if self.has_x:
+            return self.lower_bound[0]
+        else:
+            return None

Review Comment:
   And by the way, this will call the `lower_bound` property every time that 
`xmin`, `ymin`, etc. is called. Not very efficient, so perhaps you want 
something like:
   ```suggestion
           return self.statistics.get().lower_bound()[0] if self.has_x else None
   ```



##########
cpp/src/parquet/types.h:
##########
@@ -446,6 +467,30 @@ class PARQUET_EXPORT Float16LogicalType : public 
LogicalType {
   Float16LogicalType() = default;
 };
 
+class PARQUET_EXPORT GeometryLogicalType : public LogicalType {
+ public:
+  static std::shared_ptr<const LogicalType> Make(std::string crs = "");
+
+  const std::string& crs() const;
+
+ private:
+  GeometryLogicalType() = default;
+};
+
+class PARQUET_EXPORT GeographyLogicalType : public LogicalType {
+ public:
+  static std::shared_ptr<const LogicalType> Make(
+      std::string crs = "", LogicalType::EdgeInterpolationAlgorithm::algorithm 
algorithm =
+                                EdgeInterpolationAlgorithm::SPHERICAL);
+
+  const std::string& crs() const;
+  LogicalType::EdgeInterpolationAlgorithm::algorithm algorithm() const;
+  const char* algorithm_name() const;
+
+ private:
+  GeographyLogicalType() = default;

Review Comment:
   Hmm, this may be meant to prevent direct instantiation. Let's keep it then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to