Copilot commented on code in PR #2077:
URL: https://github.com/apache/sedona/pull/2077#discussion_r2205769792


##########
spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala:
##########
@@ -29,6 +29,10 @@ import 
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_DBSCAN, ST_L
 class GeoStatsSuite extends TestBaseScala {
   private val spark = sparkSession
 

Review Comment:
   [nitpick] This override disables adaptive query execution for all tests in 
GeoStatsSuite, which may introduce inconsistency with other test suites; 
consider adding a comment explaining why it's needed or scoping it to only the 
tests that require it.
   ```suggestion
   
     // Disabling adaptive query execution globally for this suite to ensure 
deterministic query plans.
     // If specific tests require this configuration, consider scoping it to 
those tests instead.
   ```



##########
spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java:
##########
@@ -57,5 +57,7 @@ public void testBytesFromString() {
     assertEquals(-1, SedonaConf.bytesFromString("-1"));
     assertEquals(1024, SedonaConf.bytesFromString("1k"));
     assertEquals(2097152, SedonaConf.bytesFromString("2MB"));
+    // fromSparkEnv means we dont have access to default values so sometimes 
we get null as input

Review Comment:
   Typographical error: 'dont' should be written as 'don't' for clarity.
   ```suggestion
       // fromSparkEnv means we don't have access to default values so 
sometimes we get null as input
   ```



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala:
##########
@@ -22,19 +22,23 @@ import org.apache.sedona.common.geometryObjects.Geography
 import org.apache.sedona.common.{Functions, FunctionsGeoTools}
 import org.apache.sedona.common.sphere.{Haversine, Spheroid}
 import org.apache.sedona.common.utils.{InscribedCircle, ValidDetail}
+import org.apache.sedona.core.utils.SedonaConf
 import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, Generator, Nondeterministic}
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodegenFallback, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, Generator, ImplicitCastInputTypes, Nondeterministic, 
UnaryExpression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.implicits._
 import org.apache.spark.sql.types._
 import org.locationtech.jts.algorithm.MinimumBoundingCircle
 import org.locationtech.jts.geom._
 import 
org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
+import 
org.apache.spark.sql.sedona_sql.expressions.LibPostalUtils.{getExpanderFromConf,
 getParserFromConf}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
+import com.mapzen.jpostal.{AddressExpander, AddressParser}
+import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 

Review Comment:
   Imported BlockHelper is not used in this file; consider removing this unused 
import to keep the code clean.
   ```suggestion
   
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.utils.SedonaConf
+import 
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ExpandAddress, 
ParseAddress}
+import org.apache.spark.sql.{Row, functions => f}
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.LoggerFactory
+
+import java.nio.file.{Files, Paths}
+import scala.collection.mutable
+
+class AddressProcessingFunctionsTest extends TestBaseScala with 
BeforeAndAfterEach {
+  private val logger = LoggerFactory.getLogger(getClass)
+  var clearedLibPostal = false
+
+  def clearLibpostalDataDir(): String = {
+    val dir = SedonaConf.fromActiveSession().getLibPostalDataDir
+    if (dir != null && dir.nonEmpty) {
+      try {
+        Files
+          .walk(Paths.get(dir))
+          .sorted(java.util.Comparator.reverseOrder())
+          .forEach(path => Files.deleteIfExists(path))
+      } catch {
+        case e: Exception =>
+          logger.warn(s"Failed to clear libpostal data directory: 
${e.getMessage}")
+      }
+    }
+    dir
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    if (!clearedLibPostal) {
+      clearedLibPostal = true
+      clearLibpostalDataDir()
+    }
+  }
+
+  describe("ExpandAddress") {
+    it("should return expected normalized forms") {
+      val resultDf = sparkSession
+        .sql(
+          "SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA') as normalized")
+      resultDf.write
+        .format("noop")
+        .mode("overwrite")
+        .save() // Tests a deserialization step that collect does not
+      val result = resultDf.collect
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[String]]
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+
+    it("should return null for null input") {
+      val result = sparkSession.sql("SELECT ExpandAddress(NULL) as 
normalized").collect()
+      assert(result.head.get(0) == null)
+    }
+
+    it("should should work when chained with explode") {
+      val result = sparkSession
+        .sql("SELECT Explode(ExpandAddress('781 Franklin Ave Crown Heights 
Brooklyn NY 11216 USA')) as normalized")
+        .collect()
+        .map(_.getString(0))
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+  }
+
+  describe("ParseAddress") {
+    it("should return expected label/value pairs") {
+      val resultDf = sparkSession
+        .sql(
+          "SELECT ParseAddress('781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA') as parsed")
+      resultDf.write
+        .format("noop")
+        .mode("overwrite")
+        .save() // Tests a deserialization step that collect does not
+      val result = resultDf
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[Row]]
+        .map(row => row.getAs[String]("label") -> row.getAs[String]("value"))
+        .toMap
+
+      assert(result.contains("road"))
+      assert(result("road") == "franklin ave")
+      assert(result.contains("city_district"))
+      assert(result("city_district") == "brooklyn")
+      assert(result.contains("house_number"))
+      assert(result("house_number") == "781")
+      assert(result.contains("postcode"))
+      assert(result("postcode") == "11216")
+      assert(result.contains("country"))
+      assert(result("country").toLowerCase.contains("usa"))
+      assert(result.contains("suburb"))
+      assert(result("suburb").toLowerCase.contains("crown heights"))
+    }
+
+    it("should return null for null input") {
+      val result = sparkSession.sql("SELECT ParseAddress(NULL) as 
parsed").collect()
+      assert(result.head.get(0) == null)
+    }
+
+    it("should should work when chained with explode") {

Review Comment:
   Duplicate 'should' in the test description; it should read 'should work when 
chained with explode'.
   ```suggestion
       it("should work when chained with explode") {
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.utils.SedonaConf
+import 
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ExpandAddress, 
ParseAddress}
+import org.apache.spark.sql.{Row, functions => f}
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.LoggerFactory
+
+import java.nio.file.{Files, Paths}
+import scala.collection.mutable
+
+class AddressProcessingFunctionsTest extends TestBaseScala with 
BeforeAndAfterEach {
+  private val logger = LoggerFactory.getLogger(getClass)
+  var clearedLibPostal = false
+
+  def clearLibpostalDataDir(): String = {
+    val dir = SedonaConf.fromActiveSession().getLibPostalDataDir
+    if (dir != null && dir.nonEmpty) {
+      try {
+        Files
+          .walk(Paths.get(dir))
+          .sorted(java.util.Comparator.reverseOrder())
+          .forEach(path => Files.deleteIfExists(path))
+      } catch {
+        case e: Exception =>
+          logger.warn(s"Failed to clear libpostal data directory: 
${e.getMessage}")
+      }
+    }
+    dir
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    if (!clearedLibPostal) {
+      clearedLibPostal = true
+      clearLibpostalDataDir()
+    }
+  }
+
+  describe("ExpandAddress") {
+    it("should return expected normalized forms") {
+      val resultDf = sparkSession
+        .sql(
+          "SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA') as normalized")
+      resultDf.write
+        .format("noop")
+        .mode("overwrite")
+        .save() // Tests a deserialization step that collect does not
+      val result = resultDf.collect
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[String]]
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+
+    it("should return null for null input") {
+      val result = sparkSession.sql("SELECT ExpandAddress(NULL) as 
normalized").collect()
+      assert(result.head.get(0) == null)
+    }
+
+    it("should should work when chained with explode") {

Review Comment:
   Duplicate 'should' in the test description; it should read 'should work when 
chained with explode'.
   ```suggestion
       it("should work when chained with explode") {
   ```



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala:
##########
@@ -1831,3 +1835,140 @@ case class ST_InterpolatePoint(inputExpressions: 
Seq[Expression])
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) =
     copy(inputExpressions = newChildren)
 }
+
+case class ExpandAddress(address: Expression)
+    extends UnaryExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with FoldableExpression
+    with Serializable {
+
+  def this(children: Seq[Expression]) = this(children.head)
+
+  lazy private final val expander = {
+    val conf = SedonaConf.fromSparkEnv
+    getExpanderFromConf(conf.getLibPostalDataDir, conf.getLibPostalUseSenzing)
+  }
+
+  override def nullable: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+
+  override def dataType: DataType = ArrayType(StringType)
+
+  override def eval(input: InternalRow): Any = {
+    val addressVal = address.eval(input)
+    if (addressVal == null) {
+      null
+    } else {
+      new GenericArrayData(
+        expander
+          .expandAddress(addressVal.asInstanceOf[UTF8String].toString)
+          .map(UTF8String.fromString))
+    }
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    val expanderRef = ctx.addReferenceObj("expander", expander, 
classOf[AddressExpander].getName)
+    val utf8StringClass = "org.apache.spark.unsafe.types.UTF8String"
+    val arrayDataClass = "org.apache.spark.sql.catalyst.util.GenericArrayData"
+    val addressRef = child.genCode(ctx)
+    val address = addressRef.value
+    ev.copy(code = code"""
+          ${addressRef.code}
+          boolean ${ev.isNull} = ${addressRef.isNull};
+          $arrayDataClass ${ev.value} = null;
+
+          if (!${ev.isNull}) {
+            String[] expandedAddressArray = 
$expanderRef.expandAddress($address.toString());
+            $utf8StringClass[] utf8Strings = new 
$utf8StringClass[expandedAddressArray.length];
+            for (int j = 0; j < expandedAddressArray.length; j++) {
+              utf8Strings[j] = 
$utf8StringClass.fromString(expandedAddressArray[j]);
+            }
+            ${ev.value} = new $arrayDataClass(utf8Strings);
+          } else {
+            ${ev.value} = new $arrayDataClass(new $utf8StringClass[0]);

Review Comment:
   In doGenCode the else branch returns an empty array for null inputs, but 
eval returns null; this inconsistent null-handling can lead to incorrect 
behavior under codegen. Update the generated code to set ev.isNull and return 
null when the input is null.



##########
python/sedona/spark/sql/st_functions.py:
##########
@@ -32,6 +32,25 @@
 _call_st_function = partial(call_sedona_function, "st_functions")
 
 

Review Comment:
   [nitpick] Missing @validate_argument_types decorator on ExpandAddress for 
input validation consistency; consider adding it as is done for ParseAddress.
   ```suggestion
   
   @validate_argument_types
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to