This is an automated email from the ASF dual-hosted git repository.
imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/arrow-worker by this push:
new 3c16cf4915 add sedonadb sedona udf worker example
3c16cf4915 is described below
commit 3c16cf491526eab12f2371017dadd22070b20ad3
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Jan 4 22:39:13 2026 +0100
add sedonadb sedona udf worker example
---
python/sedona/spark/worker/serde.py | 27 ---------------------------
python/sedona/spark/worker/worker.py | 25 -------------------------
2 files changed, 52 deletions(-)
diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py
index 31038b7fcd..3954d075b7 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -1,29 +1,8 @@
-import socket
-
from pyspark.serializers import write_int, SpecialLengths
from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
from sedona.spark.worker.udf_info import UDFInfo
-
-def read_available(buf, chunk=4096):
- # buf.raw._sock.settimeout(0.01) # non-blocking-ish
- data = bytearray()
- index = 0
- while True:
- index+=1
- try:
- chunk_bytes = buf.read(chunk)
- except socket.timeout:
- break
-
- if not chunk_bytes and index > 10:
- break
-
- data.extend(chunk_bytes)
-
- return bytes(data)
-
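For context, the deleted read_available helper drained whatever bytes a socket-backed buffer had on hand, in fixed-size chunks, breaking on a socket timeout or after enough reads came back empty. As written it kept looping and extending data with empty chunks for the first ten empty iterations; a minimal sketch of the same idea with that loop tightened (the max_empty_reads parameter and the consecutive-empty-read counter are illustrative, not part of the codebase) might look like:

    import socket

    def read_available(buf, chunk=4096, max_empty_reads=10):
        # Drain the bytes currently available on a socket-backed buffer.
        # Stop on a socket timeout, or once reads come back empty
        # max_empty_reads times in a row.
        data = bytearray()
        empty_reads = 0
        while empty_reads < max_empty_reads:
            try:
                chunk_bytes = buf.read(chunk)
            except socket.timeout:
                break
            if not chunk_bytes:
                empty_reads += 1
                continue
            empty_reads = 0
            data.extend(chunk_bytes)
        return bytes(data)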
class SedonaDBSerializer(ArrowStreamPandasSerializer):
def __init__(self, timezone, safecheck, db, udf_info: UDFInfo):
super(SedonaDBSerializer, self).__init__(timezone, safecheck)
@@ -64,12 +43,6 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
writer.close()
def dump_stream(self, iterator, stream):
- """
- Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
- This should be sent after creating the first record batch so in case of an error, it can
- be sent back to the JVM before the Arrow stream starts.
- """
-
def init_stream_yield_batches():
should_write_start_length = True
for batch in iterator:
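The docstring removed above documented the handshake that the surviving dump_stream body still implements: the START_ARROW_STREAM marker goes out only after the first record batch has been materialized, so an error raised while building that batch can still be reported to the JVM before any Arrow bytes hit the wire. A minimal sketch of that pattern, modeled on PySpark's ArrowStreamPandasUDFSerializer (the standalone function signature here is illustrative):

    from pyspark.serializers import write_int, SpecialLengths

    def init_stream_yield_batches(iterator, stream):
        # Pull the first batch before announcing the Arrow stream, so a
        # failure while building it surfaces to the JVM as an error code
        # instead of a truncated stream.
        should_write_start_length = True
        for batch in iterator:
            if should_write_start_length:
                write_int(SpecialLengths.START_ARROW_STREAM, stream)
                should_write_start_length = False
            yield batch

The serializer then hands this generator to the parent class's dump_stream, which writes the batches themselves.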
diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py
index 74a61b02ee..571134f407 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -104,7 +104,6 @@ def resolve_python_path(utf_serde: UTF8Deserializer, infile):
sys.path.insert(1, path)
spark_files_dir = utf_serde.loads(infile)
- # _accumulatorRegistry.clear()
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
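For readers following along, the resolve_python_path context around this hunk mirrors PySpark's worker bootstrap: prepend the shipped Python includes to sys.path, then point SparkFiles at the directory the JVM staged distributed files into. A hedged reconstruction of that shape (the read_int loop count is an assumption about lines outside this hunk, not confirmed by the diff):

    import sys
    from pyspark.files import SparkFiles
    from pyspark.serializers import UTF8Deserializer, read_int

    def resolve_python_path(utf_serde: UTF8Deserializer, infile) -> None:
        # Insert each shipped include path just behind the script directory.
        for _ in range(read_int(infile)):
            path = utf_serde.loads(infile)
            sys.path.insert(1, path)
        # Tell SparkFiles where the JVM placed distributed files.
        spark_files_dir = utf_serde.loads(infile)
        SparkFiles._root_directory = spark_files_dir
        SparkFiles._is_running_on_worker = True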
@@ -165,26 +164,6 @@ def read_udf(infile, pickle_ser) -> UDFInfo:
geom_offsets=[0]
)
-# def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
-# num_arg = read_int(infile)
-# arg_offsets = [read_int(infile) for i in range(num_arg)]
-# chained_func = None
-# for i in range(read_int(infile)):
-# f, return_type = read_command(pickleSer, infile)
-# if chained_func is None:
-# chained_func = f
-# else:
-# chained_func = chain(chained_func, f)
-#
-# func = chained_func
-#
-# # the last returnType will be the return type of UDF
-# if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-# return arg_offsets, func, return_type
-# else:
-# raise ValueError("Unknown eval type: {}".format(eval_type))
-#
-
def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
num_udfs = read_int(infile)
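The commented-out read_single_udf removed above folded nested UDFs together with chain before handing back a single callable. In stock PySpark's worker that combinator is a one-liner: the inner function's output becomes the outer one's arguments, which is how a column expression nesting one Python UDF inside another executes as one chained function (sketch of the upstream helper, from memory):

    def chain(f, g):
        # Compose two UDFs: feed f's result into g, so nested UDF calls in
        # a single column expression run as one callable.
        return lambda *a: g(f(*a))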
@@ -211,11 +190,7 @@ def write_statistics(infile, outfile, boot_time, init_time) -> None:
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)
- # Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
- # write_int(len(_accumulatorRegistry), outfile)
- # for (aid, accum) in _accumulatorRegistry.items():
- # pickleSer._write_with_length((aid, accum._value), outfile)
if read_int(infile) == SpecialLengths.END_OF_STREAM:
write_int(SpecialLengths.END_OF_STREAM, outfile)
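The tail of write_statistics keeps PySpark's worker-reuse handshake: after the timing and shuffle metrics, the worker marks the end of its data section, then echoes whatever shutdown marker the JVM sends. A sketch of the full upstream pattern for comparison (the else branch is stock PySpark behavior, not part of this diff, and the function name is illustrative):

    import sys
    from pyspark.serializers import read_int, write_int, SpecialLengths

    def finish_stream(infile, outfile) -> None:
        # Mark the end of the data section, then answer the JVM's marker.
        write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
        outfile.flush()
        if read_int(infile) == SpecialLengths.END_OF_STREAM:
            # Clean shutdown: echo the marker so the JVM can reuse us.
            write_int(SpecialLengths.END_OF_STREAM, outfile)
            outfile.flush()
        else:
            # Any other value tells the JVM not to reuse this worker.
            write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
            outfile.flush()
            sys.exit(-1)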