This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/master by this push:
new 9e3bcad6a7 fix: yield tuple in GeneratorOperator example (#3657)
9e3bcad6a7 is described below
commit 9e3bcad6a7f94521be37120e2e8ad424025864b5
Author: Andy Zhang <[email protected]>
AuthorDate: Tue Aug 12 23:06:43 2025 -0700
fix: yield tuple in GeneratorOperator example (#3657)
There is an example Python UDF generator in the
[wiki](https://github.com/apache/texera/wiki/Guide-to-Use-a-Python-UDF#1-out-udf).
https://github.com/apache/texera/blob/12849accf7a1734ba0fd7feeabbf4df9e0bff812/core/amber/src/main/python/pytexera/udf/examples/generator_operator.py#L21-L24
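However, running this example raises `TypeError: Unmatched type for field
'test', expected AttributeType.INT, got [1, 2, 3] (<class 'list'>)
instead.`, because it yields the whole list `[1, 2, 3]` as the value of the
integer field 'test' in a single tuple.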
```
2025-08-12 22:15:42.264 | ERROR |
core.runnables.data_processor:process_internal_marker:83 - Unmatched type for
field 'test', expected AttributeType.INT, got [1, 2, 3] (<class 'list'>)
instead.
Traceback (most recent call last):
File "/opt/anaconda3/envs/texera/lib/python3.10/threading.py", line 973,
in _bootstrap
self._bootstrap_inner()
│ └ <function Thread._bootstrap_inner at 0x1021c6a70>
└ <Thread(data_processor_thread, started daemon 13440872448)>
File "/opt/anaconda3/envs/texera/lib/python3.10/threading.py", line 1016,
in _bootstrap_inner
self.run()
│ └ <function Thread.run at 0x1021c67a0>
└ <Thread(data_processor_thread, started daemon 13440872448)>
File "/opt/anaconda3/envs/texera/lib/python3.10/threading.py", line 953,
in run
self._target(*self._args, **self._kwargs)
│ │ │ │ │ └ {}
│ │ │ │ └ <Thread(data_processor_thread, started
daemon 13440872448)>
│ │ │ └ ()
│ │ └ <Thread(data_processor_thread, started daemon
13440872448)>
│ └ <bound method DataProcessor.run of
<core.runnables.data_processor.DataProcessor object at 0x16b1f2500>>
└ <Thread(data_processor_thread, started daemon 13440872448)>
File
"/Users/andy/Documents/Projects/texera/core/amber/src/main/python/core/runnables/data_processor.py",
line 58, in run
self.process_internal_marker(marker)
│ │ └ <core.models.internal_marker.EndChannel
object at 0x16da2cd60>
│ └ <function DataProcessor.process_internal_marker at 0x16b18cb80>
└ <core.runnables.data_processor.DataProcessor object at 0x16b1f2500>
> File
"/Users/andy/Documents/Projects/texera/core/amber/src/main/python/core/runnables/data_processor.py",
line 80, in process_internal_marker
self._set_output_tuple(executor.on_finish(port_id))
│ │ │ │ └ 0
│ │ │ └ <function SourceOperator.on_finish at
0x1679b8c10>
│ │ └ <udf-v1.GeneratorOperator object at
0x16b1f3f40>
│ └ <function DataProcessor._set_output_tuple at 0x16b18cd30>
└ <core.runnables.data_processor.DataProcessor object at 0x16b1f2500>
File
"/Users/andy/Documents/Projects/texera/core/amber/src/main/python/core/runnables/data_processor.py",
line 147, in _set_output_tuple
output_tuple.finalize(
│ └ <function Tuple.finalize at 0x1679ae680>
└ Tuple['test': [1, 2, 3]]
File
"/Users/andy/Documents/Projects/texera/core/amber/src/main/python/core/models/tuple.py",
line 263, in finalize
self.validate_schema(schema)
│ │ └ <core.models.schema.schema.Schema object at
0x16d3925f0>
│ └ <function Tuple.validate_schema at 0x1679ae7a0>
└ Tuple['test': [1, 2, 3]]
File
"/Users/andy/Documents/Projects/texera/core/amber/src/main/python/core/models/tuple.py",
line 329, in validate_schema
raise TypeError(
TypeError: Unmatched type for field 'test', expected AttributeType.INT, got
[1, 2, 3] (<class 'list'>) instead.
```
The example should instead yield one tuple per element, each with an integer value for 'test':
```
class GeneratorOperator(UDFSourceOperator):
@overrides
def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
for i in [1, 2, 3]:
yield {"test": i}
```
<img width="3054" height="1690" alt="image"
src="https://github.com/user-attachments/assets/7bcf37f0-1ca4-4ddb-89a5-7c5cb2594eb6"
/>
---
.../src/main/python/pytexera/udf/examples/generator_operator.py | 3 ++-
.../src/main/python/pytexera/udf/examples/test_generator_operator.py | 5 +++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git
a/core/amber/src/main/python/pytexera/udf/examples/generator_operator.py
b/core/amber/src/main/python/pytexera/udf/examples/generator_operator.py
index 995060dee4..df6bd8df3f 100644
--- a/core/amber/src/main/python/pytexera/udf/examples/generator_operator.py
+++ b/core/amber/src/main/python/pytexera/udf/examples/generator_operator.py
@@ -21,4 +21,5 @@ from pytexera import *
class GeneratorOperator(UDFSourceOperator):
@overrides
def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
- yield {"test": [1, 2, 3]}
+ for i in [1, 2, 3]:
+ yield {"test": i}
diff --git
a/core/amber/src/main/python/pytexera/udf/examples/test_generator_operator.py
b/core/amber/src/main/python/pytexera/udf/examples/test_generator_operator.py
index 931d8fefb8..85e9404d59 100644
---
a/core/amber/src/main/python/pytexera/udf/examples/test_generator_operator.py
+++
b/core/amber/src/main/python/pytexera/udf/examples/test_generator_operator.py
@@ -29,6 +29,7 @@ class TestEchoOperator:
def test_generator_operator(self, generator_operator):
generator_operator.open()
outputs = generator_operator.produce()
- output_tuple = Tuple(next(outputs))
- assert output_tuple == Tuple({"test": [1, 2, 3]})
+ for i in [1, 2, 3]:
+ output_tuple = Tuple(next(outputs))
+ assert output_tuple == Tuple({"test": i})
generator_operator.close()