This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bfe1c4165ea5 [SPARK-55261][GEO][SQL] Implement Parquet read support for Geo types
bfe1c4165ea5 is described below

commit bfe1c4165ea570963154f87aaa8006405287c05f
Author: Uros Bojanic <[email protected]>
AuthorDate: Tue Feb 17 01:09:11 2026 +0800

    [SPARK-55261][GEO][SQL] Implement Parquet read support for Geo types
    
    ### What changes were proposed in this pull request?
    Enable reading Geometry and Geography data from Parquet files.
    
    ### Why are the changes needed?
    Allow users to read persisted geospatial data from the Parquet format.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, geo data can now be read from Parquet.
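    
    For illustration, a round trip might look roughly like the sketch below (assuming an
    interactive session with a `spark` SparkSession; the `/tmp/geo_demo` path is made up,
    the WKB bytes mirror the `makePointWkb` helper added to `ParquetCompatibilityTest`,
    and `st_geomfromwkb`/`st_asbinary` are the functions exercised by the new
    `ParquetGeoSuite`):
    
    ```scala
    import java.nio.{ByteBuffer, ByteOrder}
    import org.apache.spark.sql.functions.{st_asbinary, st_geomfromwkb}
    import spark.implicits._
    
    // Little-endian WKB for POINT(1 2).
    val wkb: Array[Byte] = {
      val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN)
      buf.put(1.toByte)   // byte-order marker: little-endian
      buf.putInt(1)       // WKB geometry type: Point
      buf.putDouble(1.0)
      buf.putDouble(2.0)
      buf.array()
    }
    
    // Write a GEOMETRY column to Parquet (write support already exists), then
    // read it back through the read paths added by this patch.
    val df = Seq(wkb).toDF("wkb").select(st_geomfromwkb($"wkb").as("geom"))
    df.write.mode("overwrite").parquet("/tmp/geo_demo")
    spark.read.parquet("/tmp/geo_demo").select(st_asbinary($"geom")).show()
    ```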
    
    ### How was this patch tested?
    Added tests for reading GEOMETRY and GEOGRAPHY from Parquet.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #54040 from uros-db/geo-parquet-read.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../parquet/ParquetVectorUpdaterFactory.java       |  72 ++++++++++++
 .../parquet/VectorizedColumnReader.java            |   4 +
 .../parquet/VectorizedDeltaByteArrayReader.java    |  44 ++++++++
 .../VectorizedDeltaLengthByteArrayReader.java      |  35 ++++++
 .../parquet/VectorizedPlainValuesReader.java       |  53 +++++++++
 .../datasources/parquet/VectorizedReaderBase.java  |  10 ++
 .../parquet/VectorizedRleValuesReader.java         |  10 ++
 .../parquet/VectorizedValuesReader.java            |   2 +
 .../datasources/parquet/WKBConverterStrategy.java  |  51 +++++++++
 .../execution/vectorized/WritableColumnVector.java |   1 +
 .../datasources/parquet/ParquetRowConverter.scala  |  82 +++++++++++++-
 .../parquet/ParquetCompatibilityTest.scala         |  81 ++++++++++++++
 .../ParquetDeltaByteArrayEncodingSuite.scala       |  58 +++++++++-
 .../ParquetDeltaLengthByteArrayEncodingSuite.scala |  62 ++++++++++-
 .../datasources/parquet/ParquetGeoSuite.scala      | 122 +++++++++++++++++++++
 15 files changed, 683 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 4f90f878da86..0e2c997e553f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -209,6 +209,10 @@ public class ParquetVectorUpdaterFactory {
         if (sparkType instanceof  StringType || sparkType == DataTypes.BinaryType ||
           canReadAsBinaryDecimal(descriptor, sparkType)) {
           return new BinaryUpdater();
+        } else if (sparkType instanceof GeometryType) {
+          return new GeometryUpdater();
+        } else if (sparkType instanceof GeographyType) {
+          return new GeographyUpdater();
         } else if (canReadAsDecimal(descriptor, sparkType)) {
           return new BinaryToDecimalUpdater(descriptor, (DecimalType) sparkType);
         }
@@ -1044,6 +1048,74 @@ public class ParquetVectorUpdaterFactory {
     }
   }
 
+  private static final class GeometryUpdater implements ParquetVectorUpdater {
+
+    @Override
+    public void decodeSingleDictionaryId(
+        int offset,
+        WritableColumnVector values,
+        WritableColumnVector dictionaryIds,
+        Dictionary dictionary) {
+      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(offset));
+      int srid = ((GeometryType) values.dataType()).srid();
+      values.putByteArray(offset,
+        WKBToGeometryConverter.INSTANCE.convert(v.getBytesUnsafe(), srid));
+    }
+
+    @Override
+    public void readValue(
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      this.readValues(1, offset, values, valuesReader);
+    }
+
+    @Override
+    public void readValues(int total, int offset, WritableColumnVector values,
+       VectorizedValuesReader valuesReader) {
+      valuesReader.readGeometry(total, values, offset);
+    }
+
+    @Override
+    public void skipValues(int total, VectorizedValuesReader valuesReader) {
+      valuesReader.skipBinary(total);
+    }
+  }
+
+  private static final class GeographyUpdater implements ParquetVectorUpdater {
+
+    @Override
+    public void decodeSingleDictionaryId(
+        int offset,
+        WritableColumnVector values,
+        WritableColumnVector dictionaryIds,
+        Dictionary dictionary) {
+      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(offset));
+      int srid = ((GeographyType) values.dataType()).srid();
+      values.putByteArray(offset,
+        WKBToGeographyConverter.INSTANCE.convert(v.getBytesUnsafe(), srid));
+    }
+
+    @Override
+    public void readValue(
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      this.readValues(1, offset, values, valuesReader);
+    }
+
+    @Override
+    public void readValues(int total, int offset, WritableColumnVector values,
+       VectorizedValuesReader valuesReader) {
+      valuesReader.readGeography(total, values, offset);
+    }
+
+    @Override
+    public void skipValues(int total, VectorizedValuesReader valuesReader) {
+      valuesReader.skipBinary(total);
+    }
+  }
+
   private static class IntegerToBinaryUpdater implements ParquetVectorUpdater {
 
     @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6e1660dc8c87..971edfec3b11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -43,6 +43,8 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.GeometryType;
+import org.apache.spark.sql.types.GeographyType;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -180,6 +182,8 @@ public class VectorizedColumnReader {
         break;
       case BINARY:
         isSupported = !needsDecimalScaleRebase(sparkType);
+        boolean isGeoType = sparkType instanceof GeometryType || sparkType instanceof GeographyType;
+        isSupported = isSupported && !isGeoType;
         break;
     }
     return isSupported;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
index 198d57267fc3..1edee60bc564 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
@@ -25,6 +25,8 @@ import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -100,6 +102,48 @@ public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
     readValues(total, c, rowId);
   }
 
+  @Override
+  public void readGeometry(int total, WritableColumnVector c, int rowId) {
+    assert(c.dataType() instanceof GeometryType);
+    int srid = ((GeometryType) c.dataType()).srid();
+    readGeoData(total, c, rowId, srid, WKBToGeometryConverter.INSTANCE);
+  }
+
+  @Override
+  public void readGeography(int total, WritableColumnVector c, int rowId) {
+    assert(c.dataType() instanceof GeographyType);
+    int srid = ((GeographyType) c.dataType()).srid();
+    readGeoData(total, c, rowId, srid, WKBToGeographyConverter.INSTANCE);
+  }
+
+  private void readGeoData(int total, WritableColumnVector c, int rowId, int srid,
+     WKBConverterStrategy converter) {
+    for (int i = 0; i < total; i++) {
+      int prefixLength = prefixLengthVector.getInt(currentRow);
+      ByteBuffer suffix = suffixReader.getBytes(currentRow);
+      int suffixLength = suffix.limit() - suffix.position();
+      int length = prefixLength + suffixLength;
+
+      byte[] wkb = new byte[length];
+      if (prefixLength > 0) {
+        previous.get(wkb, 0, prefixLength);
+      }
+      suffix.get(wkb, prefixLength, suffixLength);
+
+      // Converts WKB into a physical representation of geometry/geography.
+      byte[] physicalValue = converter.convert(wkb, srid);
+
+      WritableColumnVector arrayData = c.arrayData();
+      int offset = arrayData.getElementsAppended();
+      arrayData.appendBytes(physicalValue.length, physicalValue, 0);
+
+      c.putArray(rowId + i, offset, physicalValue.length);
+      previous = ByteBuffer.wrap(wkb);
+
+      currentRow++;
+    }
+  }
+
   /**
   * There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
   * previous value state that it tracks internally. This resulted in the first value of all pages
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
index 9be867d61900..1cfaa59f18ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
@@ -25,6 +25,8 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
 
 /**
  * An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
@@ -67,6 +69,39 @@ public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase i
     currentRow += total;
   }
 
+  @Override
+  public void readGeometry(int total, WritableColumnVector c, int rowId) {
+    assert(c.dataType() instanceof GeometryType);
+    int srid = ((GeometryType) c.dataType()).srid();
+    readGeoData(total, c, rowId, srid, WKBToGeometryConverter.INSTANCE);
+  }
+
+  @Override
+  public void readGeography(int total, WritableColumnVector c, int rowId) {
+    assert(c.dataType() instanceof GeographyType);
+    int srid = ((GeographyType) c.dataType()).srid();
+    readGeoData(total, c, rowId, srid, WKBToGeographyConverter.INSTANCE);
+  }
+
+  private void readGeoData(int total, WritableColumnVector c, int rowId, int srid,
+     WKBConverterStrategy converter) {
+    ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
+    int length;
+    for (int i = 0; i < total; i++) {
+      length = lengthsVector.getInt(currentRow + i);
+      byte[] physicalValue;
+      try {
+        // Converts WKB into a physical representation of geometry/geography.
+        physicalValue = converter.convert(in.readNBytes(length), srid);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+      }
+
+      outputWriter.write(c, rowId + i, ByteBuffer.wrap(physicalValue), physicalValue.length);
+    }
+    currentRow += total;
+  }
+
   public ByteBuffer getBytes(int rowId) {
     int length = lengthsVector.getInt(rowId);
     try {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 14ba534c143c..7364fa5536c0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -30,6 +30,9 @@ import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
+import org.apache.spark.util.ByteBufferOutputStream;
 
 /**
  * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
@@ -406,4 +409,54 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   public void skipFixedLenByteArray(int total, int len) {
     in.skip(total * (long) len);
   }
+
+  @Override
+  public void readGeometry(int total, WritableColumnVector v, int rowId) {
+    assert(v.dataType() instanceof GeometryType);
+    int srid = ((GeometryType) v.dataType()).srid();
+    try {
+      readGeoData(total, v, rowId, srid, WKBToGeometryConverter.INSTANCE);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read geometry", e);
+    }
+  }
+
+  @Override
+  public void readGeography(int total, WritableColumnVector v, int rowId) {
+    assert(v.dataType() instanceof GeographyType);
+    int srid = ((GeographyType) v.dataType()).srid();
+    try {
+      readGeoData(total, v, rowId, srid, WKBToGeographyConverter.INSTANCE);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read geography", e);
+    }
+  }
+
+  private void readGeoData(int total, WritableColumnVector v, int rowId, int srid,
+     WKBConverterStrategy converter) throws IOException {
+    // Go through the input stream and convert the WKB to the internal representation
+    // writing it to the output buffer and putting the (offset, length) in the vector.
+    // Finally, append all data from the output buffer in a single operation.
+    int base = v.arrayData().getElementsAppended();
+    int dataLen = 0;
+    final int intSize = 4;
+    ByteBuffer lenBuffer = ByteBuffer.allocate(intSize);
+    ByteBufferOutputStream out = new ByteBufferOutputStream();
+
+    for (int i = 0; i < total; i++) {
+      int len = readInteger();
+
+      // Converts WKB into a physical representation of geometry/geography.
+      byte[] physicalValue = converter.convert(in.readNBytes(len), srid);
+      v.putArray(rowId + i, base + dataLen + intSize, physicalValue.length);
+
+      lenBuffer.putInt(0, physicalValue.length);
+      out.write(lenBuffer.array());
+      out.write(physicalValue);
+
+      dataLen += intSize + physicalValue.length;
+    }
+    out.close();
+    v.arrayData().appendBytes(dataLen, out.toByteArray(), 0);
+  }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
index ab8fd9bdb6ff..77bb9340e526 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
@@ -109,6 +109,16 @@ public class VectorizedReaderBase extends ValuesReader implements VectorizedValu
     throw SparkUnsupportedOperationException.apply();
   }
 
+  @Override
+  public void readGeometry(int total, WritableColumnVector c, int rowId) {
+    throw SparkUnsupportedOperationException.apply();
+  }
+
+  @Override
+  public void readGeography(int total, WritableColumnVector c, int rowId) {
+    throw SparkUnsupportedOperationException.apply();
+  }
+
   @Override
   public void skipBooleans(int total) {
     throw SparkUnsupportedOperationException.apply();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 60544665409d..efb41e1753c1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -764,6 +764,16 @@ public final class VectorizedRleValuesReader extends ValuesReader
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
+  @Override
+  public void readGeometry(int total, WritableColumnVector c, int rowId) {
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
+  }
+
+  @Override
+  public void readGeography(int total, WritableColumnVector c, int rowId) {
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
+  }
+
   @Override
   public void readBooleans(int total, WritableColumnVector c, int rowId) {
     int left = total;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 430861433849..d29ce0dd12e4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -57,6 +57,8 @@ public interface VectorizedValuesReader {
   void readFloats(int total, WritableColumnVector c, int rowId);
   void readDoubles(int total, WritableColumnVector c, int rowId);
   void readBinary(int total, WritableColumnVector c, int rowId);
+  void readGeometry(int total, WritableColumnVector c, int rowId);
+  void readGeography(int total, WritableColumnVector c, int rowId);
 
    /*
     * Skips `total` values
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
new file mode 100644
index 000000000000..a8be90951289
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.catalyst.util.STUtils;
+
+/**
+ * Interface for converting a WKB byte array into a physical geometry or geography value.
+ */
+interface WKBConverterStrategy {
+  byte[] convert(byte[] wkb, int srid);
+}
+
+/**
+ * Converts the provided WKB data into a geometry object.
+ */
+enum WKBToGeometryConverter implements WKBConverterStrategy {
+  INSTANCE;
+
+  @Override
+  public byte[] convert(byte[] wkb, int srid) {
+    return STUtils.stGeomFromWKB(wkb, srid).getBytes();
+  }
+}
+
+/**
+ * Converts the provided WKB data into a geography object.
+ */
+enum WKBToGeographyConverter implements WKBConverterStrategy {
+  INSTANCE;
+
+  @Override
+  public byte[] convert(byte[] wkb, int srid) {
+    return STUtils.stGeogFromWKB(wkb).getBytes();
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index c4f06e07911d..904c48309778 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -955,6 +955,7 @@ public abstract class WritableColumnVector extends ColumnVector {
 
   protected boolean isArray() {
     return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
+      type instanceof GeometryType || type instanceof GeographyType ||
       DecimalType.isByteArrayDecimalType(type);
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 271a1485dfd3..f253cbd0d0d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{PhysicalByteType, PhysicalShortType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns, STUtils}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -43,7 +43,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, VariantMetadata}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String, VariantVal}
 import org.apache.spark.util.collection.Utils
 
 /**
@@ -412,6 +412,12 @@ private[parquet] class ParquetRowConverter(
       case _: StringType =>
         new ParquetStringConverter(updater)
 
+      case geom: GeometryType =>
+        new ParquetGeometryConverter(geom.srid, updater)
+
+      case _: GeographyType =>
+        new ParquetGeographyConverter(updater)
+
       // As long as the parquet type is INT64 timestamp, whether logical annotation
       // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
       case TimestampType
@@ -612,6 +618,78 @@ private[parquet] class ParquetRowConverter(
     }
   }
 
+  /**
+   * Parquet converter for geometry values. A dictionary is used to minimize decoding cost.
+   */
+  private final class ParquetGeometryConverter(srid: Int, updater: ParentContainerUpdater)
+      extends ParquetPrimitiveConverter(updater) {
+
+    private var expandedDictionary: Array[GeometryVal] = null
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        STUtils.stGeomFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe, srid)
+      }
+    }
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    override def addBinary(value: Binary): Unit = {
+      val buffer = value.toByteBuffer
+      val numBytes = buffer.remaining()
+
+      val geometry = if (buffer.hasArray) {
+        val array = buffer.array()
+        val offset = buffer.arrayOffset() + buffer.position()
+        STUtils.stGeomFromWKB(array.slice(offset, offset + numBytes), srid)
+      } else {
+        STUtils.stGeomFromWKB(value.getBytesUnsafe, srid)
+      }
+
+      updater.set(geometry)
+    }
+  }
+
+  /**
+   * Parquet converter for geography values. A dictionary is used to minimize decoding cost.
+   */
+  private final class ParquetGeographyConverter(updater: ParentContainerUpdater)
+      extends ParquetPrimitiveConverter(updater) {
+
+    private var expandedDictionary: Array[GeographyVal] = null
+
+    override def hasDictionarySupport: Boolean = true
+
+    override def setDictionary(dictionary: Dictionary): Unit = {
+      this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe)
+      }
+    }
+
+    override def addValueFromDictionary(dictionaryId: Int): Unit = {
+      updater.set(expandedDictionary(dictionaryId))
+    }
+
+    override def addBinary(value: Binary): Unit = {
+      val buffer = value.toByteBuffer
+      val numBytes = buffer.remaining()
+
+      val geography = if (buffer.hasArray) {
+        val array = buffer.array()
+        val offset = buffer.arrayOffset() + buffer.position()
+        STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes))
+      } else {
+        STUtils.stGeogFromWKB(value.getBytesUnsafe)
+      }
+
+      updater.set(geography)
+    }
+  }
+
   /**
    * Parquet converter for fixed-precision decimals.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index cd31cd963db7..414be5c608d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -17,17 +17,22 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.parquet.column.values.ValuesWriter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetWriter}
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.Binary
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType}
 
 /**
  * Helper class for testing Parquet compatibility.
@@ -56,6 +61,82 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
          |${readParquetSchema(path)}
        """.stripMargin)
   }
+
+  protected def writeBinaryData(writer: ValuesWriter, binaryValues: Array[Array[Byte]]): Unit = {
+    for (binaryValue <- binaryValues) {
+      writer.writeBytes(Binary.fromReusedByteArray(binaryValue))
+    }
+  }
+
+  /** Construct WKB for POINT(x, y) in little-endian format. */
+  protected def makePointWkb(x: Double, y: Double): Array[Byte] = {
+    val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN)
+    buf.put(1.toByte) // little-endian byte order
+    buf.putInt(1) // WKB type: Point
+    buf.putDouble(x)
+    buf.putDouble(y)
+    buf.array()
+  }
+
+  /** Construct WKB for LINESTRING in little-endian format from (x, y) pairs. */
+  protected def makeLineStringWkb(coords: (Double, Double)*): Array[Byte] = {
+    val numPoints = coords.length
+    val buf = ByteBuffer.allocate(9 + 16 * numPoints).order(ByteOrder.LITTLE_ENDIAN)
+    buf.put(1.toByte) // little-endian byte order
+    buf.putInt(2) // WKB type: LineString
+    buf.putInt(numPoints)
+    coords.foreach { case (x, y) =>
+      buf.putDouble(x)
+      buf.putDouble(y)
+    }
+    buf.array()
+  }
+
+  /** Construct WKB for POLYGON in little-endian format from a single ring of (x, y) pairs.
+   *  An empty argument list produces an empty polygon (0 rings). */
+  protected def makePolygonWkb(ring: (Double, Double)*): Array[Byte] = {
+    if (ring.isEmpty) {
+      val buf = ByteBuffer.allocate(9).order(ByteOrder.LITTLE_ENDIAN)
+      buf.put(1.toByte) // little-endian byte order
+      buf.putInt(3) // WKB type: Polygon
+      buf.putInt(0) // 0 rings = empty
+      buf.array()
+    } else {
+      val numPoints = ring.length
+      val buf = ByteBuffer.allocate(13 + 16 * numPoints).order(ByteOrder.LITTLE_ENDIAN)
+      buf.put(1.toByte) // little-endian byte order
+      buf.putInt(3) // WKB type: Polygon
+      buf.putInt(1) // number of rings
+      buf.putInt(numPoints)
+      ring.foreach { case (x, y) =>
+        buf.putDouble(x)
+        buf.putDouble(y)
+      }
+      buf.array()
+    }
+  }
+
+  /**
+   * Construct WKB for a multi/collection geometry in little-endian format.
+   * WKB types: MultiPoint=4, MultiLineString=5, MultiPolygon=6, GeometryCollection=7.
+   */
+  protected def makeMultiWkb(wkbType: Int, geometries: Array[Byte]*): Array[Byte] = {
+    val totalSize = 9 + geometries.map(_.length).sum
+    val buf = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN)
+    buf.put(1.toByte) // little-endian byte order
+    buf.putInt(wkbType)
+    buf.putInt(geometries.length)
+    geometries.foreach(g => buf.put(g))
+    buf.array()
+  }
+
+  protected def testGeo(testName: String)(testFun: DataType => Unit): Unit = {
+    Seq(GeometryType(0), GeometryType(4326), GeographyType(4326)).foreach { geoType =>
+      test(s"$testName $geoType") {
+        testFun(geoType)
+      }
+    }
+  }
 }
 
 private[sql] object ParquetCompatibilityTest {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
index c54eef348f34..9b34e5011b5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
@@ -20,9 +20,10 @@ import org.apache.parquet.bytes.DirectByteBufferAllocator
 import org.apache.parquet.column.values.Utils
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter
 
+import org.apache.spark.sql.catalyst.util.STUtils
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType, IntegerType, StringType}
 
 /**
  * Read tests for vectorized Delta byte array  reader.
@@ -81,6 +82,61 @@ class ParquetDeltaByteArrayEncodingSuite extends ParquetCompatibilityTest with S
     assert(7 == writableColumnVector.getInt(2))
   }
 
+  testGeo("geo types single point") { geoType =>
+    assertGeoReadWrite(writer, reader, Array(makePointWkb(1, 1)), geoType)
+  }
+
+  testGeo("geo types multiple identical points") { geoType =>
+    assertGeoReadWrite(writer, reader,
+      Array(makePointWkb(1, 1), makePointWkb(1, 1), makePointWkb(1, 1)), geoType)
+  }
+
+  testGeo("geo types polygons with shared prefix") { geoType =>
+    // These polygons share a WKB prefix, exercising delta encoding.
+    assertGeoReadWrite(writer, reader, Array(
+      makePolygonWkb((3, 3), (4, 4), (5, 5.1), (3, 3)),
+      makePolygonWkb((3, 3), (4, 4), (5, 5.2), (3, 3)),
+      makePolygonWkb((3, 3), (4, 4), (5, 5.3), (3, 3))),
+      geoType)
+  }
+
+  private def assertGeoReadWrite(
+      writer: DeltaByteArrayWriter,
+      reader: VectorizedDeltaByteArrayReader,
+      wkbValues: Array[Array[Byte]],
+      dataType: DataType): Unit = {
+
+    val (isGeometry, srid) = dataType match {
+      case geom: GeometryType => (true, geom.srid)
+      case geog: GeographyType => (false, geog.srid)
+    }
+
+    val length = wkbValues.length
+
+    writeBinaryData(writer, wkbValues)
+    writableColumnVector = new OnHeapColumnVector(length, dataType)
+
+    reader.initFromPage(length, writer.getBytes.toInputStream)
+    if (isGeometry) {
+      reader.readGeometry(length, writableColumnVector, 0)
+    } else {
+      reader.readGeography(length, writableColumnVector, 0)
+    }
+
+    for (i <- 0 until length) {
+      val actualWkb = if (isGeometry) {
+        val geom = writableColumnVector.getGeometry(i)
+        assert(srid === STUtils.stSrid(geom))
+        STUtils.stAsBinary(geom)
+      } else {
+        val geog = writableColumnVector.getGeography(i)
+        assert(srid === STUtils.stSrid(geog))
+        STUtils.stAsBinary(geog)
+      }
+      assert(wkbValues(i) sameElements actualWkb)
+    }
+  }
+
   private def assertReadWrite(
       writer: DeltaByteArrayWriter,
       reader: VectorizedDeltaByteArrayReader,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
index a1c01632dd3c..b9009f59f69a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
@@ -24,9 +24,10 @@ import org.apache.parquet.column.values.Utils
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter
 import org.apache.parquet.io.api.Binary
 
+import org.apache.spark.sql.catalyst.util.STUtils
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType, IntegerType, StringType}
 
 /**
  * Read tests for vectorized Delta length byte array  reader.
@@ -104,6 +105,65 @@ class ParquetDeltaLengthByteArrayEncodingSuite
     }
   }
 
+  testGeo("geo types single point") { geoType =>
+    assertGeoReadWrite(writer, reader, Array(makePointWkb(1, 1)), geoType)
+  }
+
+  testGeo("geo types with multiple geometries") { geoType =>
+    // Different WKB sizes exercise variable-length encoding.
+    assertGeoReadWrite(writer, reader, Array(
+      makePointWkb(0, 0),
+      makeLineStringWkb((1, 1), (2, 1)),
+      makePointWkb(3, 4)),
+      geoType)
+  }
+
+  testGeo("geo types with mixed length polygons") { geoType =>
+    // Polygons with increasing vertex count.
+    assertGeoReadWrite(writer, reader, Array(
+      makePolygonWkb((1, 2), (3, 4), (5, 6), (1, 2)),
+      makePolygonWkb((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (1, 2)),
+      makePolygonWkb((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12), (13, 14), (1, 2))),
+      geoType)
+  }
+
+  private def assertGeoReadWrite(
+      writer: DeltaLengthByteArrayValuesWriter,
+      reader: VectorizedDeltaLengthByteArrayReader,
+      wkbValues: Array[Array[Byte]],
+      dataType: DataType): Unit = {
+
+    val (isGeometry, srid) = dataType match {
+      case geom: GeometryType => (true, geom.srid)
+      case geog: GeographyType => (false, geog.srid)
+    }
+
+    val length = wkbValues.length
+
+    writeBinaryData(writer, wkbValues)
+    writableColumnVector = new OnHeapColumnVector(length, dataType)
+
+    reader.initFromPage(length, writer.getBytes.toInputStream)
+    if (isGeometry) {
+      reader.readGeometry(length, writableColumnVector, 0)
+    } else {
+      reader.readGeography(length, writableColumnVector, 0)
+    }
+
+    for (i <- 0 until length) {
+      val actualWkb = if (isGeometry) {
+        val geom = writableColumnVector.getGeometry(i)
+        assert(srid === STUtils.stSrid(geom))
+        STUtils.stAsBinary(geom)
+      } else {
+        val geog = writableColumnVector.getGeography(i)
+        assert(srid === STUtils.stSrid(geog))
+        STUtils.stAsBinary(geog)
+      }
+      assert(wkbValues(i) sameElements actualWkb)
+    }
+  }
+
   private def writeData(writer: DeltaLengthByteArrayValuesWriter, values: Array[String]): Unit = {
     for (i <- values.indices) {
       writer.writeBytes(Binary.fromString(values(i)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
new file mode 100644
index 000000000000..107b5b7675b1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb, st_geomfromwkb}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ParquetGeoSuite
+    extends ParquetCompatibilityTest
+    with SharedSparkSession {
+
+  import testImplicits._
+
+  /**
+   * Writes the given WKB byte arrays to Parquet files in geometry and geography columns.
+   */
+  private def writeParquetFiles(dir: File, wkbValues: Seq[Array[Byte]]): Unit = {
+    val df = wkbValues
+      .toDF("wkb")
+      .select(st_geomfromwkb($"wkb").as("geom"), st_geogfromwkb($"wkb").as("geog"))
+    df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+  }
+
+  /**
+   * Reads all parquet files from the given directory and returns a DataFrame with WKB
+   * representation of `geom` and `geog` columns.
+   */
+  private def readParquetFiles(dir: File): DataFrame = {
+    spark.read
+      .parquet(dir.getAbsolutePath)
+      .select(st_asbinary($"geom"), st_asbinary($"geog"))
+  }
+
+  /**
+   * Writes the given WKB byte arrays to Parquet files in both geometry and geography columns.
+   * Reads them back checking that the read values match the original WKB values.
+   */
+  private def testReadWrite(wkbValues: Seq[Array[Byte]]): Unit = {
+    withTempDir { dir =>
+      withAllParquetWriters {
+        writeParquetFiles(dir, wkbValues)
+
+        withAllParquetReaders {
+          checkAnswer(readParquetFiles(dir), wkbValues.map(wkb => Row(wkb, wkb)))
+        }
+      }
+    }
+  }
+
+  /** Common WKB test values used across multiple tests. */
+  private val point0Wkb = makePointWkb(0, 0) // POINT(0 0)
+  private val point1Wkb = makePointWkb(1, 1) // POINT(1 1)
+  private val line0Wkb = makeLineStringWkb((0, 0), (1, 1)) // LINESTRING(0 0,1 1)
+  private val line1Wkb = makeLineStringWkb((1, 1), (2, 2)) // LINESTRING(1 1,2 2)
+  private val line2Wkb = makeLineStringWkb((2, 2), (3, 3)) // LINESTRING(2 2,3 3)
+  // POLYGON((0 0,1 1,2 2,0 0))
+  private val polygon0Wkb = makePolygonWkb((0, 0), (1, 1), (2, 2), (0, 0))
+  // POLYGON((3 3,4 4,5 5,3 3))
+  private val polygon3Wkb = makePolygonWkb((3, 3), (4, 4), (5, 5), (3, 3))
+  // MULTIPOINT((0 0),(1 1))
+  private val multiPointWkb = makeMultiWkb(4, point0Wkb, point1Wkb)
+  // MULTILINESTRING((0 0,1 1),(2 2,3 3))
+  private val multiLineStringWkb = makeMultiWkb(5, line0Wkb, line2Wkb)
+  // MULTIPOLYGON(((0 0,1 1,2 2,0 0)),((3 3,4 4,5 5,3 3)))
+  private val multiPolygonWkb = makeMultiWkb(6, polygon0Wkb, polygon3Wkb)
+  // GEOMETRYCOLLECTION(POINT(0 0),LINESTRING(1 1,2 2))
+  private val geometryCollectionWkb = makeMultiWkb(7, point0Wkb, line1Wkb)
+
+  test("basic read and write") {
+    testReadWrite(Seq(point0Wkb))
+    testReadWrite(Seq(line0Wkb))
+    testReadWrite(Seq(polygon0Wkb))
+    testReadWrite(Seq(multiPointWkb))
+    testReadWrite(Seq(multiLineStringWkb))
+    testReadWrite(Seq(multiPolygonWkb))
+    testReadWrite(Seq(geometryCollectionWkb))
+    // Test writing multiple values at once.
+    testReadWrite(Seq(point1Wkb, line1Wkb, makePolygonWkb()))
+  }
+
+  test("dictionary encoding") {
+    val wkbValues = Seq(
+      point0Wkb,
+      line0Wkb,
+      polygon0Wkb,
+      multiPointWkb,
+      multiLineStringWkb,
+      multiPolygonWkb,
+      geometryCollectionWkb
+    )
+
+    // Repeat the values to ensure that we have a large enough dataset to test
+    // the dictionary encoding.
+    val repeatedValues = List.fill(10000)(wkbValues).flatten
+
+    Seq(true, false).foreach { useDictionary =>
+      withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> useDictionary.toString) {
+        testReadWrite(repeatedValues)
+      }
+    }
+  }
+}

