This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 45c2fd64ca1d18b1947e988a346bac793333f592
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Jan 4 22:39:13 2026 +0100

    add sedonadb sedona udf worker example
---
 python/sedona/spark/worker/serde.py  | 27 ---------------------------
 python/sedona/spark/worker/worker.py | 25 -------------------------
 2 files changed, 52 deletions(-)

diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py
index 31038b7fcd..3954d075b7 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -1,29 +1,8 @@
-import socket
-
 from pyspark.serializers import write_int, SpecialLengths
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
 from sedona.spark.worker.udf_info import UDFInfo
 
-
-def read_available(buf, chunk=4096):
-    # buf.raw._sock.settimeout(0.01)  # non-blocking-ish
-    data = bytearray()
-    index = 0
-    while True:
-        index+=1
-        try:
-            chunk_bytes = buf.read(chunk)
-        except socket.timeout:
-            break
-
-        if not chunk_bytes and index > 10:
-            break
-
-        data.extend(chunk_bytes)
-
-    return bytes(data)
-
 
 class SedonaDBSerializer(ArrowStreamPandasSerializer):
     def __init__(self, timezone, safecheck, db, udf_info: UDFInfo):
         super(SedonaDBSerializer, self).__init__(timezone, safecheck)
@@ -64,12 +43,6 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
         writer.close()
 
     def dump_stream(self, iterator, stream):
-        """
-        Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
-        This should be sent after creating the first record batch so in case of an error, it can
-        be sent back to the JVM before the Arrow stream starts.
-        """
-
         def init_stream_yield_batches():
             should_write_start_length = True
             for batch in iterator:
diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py
index 74a61b02ee..571134f407 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -104,7 +104,6 @@ def resolve_python_path(utf_serde: UTF8Deserializer, infile):
         sys.path.insert(1, path)
 
     spark_files_dir = utf_serde.loads(infile)
-    # _accumulatorRegistry.clear()
    SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
 
@@ -165,26 +164,6 @@ def read_udf(infile, pickle_ser) -> UDFInfo:
         geom_offsets=[0]
     )
 
-# def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
-#     num_arg = read_int(infile)
-#     arg_offsets = [read_int(infile) for i in range(num_arg)]
-#     chained_func = None
-#     for i in range(read_int(infile)):
-#         f, return_type = read_command(pickleSer, infile)
-#         if chained_func is None:
-#             chained_func = f
-#         else:
-#             chained_func = chain(chained_func, f)
-#
-#     func = chained_func
-#
-#     # the last returnType will be the return type of UDF
-#     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-#         return arg_offsets, func, return_type
-#     else:
-#         raise ValueError("Unknown eval type: {}".format(eval_type))
-#
-
 
 def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
     num_udfs = read_int(infile)
@@ -211,11 +190,7 @@ def write_statistics(infile, outfile, boot_time, init_time) -> None:
     write_long(shuffle.MemoryBytesSpilled, outfile)
     write_long(shuffle.DiskBytesSpilled, outfile)
 
-    # Mark the beginning of the accumulators section of the output
     write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
-    # write_int(len(_accumulatorRegistry), outfile)
-    # for (aid, accum) in _accumulatorRegistry.items():
-    #     pickleSer._write_with_length((aid, accum._value), outfile)
 
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
         write_int(SpecialLengths.END_OF_STREAM, outfile)
