Copilot commented on code in PR #2077:
URL: https://github.com/apache/sedona/pull/2077#discussion_r2205769792
##########
spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala:
##########
@@ -29,6 +29,10 @@ import
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_DBSCAN, ST_L
class GeoStatsSuite extends TestBaseScala {
private val spark = sparkSession
Review Comment:
[nitpick] This override disables adaptive query execution for all tests in
GeoStatsSuite, which may make its behavior inconsistent with other test suites.
Consider adding a comment explaining why the override is needed, or scoping it
to only the tests that require it.
```suggestion
// Disabling adaptive query execution globally for this suite to ensure
deterministic query plans.
// If specific tests require this configuration, consider scoping it to
those tests instead.
```
##########
spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java:
##########
@@ -57,5 +57,7 @@ public void testBytesFromString() {
assertEquals(-1, SedonaConf.bytesFromString("-1"));
assertEquals(1024, SedonaConf.bytesFromString("1k"));
assertEquals(2097152, SedonaConf.bytesFromString("2MB"));
+ // fromSparkEnv means we dont have access to default values so sometimes
we get null as input
Review Comment:
Typographical error: 'dont' should be written as 'don't'.
```suggestion
// fromSparkEnv means we don't have access to default values so
sometimes we get null as input
```
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala:
##########
@@ -22,19 +22,23 @@ import org.apache.sedona.common.geometryObjects.Geography
import org.apache.sedona.common.{Functions, FunctionsGeoTools}
import org.apache.sedona.common.sphere.{Haversine, Spheroid}
import org.apache.sedona.common.utils.{InscribedCircle, ValidDetail}
+import org.apache.sedona.core.utils.SedonaConf
import org.apache.sedona.sql.utils.GeometrySerializer
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes,
Expression, Generator, Nondeterministic}
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext,
CodegenFallback, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes,
Expression, Generator, ImplicitCastInputTypes, Nondeterministic,
UnaryExpression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.sedona_sql.expressions.implicits._
import org.apache.spark.sql.types._
import org.locationtech.jts.algorithm.MinimumBoundingCircle
import org.locationtech.jts.geom._
import
org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
+import
org.apache.spark.sql.sedona_sql.expressions.LibPostalUtils.{getExpanderFromConf,
getParserFromConf}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
+import com.mapzen.jpostal.{AddressExpander, AddressParser}
+import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
Review Comment:
Imported BlockHelper is not used in this file; consider removing this unused
import to keep the code clean.
```suggestion
```
##########
spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.utils.SedonaConf
+import
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ExpandAddress,
ParseAddress}
+import org.apache.spark.sql.{Row, functions => f}
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.LoggerFactory
+
+import java.nio.file.{Files, Paths}
+import scala.collection.mutable
+
+class AddressProcessingFunctionsTest extends TestBaseScala with
BeforeAndAfterEach {
+ private val logger = LoggerFactory.getLogger(getClass)
+ var clearedLibPostal = false
+
+ def clearLibpostalDataDir(): String = {
+ val dir = SedonaConf.fromActiveSession().getLibPostalDataDir
+ if (dir != null && dir.nonEmpty) {
+ try {
+ Files
+ .walk(Paths.get(dir))
+ .sorted(java.util.Comparator.reverseOrder())
+ .forEach(path => Files.deleteIfExists(path))
+ } catch {
+ case e: Exception =>
+ logger.warn(s"Failed to clear libpostal data directory:
${e.getMessage}")
+ }
+ }
+ dir
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ if (!clearedLibPostal) {
+ clearedLibPostal = true
+ clearLibpostalDataDir()
+ }
+ }
+
+ describe("ExpandAddress") {
+ it("should return expected normalized forms") {
+ val resultDf = sparkSession
+ .sql(
+ "SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY
11216 USA') as normalized")
+ resultDf.write
+ .format("noop")
+ .mode("overwrite")
+ .save() // Tests a deserialization step that collect does not
+ val result = resultDf.collect
+ .take(1)(0)(0)
+ .asInstanceOf[mutable.Seq[String]]
+ assert(result.contains("781 franklin avenue crown heights brooklyn new
york 11216 usa"))
+ }
+
+ it("should return null for null input") {
+ val result = sparkSession.sql("SELECT ExpandAddress(NULL) as
normalized").collect()
+ assert(result.head.get(0) == null)
+ }
+
+ it("should should work when chained with explode") {
+ val result = sparkSession
+ .sql("SELECT Explode(ExpandAddress('781 Franklin Ave Crown Heights
Brooklyn NY 11216 USA')) as normalized")
+ .collect()
+ .map(_.getString(0))
+ assert(result.contains("781 franklin avenue crown heights brooklyn new
york 11216 usa"))
+ }
+ }
+
+ describe("ParseAddress") {
+ it("should return expected label/value pairs") {
+ val resultDf = sparkSession
+ .sql(
+ "SELECT ParseAddress('781 Franklin Ave Crown Heights Brooklyn NY
11216 USA') as parsed")
+ resultDf.write
+ .format("noop")
+ .mode("overwrite")
+ .save() // Tests a deserialization step that collect does not
+ val result = resultDf
+ .take(1)(0)(0)
+ .asInstanceOf[mutable.Seq[Row]]
+ .map(row => row.getAs[String]("label") -> row.getAs[String]("value"))
+ .toMap
+
+ assert(result.contains("road"))
+ assert(result("road") == "franklin ave")
+ assert(result.contains("city_district"))
+ assert(result("city_district") == "brooklyn")
+ assert(result.contains("house_number"))
+ assert(result("house_number") == "781")
+ assert(result.contains("postcode"))
+ assert(result("postcode") == "11216")
+ assert(result.contains("country"))
+ assert(result("country").toLowerCase.contains("usa"))
+ assert(result.contains("suburb"))
+ assert(result("suburb").toLowerCase.contains("crown heights"))
+ }
+
+ it("should return null for null input") {
+ val result = sparkSession.sql("SELECT ParseAddress(NULL) as
parsed").collect()
+ assert(result.head.get(0) == null)
+ }
+
+ it("should should work when chained with explode") {
Review Comment:
Duplicate 'should' in the test description; it should read 'should work when
chained with explode'.
```suggestion
it("should work when chained with explode") {
```
##########
spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.utils.SedonaConf
+import
org.apache.spark.sql.sedona_sql.expressions.st_functions.{ExpandAddress,
ParseAddress}
+import org.apache.spark.sql.{Row, functions => f}
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.LoggerFactory
+
+import java.nio.file.{Files, Paths}
+import scala.collection.mutable
+
+class AddressProcessingFunctionsTest extends TestBaseScala with
BeforeAndAfterEach {
+ private val logger = LoggerFactory.getLogger(getClass)
+ var clearedLibPostal = false
+
+ def clearLibpostalDataDir(): String = {
+ val dir = SedonaConf.fromActiveSession().getLibPostalDataDir
+ if (dir != null && dir.nonEmpty) {
+ try {
+ Files
+ .walk(Paths.get(dir))
+ .sorted(java.util.Comparator.reverseOrder())
+ .forEach(path => Files.deleteIfExists(path))
+ } catch {
+ case e: Exception =>
+ logger.warn(s"Failed to clear libpostal data directory:
${e.getMessage}")
+ }
+ }
+ dir
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ if (!clearedLibPostal) {
+ clearedLibPostal = true
+ clearLibpostalDataDir()
+ }
+ }
+
+ describe("ExpandAddress") {
+ it("should return expected normalized forms") {
+ val resultDf = sparkSession
+ .sql(
+ "SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY
11216 USA') as normalized")
+ resultDf.write
+ .format("noop")
+ .mode("overwrite")
+ .save() // Tests a deserialization step that collect does not
+ val result = resultDf.collect
+ .take(1)(0)(0)
+ .asInstanceOf[mutable.Seq[String]]
+ assert(result.contains("781 franklin avenue crown heights brooklyn new
york 11216 usa"))
+ }
+
+ it("should return null for null input") {
+ val result = sparkSession.sql("SELECT ExpandAddress(NULL) as
normalized").collect()
+ assert(result.head.get(0) == null)
+ }
+
+ it("should should work when chained with explode") {
Review Comment:
Duplicate 'should' in the test description; it should read 'should work when
chained with explode'.
```suggestion
it("should work when chained with explode") {
```
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala:
##########
@@ -1831,3 +1835,140 @@ case class ST_InterpolatePoint(inputExpressions:
Seq[Expression])
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) =
copy(inputExpressions = newChildren)
}
+
+case class ExpandAddress(address: Expression)
+ extends UnaryExpression
+ with ImplicitCastInputTypes
+ with CodegenFallback
+ with FoldableExpression
+ with Serializable {
+
+ def this(children: Seq[Expression]) = this(children.head)
+
+ lazy private final val expander = {
+ val conf = SedonaConf.fromSparkEnv
+ getExpanderFromConf(conf.getLibPostalDataDir, conf.getLibPostalUseSenzing)
+ }
+
+ override def nullable: Boolean = true
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+
+ override def dataType: DataType = ArrayType(StringType)
+
+ override def eval(input: InternalRow): Any = {
+ val addressVal = address.eval(input)
+ if (addressVal == null) {
+ null
+ } else {
+ new GenericArrayData(
+ expander
+ .expandAddress(addressVal.asInstanceOf[UTF8String].toString)
+ .map(UTF8String.fromString))
+ }
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
+ val expanderRef = ctx.addReferenceObj("expander", expander,
classOf[AddressExpander].getName)
+ val utf8StringClass = "org.apache.spark.unsafe.types.UTF8String"
+ val arrayDataClass = "org.apache.spark.sql.catalyst.util.GenericArrayData"
+ val addressRef = child.genCode(ctx)
+ val address = addressRef.value
+ ev.copy(code = code"""
+ ${addressRef.code}
+ boolean ${ev.isNull} = ${addressRef.isNull};
+ $arrayDataClass ${ev.value} = null;
+
+ if (!${ev.isNull}) {
+ String[] expandedAddressArray =
$expanderRef.expandAddress($address.toString());
+ $utf8StringClass[] utf8Strings = new
$utf8StringClass[expandedAddressArray.length];
+ for (int j = 0; j < expandedAddressArray.length; j++) {
+ utf8Strings[j] =
$utf8StringClass.fromString(expandedAddressArray[j]);
+ }
+ ${ev.value} = new $arrayDataClass(utf8Strings);
+ } else {
+ ${ev.value} = new $arrayDataClass(new $utf8StringClass[0]);
Review Comment:
In doGenCode, the else branch returns an empty array for null inputs, whereas
eval returns null for null inputs. This inconsistent null handling can produce
incorrect results when codegen is used. Update the generated code to set
ev.isNull and return null when the input is null, matching eval's behavior.
##########
python/sedona/spark/sql/st_functions.py:
##########
@@ -32,6 +32,25 @@
_call_st_function = partial(call_sedona_function, "st_functions")
Review Comment:
[nitpick] The @validate_argument_types decorator is missing on ExpandAddress.
For consistency with ParseAddress, consider adding it so input validation is
applied uniformly across these functions.
```suggestion
@validate_argument_types
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]