This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bfe1c4165ea5 [SPARK-55261][GEO][SQL] Implement Parquet read support for Geo types
bfe1c4165ea5 is described below
commit bfe1c4165ea570963154f87aaa8006405287c05f
Author: Uros Bojanic <[email protected]>
AuthorDate: Tue Feb 17 01:09:11 2026 +0800
[SPARK-55261][GEO][SQL] Implement Parquet read support for Geo types
### What changes were proposed in this pull request?
Enable reading Geometry and Geography data from Parquet files.
### Why are the changes needed?
This allows users to read back geospatial data that has been persisted in Parquet format.
### Does this PR introduce _any_ user-facing change?
Yes, GEOMETRY and GEOGRAPHY data can now be read from Parquet files.
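To illustrate the new behavior, here is a minimal round-trip sketch (not part of this patch). It assumes a `spark` session (e.g. spark-shell on this branch), a writable `/tmp/geo_demo` path, and that geo Parquet write support and the `st_geomfromwkb`/`st_asbinary` functions are already available; the hand-built WKB mirrors the `makePointWkb` helper in the new tests.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.functions.{st_asbinary, st_geomfromwkb}
import spark.implicits._

// WKB for POINT(1 2), little-endian: 1-byte order marker, 4-byte type, two 8-byte doubles.
val pointWkb: Array[Byte] = {
  val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN)
  buf.put(1.toByte)   // byte order: little-endian
  buf.putInt(1)       // WKB geometry type: Point
  buf.putDouble(1.0)
  buf.putDouble(2.0)
  buf.array()
}

// Write a GEOMETRY column to Parquet, then read it back through the path enabled by this patch.
Seq(pointWkb).toDF("wkb")
  .select(st_geomfromwkb($"wkb").as("geom"))
  .write.mode("overwrite").parquet("/tmp/geo_demo")

spark.read.parquet("/tmp/geo_demo")
  .select(st_asbinary($"geom").as("wkb"))
  .show()
```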
### How was this patch tested?
Added tests for reading GEOMETRY and GEOGRAPHY from Parquet.
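For reference, the new suites can typically be run individually with the standard Spark test invocation, e.g. `build/sbt "sql/testOnly *ParquetGeoSuite"` (the exact command depends on the local build setup).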
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54040 from uros-db/geo-parquet-read.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../parquet/ParquetVectorUpdaterFactory.java | 72 ++++++++++++
.../parquet/VectorizedColumnReader.java | 4 +
.../parquet/VectorizedDeltaByteArrayReader.java | 44 ++++++++
.../VectorizedDeltaLengthByteArrayReader.java | 35 ++++++
.../parquet/VectorizedPlainValuesReader.java | 53 +++++++++
.../datasources/parquet/VectorizedReaderBase.java | 10 ++
.../parquet/VectorizedRleValuesReader.java | 10 ++
.../parquet/VectorizedValuesReader.java | 2 +
.../datasources/parquet/WKBConverterStrategy.java | 51 +++++++++
.../execution/vectorized/WritableColumnVector.java | 1 +
.../datasources/parquet/ParquetRowConverter.scala | 82 +++++++++++++-
.../parquet/ParquetCompatibilityTest.scala | 81 ++++++++++++++
.../ParquetDeltaByteArrayEncodingSuite.scala | 58 +++++++++-
.../ParquetDeltaLengthByteArrayEncodingSuite.scala | 62 ++++++++++-
.../datasources/parquet/ParquetGeoSuite.scala | 122 +++++++++++++++++++++
15 files changed, 683 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 4f90f878da86..0e2c997e553f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -209,6 +209,10 @@ public class ParquetVectorUpdaterFactory {
if (sparkType instanceof StringType || sparkType == DataTypes.BinaryType ||
canReadAsBinaryDecimal(descriptor, sparkType)) {
return new BinaryUpdater();
+ } else if (sparkType instanceof GeometryType) {
+ return new GeometryUpdater();
+ } else if (sparkType instanceof GeographyType) {
+ return new GeographyUpdater();
} else if (canReadAsDecimal(descriptor, sparkType)) {
return new BinaryToDecimalUpdater(descriptor, (DecimalType) sparkType);
}
@@ -1044,6 +1048,74 @@ public class ParquetVectorUpdaterFactory {
}
}
+ private static final class GeometryUpdater implements ParquetVectorUpdater {
+
+ @Override
+ public void decodeSingleDictionaryId(
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(offset));
+ int srid = ((GeometryType) values.dataType()).srid();
+ values.putByteArray(offset,
+ WKBToGeometryConverter.INSTANCE.convert(v.getBytesUnsafe(), srid));
+ }
+
+ @Override
+ public void readValue(
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ this.readValues(1, offset, values, valuesReader);
+ }
+
+ @Override
+ public void readValues(int total, int offset, WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ valuesReader.readGeometry(total, values, offset);
+ }
+
+ @Override
+ public void skipValues(int total, VectorizedValuesReader valuesReader) {
+ valuesReader.skipBinary(total);
+ }
+ }
+
+ private static final class GeographyUpdater implements ParquetVectorUpdater {
+
+ @Override
+ public void decodeSingleDictionaryId(
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(offset));
+ int srid = ((GeographyType) values.dataType()).srid();
+ values.putByteArray(offset,
+ WKBToGeographyConverter.INSTANCE.convert(v.getBytesUnsafe(), srid));
+ }
+
+ @Override
+ public void readValue(
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ this.readValues(1, offset, values, valuesReader);
+ }
+
+ @Override
+ public void readValues(int total, int offset, WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ valuesReader.readGeography(total, values, offset);
+ }
+
+ @Override
+ public void skipValues(int total, VectorizedValuesReader valuesReader) {
+ valuesReader.skipBinary(total);
+ }
+ }
+
private static class IntegerToBinaryUpdater implements ParquetVectorUpdater {
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6e1660dc8c87..971edfec3b11 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -43,6 +43,8 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.GeometryType;
+import org.apache.spark.sql.types.GeographyType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -180,6 +182,8 @@ public class VectorizedColumnReader {
break;
case BINARY:
isSupported = !needsDecimalScaleRebase(sparkType);
+        boolean isGeoType = sparkType instanceof GeometryType || sparkType instanceof GeographyType;
+ isSupported = isSupported && !isGeoType;
break;
}
return isSupported;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
index 198d57267fc3..1edee60bc564 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java
@@ -25,6 +25,8 @@ import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -100,6 +102,48 @@ public class VectorizedDeltaByteArrayReader extends VectorizedReaderBase
readValues(total, c, rowId);
}
+ @Override
+ public void readGeometry(int total, WritableColumnVector c, int rowId) {
+ assert(c.dataType() instanceof GeometryType);
+ int srid = ((GeometryType) c.dataType()).srid();
+ readGeoData(total, c, rowId, srid, WKBToGeometryConverter.INSTANCE);
+ }
+
+ @Override
+ public void readGeography(int total, WritableColumnVector c, int rowId) {
+ assert(c.dataType() instanceof GeographyType);
+ int srid = ((GeographyType) c.dataType()).srid();
+ readGeoData(total, c, rowId, srid, WKBToGeographyConverter.INSTANCE);
+ }
+
+  private void readGeoData(int total, WritableColumnVector c, int rowId, int srid,
+ WKBConverterStrategy converter) {
+ for (int i = 0; i < total; i++) {
+ int prefixLength = prefixLengthVector.getInt(currentRow);
+ ByteBuffer suffix = suffixReader.getBytes(currentRow);
+ int suffixLength = suffix.limit() - suffix.position();
+ int length = prefixLength + suffixLength;
+
+ byte[] wkb = new byte[length];
+ if (prefixLength > 0) {
+ previous.get(wkb, 0, prefixLength);
+ }
+ suffix.get(wkb, prefixLength, suffixLength);
+
+ // Converts WKB into a physical representation of geometry/geography.
+ byte[] physicalValue = converter.convert(wkb, srid);
+
+ WritableColumnVector arrayData = c.arrayData();
+ int offset = arrayData.getElementsAppended();
+ arrayData.appendBytes(physicalValue.length, physicalValue, 0);
+
+ c.putArray(rowId + i, offset, physicalValue.length);
+ previous = ByteBuffer.wrap(wkb);
+
+ currentRow++;
+ }
+ }
+
/**
* There was a bug (PARQUET-246) in which DeltaByteArrayWriter's reset() method did not clear the
* previous value state that it tracks internally. This resulted in the first value of all pages
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
index 9be867d61900..1cfaa59f18ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
@@ -25,6 +25,8 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
/**
* An implementation of the Parquet DELTA_LENGTH_BYTE_ARRAY decoder that supports the vectorized
@@ -67,6 +69,39 @@ public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase i
currentRow += total;
}
+ @Override
+ public void readGeometry(int total, WritableColumnVector c, int rowId) {
+ assert(c.dataType() instanceof GeometryType);
+ int srid = ((GeometryType) c.dataType()).srid();
+ readGeoData(total, c, rowId, srid, WKBToGeometryConverter.INSTANCE);
+ }
+
+ @Override
+ public void readGeography(int total, WritableColumnVector c, int rowId) {
+ assert(c.dataType() instanceof GeographyType);
+ int srid = ((GeographyType) c.dataType()).srid();
+ readGeoData(total, c, rowId, srid, WKBToGeographyConverter.INSTANCE);
+ }
+
+  private void readGeoData(int total, WritableColumnVector c, int rowId, int srid,
+ WKBConverterStrategy converter) {
+    ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
+ int length;
+ for (int i = 0; i < total; i++) {
+ length = lengthsVector.getInt(currentRow + i);
+ byte[] physicalValue;
+ try {
+ // Converts WKB into a physical representation of geometry/geography.
+ physicalValue = converter.convert(in.readNBytes(length), srid);
+ } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+ }
+
+      outputWriter.write(c, rowId + i, ByteBuffer.wrap(physicalValue), physicalValue.length);
+ }
+ currentRow += total;
+ }
+
public ByteBuffer getBytes(int rowId) {
int length = lengthsVector.getInt(rowId);
try {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 14ba534c143c..7364fa5536c0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -30,6 +30,9 @@ import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.GeographyType;
+import org.apache.spark.sql.types.GeometryType;
+import org.apache.spark.util.ByteBufferOutputStream;
/**
* An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
@@ -406,4 +409,54 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
public void skipFixedLenByteArray(int total, int len) {
in.skip(total * (long) len);
}
+
+ @Override
+ public void readGeometry(int total, WritableColumnVector v, int rowId) {
+ assert(v.dataType() instanceof GeometryType);
+ int srid = ((GeometryType) v.dataType()).srid();
+ try {
+ readGeoData(total, v, rowId, srid, WKBToGeometryConverter.INSTANCE);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read geometry", e);
+ }
+ }
+
+ @Override
+ public void readGeography(int total, WritableColumnVector v, int rowId) {
+ assert(v.dataType() instanceof GeographyType);
+ int srid = ((GeographyType) v.dataType()).srid();
+ try {
+ readGeoData(total, v, rowId, srid, WKBToGeographyConverter.INSTANCE);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read geography", e);
+ }
+ }
+
+  private void readGeoData(int total, WritableColumnVector v, int rowId, int srid,
+ WKBConverterStrategy converter) throws IOException {
+    // Go through the input stream and convert the WKB to the internal representation,
+    // writing it to the output buffer and putting the (offset, length) in the vector.
+ // Finally, append all data from the output buffer in a single operation.
+ int base = v.arrayData().getElementsAppended();
+ int dataLen = 0;
+ final int intSize = 4;
+ ByteBuffer lenBuffer = ByteBuffer.allocate(intSize);
+ ByteBufferOutputStream out = new ByteBufferOutputStream();
+
+ for (int i = 0; i < total; i++) {
+ int len = readInteger();
+
+ // Converts WKB into a physical representation of geometry/geography.
+ byte[] physicalValue = converter.convert(in.readNBytes(len), srid);
+ v.putArray(rowId + i, base + dataLen + intSize, physicalValue.length);
+
+ lenBuffer.putInt(0, physicalValue.length);
+ out.write(lenBuffer.array());
+ out.write(physicalValue);
+
+ dataLen += intSize + physicalValue.length;
+ }
+ out.close();
+ v.arrayData().appendBytes(dataLen, out.toByteArray(), 0);
+ }
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
index ab8fd9bdb6ff..77bb9340e526 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
@@ -109,6 +109,16 @@ public class VectorizedReaderBase extends ValuesReader implements VectorizedValu
throw SparkUnsupportedOperationException.apply();
}
+ @Override
+ public void readGeometry(int total, WritableColumnVector c, int rowId) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
+ @Override
+ public void readGeography(int total, WritableColumnVector c, int rowId) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
@Override
public void skipBooleans(int total) {
throw SparkUnsupportedOperationException.apply();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 60544665409d..efb41e1753c1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -764,6 +764,16 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
}
+ @Override
+ public void readGeometry(int total, WritableColumnVector c, int rowId) {
+ throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
+ }
+
+ @Override
+ public void readGeography(int total, WritableColumnVector c, int rowId) {
+ throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
+ }
+
@Override
public void readBooleans(int total, WritableColumnVector c, int rowId) {
int left = total;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 430861433849..d29ce0dd12e4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -57,6 +57,8 @@ public interface VectorizedValuesReader {
void readFloats(int total, WritableColumnVector c, int rowId);
void readDoubles(int total, WritableColumnVector c, int rowId);
void readBinary(int total, WritableColumnVector c, int rowId);
+ void readGeometry(int total, WritableColumnVector c, int rowId);
+ void readGeography(int total, WritableColumnVector c, int rowId);
/*
* Skips `total` values
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
new file mode 100644
index 000000000000..a8be90951289
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.catalyst.util.STUtils;
+
+/**
+ * Interface for converting a WKB byte array into a physical geometry or geography value.
+ */
+interface WKBConverterStrategy {
+ byte[] convert(byte[] wkb, int srid);
+}
+
+/**
+ * Converts the provided WKB data into a geometry object.
+ */
+enum WKBToGeometryConverter implements WKBConverterStrategy {
+ INSTANCE;
+
+ @Override
+ public byte[] convert(byte[] wkb, int srid) {
+ return STUtils.stGeomFromWKB(wkb, srid).getBytes();
+ }
+}
+
+/**
+ * Converts the provided WKB data into a geography object.
+ */
+enum WKBToGeographyConverter implements WKBConverterStrategy {
+ INSTANCE;
+
+ @Override
+ public byte[] convert(byte[] wkb, int srid) {
+ return STUtils.stGeogFromWKB(wkb).getBytes();
+ }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index c4f06e07911d..904c48309778 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -955,6 +955,7 @@ public abstract class WritableColumnVector extends ColumnVector {
protected boolean isArray() {
return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
+ type instanceof GeometryType || type instanceof GeographyType ||
DecimalType.isByteArrayDecimalType(type);
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 271a1485dfd3..f253cbd0d0d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,7 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{PhysicalByteType, PhysicalShortType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData, ResolveDefaultColumns, STUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -43,7 +43,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, VariantMetadata}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal, UTF8String, VariantVal}
import org.apache.spark.util.collection.Utils
/**
@@ -412,6 +412,12 @@ private[parquet] class ParquetRowConverter(
case _: StringType =>
new ParquetStringConverter(updater)
+ case geom: GeometryType =>
+ new ParquetGeometryConverter(geom.srid, updater)
+
+ case _: GeographyType =>
+ new ParquetGeographyConverter(updater)
+
// As long as the parquet type is INT64 timestamp, whether logical annotation
// `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
case TimestampType
@@ -612,6 +618,78 @@ private[parquet] class ParquetRowConverter(
}
}
+  /**
+   * Parquet converter for geometry values. A dictionary is used to minimize WKB decoding cost.
+   */
+  private final class ParquetGeometryConverter(srid: Int, updater: ParentContainerUpdater)
+ extends ParquetPrimitiveConverter(updater) {
+
+ private var expandedDictionary: Array[GeometryVal] = null
+
+ override def hasDictionarySupport: Boolean = true
+
+ override def setDictionary(dictionary: Dictionary): Unit = {
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+        STUtils.stGeomFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe, srid)
+ }
+ }
+
+ override def addValueFromDictionary(dictionaryId: Int): Unit = {
+ updater.set(expandedDictionary(dictionaryId))
+ }
+
+ override def addBinary(value: Binary): Unit = {
+ val buffer = value.toByteBuffer
+ val numBytes = buffer.remaining()
+
+ val geometry = if (buffer.hasArray) {
+ val array = buffer.array()
+ val offset = buffer.arrayOffset() + buffer.position()
+ STUtils.stGeomFromWKB(array.slice(offset, offset + numBytes), srid)
+ } else {
+ STUtils.stGeomFromWKB(value.getBytesUnsafe, srid)
+ }
+
+ updater.set(geometry)
+ }
+ }
+
+  /**
+   * Parquet converter for geography values. A dictionary is used to minimize WKB decoding cost.
+   */
+  private final class ParquetGeographyConverter(updater: ParentContainerUpdater)
+ extends ParquetPrimitiveConverter(updater) {
+
+ private var expandedDictionary: Array[GeographyVal] = null
+
+ override def hasDictionarySupport: Boolean = true
+
+ override def setDictionary(dictionary: Dictionary): Unit = {
+ this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
+ STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe)
+ }
+ }
+
+ override def addValueFromDictionary(dictionaryId: Int): Unit = {
+ updater.set(expandedDictionary(dictionaryId))
+ }
+
+ override def addBinary(value: Binary): Unit = {
+ val buffer = value.toByteBuffer
+ val numBytes = buffer.remaining()
+
+ val geometry = if (buffer.hasArray) {
+ val array = buffer.array()
+ val offset = buffer.arrayOffset() + buffer.position()
+ STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes))
+ } else {
+ STUtils.stGeogFromWKB(value.getBytesUnsafe)
+ }
+
+ updater.set(geometry)
+ }
+ }
+
/**
* Parquet converter for fixed-precision decimals.
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index cd31cd963db7..414be5c608d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -17,17 +17,22 @@
package org.apache.spark.sql.execution.datasources.parquet
+import java.nio.{ByteBuffer, ByteOrder}
+
import scala.jdk.CollectionConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.parquet.column.values.ValuesWriter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType}
/**
* Helper class for testing Parquet compatibility.
@@ -56,6 +61,82 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
|${readParquetSchema(path)}
""".stripMargin)
}
+
+  protected def writeBinaryData(writer: ValuesWriter, binaryValues: Array[Array[Byte]]): Unit = {
+ for (binaryValue <- binaryValues) {
+ writer.writeBytes(Binary.fromReusedByteArray(binaryValue))
+ }
+ }
+
+ /** Construct WKB for POINT(x, y) in little-endian format. */
+ protected def makePointWkb(x: Double, y: Double): Array[Byte] = {
+ val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN)
+ buf.put(1.toByte) // little-endian byte order
+ buf.putInt(1) // WKB type: Point
+ buf.putDouble(x)
+ buf.putDouble(y)
+ buf.array()
+ }
+
+  /** Construct WKB for LINESTRING in little-endian format from (x, y) pairs. */
+ protected def makeLineStringWkb(coords: (Double, Double)*): Array[Byte] = {
+ val numPoints = coords.length
+    val buf = ByteBuffer.allocate(9 + 16 * numPoints).order(ByteOrder.LITTLE_ENDIAN)
+ buf.put(1.toByte) // little-endian byte order
+ buf.putInt(2) // WKB type: LineString
+ buf.putInt(numPoints)
+ coords.foreach { case (x, y) =>
+ buf.putDouble(x)
+ buf.putDouble(y)
+ }
+ buf.array()
+ }
+
+  /** Construct WKB for POLYGON in little-endian format from a single ring of (x, y) pairs.
+ * An empty argument list produces an empty polygon (0 rings). */
+ protected def makePolygonWkb(ring: (Double, Double)*): Array[Byte] = {
+ if (ring.isEmpty) {
+ val buf = ByteBuffer.allocate(9).order(ByteOrder.LITTLE_ENDIAN)
+ buf.put(1.toByte) // little-endian byte order
+ buf.putInt(3) // WKB type: Polygon
+ buf.putInt(0) // 0 rings = empty
+ buf.array()
+ } else {
+ val numPoints = ring.length
+      val buf = ByteBuffer.allocate(13 + 16 * numPoints).order(ByteOrder.LITTLE_ENDIAN)
+ buf.put(1.toByte) // little-endian byte order
+ buf.putInt(3) // WKB type: Polygon
+ buf.putInt(1) // number of rings
+ buf.putInt(numPoints)
+ ring.foreach { case (x, y) =>
+ buf.putDouble(x)
+ buf.putDouble(y)
+ }
+ buf.array()
+ }
+ }
+
+ /**
+ * Construct WKB for a multi/collection geometry in little-endian format.
+   * WKB types: MultiPoint=4, MultiLineString=5, MultiPolygon=6, GeometryCollection=7.
+ */
+  protected def makeMultiWkb(wkbType: Int, geometries: Array[Byte]*): Array[Byte] = {
+ val totalSize = 9 + geometries.map(_.length).sum
+ val buf = ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN)
+ buf.put(1.toByte) // little-endian byte order
+ buf.putInt(wkbType)
+ buf.putInt(geometries.length)
+ geometries.foreach(g => buf.put(g))
+ buf.array()
+ }
+
+ protected def testGeo(testName: String)(testFun: DataType => Unit): Unit = {
+    Seq(GeometryType(0), GeometryType(4326), GeographyType(4326)).foreach { geoType =>
+ test(s"$testName $geoType") {
+ testFun(geoType)
+ }
+ }
+ }
}
private[sql] object ParquetCompatibilityTest {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
index c54eef348f34..9b34e5011b5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaByteArrayEncodingSuite.scala
@@ -20,9 +20,10 @@ import org.apache.parquet.bytes.DirectByteBufferAllocator
import org.apache.parquet.column.values.Utils
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter
+import org.apache.spark.sql.catalyst.util.STUtils
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType, IntegerType, StringType}
/**
* Read tests for vectorized Delta byte array reader.
@@ -81,6 +82,61 @@ class ParquetDeltaByteArrayEncodingSuite extends ParquetCompatibilityTest with S
assert(7 == writableColumnVector.getInt(2))
}
+ testGeo("geo types single point") { geoType =>
+ assertGeoReadWrite(writer, reader, Array(makePointWkb(1, 1)), geoType)
+ }
+
+ testGeo("geo types multiple identical points") { geoType =>
+ assertGeoReadWrite(writer, reader,
+      Array(makePointWkb(1, 1), makePointWkb(1, 1), makePointWkb(1, 1)), geoType)
+ }
+
+ testGeo("geo types polygons with shared prefix") { geoType =>
+ // These polygons share a WKB prefix, exercising delta encoding.
+ assertGeoReadWrite(writer, reader, Array(
+ makePolygonWkb((3, 3), (4, 4), (5, 5.1), (3, 3)),
+ makePolygonWkb((3, 3), (4, 4), (5, 5.2), (3, 3)),
+ makePolygonWkb((3, 3), (4, 4), (5, 5.3), (3, 3))),
+ geoType)
+ }
+
+ private def assertGeoReadWrite(
+ writer: DeltaByteArrayWriter,
+ reader: VectorizedDeltaByteArrayReader,
+ wkbValues: Array[Array[Byte]],
+ dataType: DataType): Unit = {
+
+ val (isGeometry, srid) = dataType match {
+ case geom: GeometryType => (true, geom.srid)
+ case geog: GeographyType => (false, geog.srid)
+ }
+
+ val length = wkbValues.length
+
+ writeBinaryData(writer, wkbValues)
+ writableColumnVector = new OnHeapColumnVector(length, dataType)
+
+ reader.initFromPage(length, writer.getBytes.toInputStream)
+ if (isGeometry) {
+ reader.readGeometry(length, writableColumnVector, 0)
+ } else {
+ reader.readGeography(length, writableColumnVector, 0)
+ }
+
+ for (i <- 0 until length) {
+ val actualWkb = if (isGeometry) {
+ val geom = writableColumnVector.getGeometry(i)
+ assert(srid === STUtils.stSrid(geom))
+ STUtils.stAsBinary(geom)
+ } else {
+ val geog = writableColumnVector.getGeography(i)
+ assert(srid === STUtils.stSrid(geog))
+ STUtils.stAsBinary(geog)
+ }
+ assert(wkbValues(i) sameElements actualWkb)
+ }
+ }
+
private def assertReadWrite(
writer: DeltaByteArrayWriter,
reader: VectorizedDeltaByteArrayReader,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
index a1c01632dd3c..b9009f59f69a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDeltaLengthByteArrayEncodingSuite.scala
@@ -24,9 +24,10 @@ import org.apache.parquet.column.values.Utils
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter
import org.apache.parquet.io.api.Binary
+import org.apache.spark.sql.catalyst.util.STUtils
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, GeographyType, GeometryType, IntegerType, StringType}
/**
* Read tests for vectorized Delta length byte array reader.
@@ -104,6 +105,65 @@ class ParquetDeltaLengthByteArrayEncodingSuite
}
}
+ testGeo("geo types single point") { geoType =>
+ assertGeoReadWrite(writer, reader, Array(makePointWkb(1, 1)), geoType)
+ }
+
+ testGeo("geo types with multiple geometries") { geoType =>
+ // Different WKB sizes exercise variable-length encoding.
+ assertGeoReadWrite(writer, reader, Array(
+ makePointWkb(0, 0),
+ makeLineStringWkb((1, 1), (2, 1)),
+ makePointWkb(3, 4)),
+ geoType)
+ }
+
+ testGeo("geo types with mixed length polygons") { geoType =>
+ // Polygons with increasing vertex count.
+ assertGeoReadWrite(writer, reader, Array(
+ makePolygonWkb((1, 2), (3, 4), (5, 6), (1, 2)),
+ makePolygonWkb((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (1, 2)),
+      makePolygonWkb((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12), (13, 14), (1, 2))),
+ geoType)
+ }
+
+ private def assertGeoReadWrite(
+ writer: DeltaLengthByteArrayValuesWriter,
+ reader: VectorizedDeltaLengthByteArrayReader,
+ wkbValues: Array[Array[Byte]],
+ dataType: DataType): Unit = {
+
+ val (isGeometry, srid) = dataType match {
+ case geom: GeometryType => (true, geom.srid)
+ case geog: GeographyType => (false, geog.srid)
+ }
+
+ val length = wkbValues.length
+
+ writeBinaryData(writer, wkbValues)
+ writableColumnVector = new OnHeapColumnVector(length, dataType)
+
+ reader.initFromPage(length, writer.getBytes.toInputStream)
+ if (isGeometry) {
+ reader.readGeometry(length, writableColumnVector, 0)
+ } else {
+ reader.readGeography(length, writableColumnVector, 0)
+ }
+
+ for (i <- 0 until length) {
+ val actualWkb = if (isGeometry) {
+ val geom = writableColumnVector.getGeometry(i)
+ assert(srid === STUtils.stSrid(geom))
+ STUtils.stAsBinary(geom)
+ } else {
+ val geog = writableColumnVector.getGeography(i)
+ assert(srid === STUtils.stSrid(geog))
+ STUtils.stAsBinary(geog)
+ }
+ assert(wkbValues(i) sameElements actualWkb)
+ }
+ }
+
private def writeData(writer: DeltaLengthByteArrayValuesWriter, values: Array[String]): Unit = {
for (i <- values.indices) {
writer.writeBytes(Binary.fromString(values(i)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
new file mode 100644
index 000000000000..107b5b7675b1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb, st_geomfromwkb}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ParquetGeoSuite
+ extends ParquetCompatibilityTest
+ with SharedSparkSession {
+
+ import testImplicits._
+
+ /**
+   * Writes the given WKB byte arrays to Parquet files in geometry and geography columns.
+   */
+  private def writeParquetFiles(dir: File, wkbValues: Seq[Array[Byte]]): Unit = {
+ val df = wkbValues
+ .toDF("wkb")
+      .select(st_geomfromwkb($"wkb").as("geom"), st_geogfromwkb($"wkb").as("geog"))
+ df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+ }
+
+ /**
+   * Reads all parquet files from the given directory and returns a DataFrame with WKB
+ * representation of `geom` and `geog` columns.
+ */
+ private def readParquetFiles(dir: File): DataFrame = {
+ spark.read
+ .parquet(dir.getAbsolutePath)
+ .select(st_asbinary($"geom"), st_asbinary($"geog"))
+ }
+
+ /**
+   * Writes the given WKB byte arrays to Parquet files in both geometry and geography columns.
+   * Reads them back, checking that the read values match the original WKB values.
+ */
+ private def testReadWrite(wkbValues: Seq[Array[Byte]]): Unit = {
+ withTempDir { dir =>
+ withAllParquetWriters {
+ writeParquetFiles(dir, wkbValues)
+
+ withAllParquetReaders {
+          checkAnswer(readParquetFiles(dir), wkbValues.map(wkb => Row(wkb, wkb)))
+ }
+ }
+ }
+ }
+
+ /** Common WKB test values used across multiple tests. */
+ private val point0Wkb = makePointWkb(0, 0) // POINT(0 0)
+ private val point1Wkb = makePointWkb(1, 1) // POINT(1 1)
+  private val line0Wkb = makeLineStringWkb((0, 0), (1, 1)) // LINESTRING(0 0,1 1)
+  private val line1Wkb = makeLineStringWkb((1, 1), (2, 2)) // LINESTRING(1 1,2 2)
+  private val line2Wkb = makeLineStringWkb((2, 2), (3, 3)) // LINESTRING(2 2,3 3)
+ // POLYGON((0 0,1 1,2 2,0 0))
+ private val polygon0Wkb = makePolygonWkb((0, 0), (1, 1), (2, 2), (0, 0))
+ // POLYGON((3 3,4 4,5 5,3 3))
+ private val polygon3Wkb = makePolygonWkb((3, 3), (4, 4), (5, 5), (3, 3))
+ // MULTIPOINT((0 0),(1 1))
+ private val multiPointWkb = makeMultiWkb(4, point0Wkb, point1Wkb)
+ // MULTILINESTRING((0 0,1 1),(2 2,3 3))
+ private val multiLineStringWkb = makeMultiWkb(5, line0Wkb, line2Wkb)
+ // MULTIPOLYGON(((0 0,1 1,2 2,0 0)),((3 3,4 4,5 5,3 3)))
+ private val multiPolygonWkb = makeMultiWkb(6, polygon0Wkb, polygon3Wkb)
+ // GEOMETRYCOLLECTION(POINT(0 0),LINESTRING(1 1,2 2))
+ private val geometryCollectionWkb = makeMultiWkb(7, point0Wkb, line1Wkb)
+
+ test("basic read and write") {
+ testReadWrite(Seq(point0Wkb))
+ testReadWrite(Seq(line0Wkb))
+ testReadWrite(Seq(polygon0Wkb))
+ testReadWrite(Seq(multiPointWkb))
+ testReadWrite(Seq(multiLineStringWkb))
+ testReadWrite(Seq(multiPolygonWkb))
+ testReadWrite(Seq(geometryCollectionWkb))
+ // Test writing multiple values at once.
+ testReadWrite(Seq(point1Wkb, line1Wkb, makePolygonWkb()))
+ }
+
+ test("dictionary encoding") {
+ val wkbValues = Seq(
+ point0Wkb,
+ line0Wkb,
+ polygon0Wkb,
+ multiPointWkb,
+ multiLineStringWkb,
+ multiPolygonWkb,
+ geometryCollectionWkb
+ )
+
+ // Repeat the values to ensure that we have a large enough dataset to test
+ // the dictionary encoding.
+ val repeatedValues = List.fill(10000)(wkbValues).flatten
+
+ Seq(true, false).foreach { useDictionary =>
+      withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> useDictionary.toString) {
+ testReadWrite(repeatedValues)
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]