huan233usc commented on code in PR #16650:
URL: https://github.com/apache/iceberg/pull/16650#discussion_r3376229574


##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkGeoTypes.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HexFormat;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.Geography;
+import org.apache.spark.sql.types.Geometry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end Spark SQL tests for the geometry and geography types: schema, Parquet round-trip, and
+ * deletion vectors.
+ *
+ * <p>Exercises the geometry/geography path through CREATE / INSERT (built from WKB literals via
+ * Spark's stock {@code ST_GeomFromWKB} / {@code ST_GeogFromWKB}), SELECT (verifies WKB and SRID
+ * round-trip through the Iceberg Parquet readers/writers) and DELETE on a v3 merge-on-read table
+ * (verifies that deletion vectors are produced for tables containing geo columns).
+ *
+ * <p>Topological predicates such as {@code ST_Intersects} are not part of stock Spark 4.1; the
+ * predicate coverage here is intentionally limited to {@code ST_Srid(...) = ...}, which exercises
+ * that the Iceberg reader re-attached the per-row SRID header from the column's CRS.
+ */
+public class TestSparkGeoTypes extends TestBase {
+
+  private static final String CATALOG = "local";
+  private static final String GEOMETRY_TABLE = "default.geo_geom";
+  private static final String GEOGRAPHY_TABLE = "default.geo_geog";
+
+  private static final int DEFAULT_SRID = 4326;
+
+  // WKB byte-order octet for little-endian (NDR).
+  private static final byte WKB_LE = 0x01;
+  // WKB type code for a 2D Point.
+  private static final int WKB_POINT = 1;
+
+  @BeforeAll
+  public static void setupCatalog() {
+    spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName());
+    spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop");
+    spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default");
+    spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false");
+    String warehouse = System.getProperty("java.io.tmpdir") + "/iceberg_spark_geo_warehouse";
+    spark.conf().set("spark.sql.catalog." + CATALOG + ".warehouse", warehouse);
+
+    // Spark 4.1 gates the GEOMETRY/GEOGRAPHY parser and built-in ST_ functions behind this flag.
+    spark.conf().set("spark.sql.geospatial.enabled", "true");
+  }
+
+  @BeforeEach
+  public void cleanupTables() {
+    sql("DROP TABLE IF EXISTS %s", qualified(GEOMETRY_TABLE));
+    sql("DROP TABLE IF EXISTS %s", qualified(GEOGRAPHY_TABLE));
+  }
+
+  @AfterEach
+  public void dropTables() {
+    sql("DROP TABLE IF EXISTS %s", qualified(GEOMETRY_TABLE));
+    sql("DROP TABLE IF EXISTS %s", qualified(GEOGRAPHY_TABLE));
+  }
+
+  @Test
+  public void testGeometryRoundTrip() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, geom GEOMETRY(%d)) USING iceberg "
+            + "TBLPROPERTIES ("
+            + "'format-version'='3', "
+            + "'read.parquet.vectorization.enabled'='false')",
+        qualified(GEOMETRY_TABLE), DEFAULT_SRID);
+
+    insertGeometry(GEOMETRY_TABLE, 1L, point2D(1.0, 2.0));
+    insertGeometry(GEOMETRY_TABLE, 2L, point2D(3.0, 4.0));
+    insertGeometry(GEOMETRY_TABLE, 3L, point2D(10.0, 20.0));
+
+    List<Row> rows =
+        spark.table(qualified(GEOMETRY_TABLE)).select("id", "geom").orderBy("id").collectAsList();
+    assertThat(rows).hasSize(3);
+
+    assertGeometryRow(rows.get(0), 1L, point2D(1.0, 2.0), DEFAULT_SRID);
+    assertGeometryRow(rows.get(1), 2L, point2D(3.0, 4.0), DEFAULT_SRID);
+    assertGeometryRow(rows.get(2), 3L, point2D(10.0, 20.0), DEFAULT_SRID);
+  }
+
+  @Test
+  public void testGeographyRoundTrip() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, geog GEOGRAPHY(%d)) USING iceberg "
+            + "TBLPROPERTIES ("
+            + "'format-version'='3', "
+            + "'read.parquet.vectorization.enabled'='false')",
+        qualified(GEOGRAPHY_TABLE), DEFAULT_SRID);
+
+    insertGeography(GEOGRAPHY_TABLE, 1L, point2D(-122.4194, 37.7749)); // San Francisco
+    insertGeography(GEOGRAPHY_TABLE, 2L, point2D(-73.9857, 40.7484)); // New York
+
+    List<Row> rows =
+        spark.table(qualified(GEOGRAPHY_TABLE)).select("id", "geog").orderBy("id").collectAsList();
+    assertThat(rows).hasSize(2);
+
+    assertGeographyRow(rows.get(0), 1L, point2D(-122.4194, 37.7749), DEFAULT_SRID);
+    assertGeographyRow(rows.get(1), 2L, point2D(-73.9857, 40.7484), DEFAULT_SRID);
+  }
+
+  @Test
+  public void testSridFilterRoundtrip() {
+    sql(
+        "CREATE TABLE %s (id BIGINT, geom GEOMETRY(%d)) USING iceberg "
+            + "TBLPROPERTIES ("
+            + "'format-version'='3', "
+            + "'read.parquet.vectorization.enabled'='false')",
+        qualified(GEOMETRY_TABLE), DEFAULT_SRID);
+
+    insertGeometry(GEOMETRY_TABLE, 1L, point2D(1.0, 2.0));
+    insertGeometry(GEOMETRY_TABLE, 2L, point2D(3.0, 4.0));
+

Review Comment:
   TODO: look up metadata



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to