This is an automated email from the ASF dual-hosted git repository.
chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8d519d8ed0 feat(amber): Enable R UDF Runtime via Optional texera-rudf
Plugin (#4164)
8d519d8ed0 is described below
commit 8d519d8ed0afda5e2f992ad48e98ea5719c6d6b9
Author: Chris <[email protected]>
AuthorDate: Mon Jan 19 20:08:47 2026 -0800
feat(amber): Enable R UDF Runtime via Optional texera-rudf Plugin (#4164)
<!--
Thanks for sending a pull request (PR)! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
[Contributing to
Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
2. Ensure you have added or run the appropriate tests for your PR
3. If the PR is work in progress, mark it a draft on GitHub.
4. Please write your PR title to summarize what this PR proposes, we
are following Conventional Commits style for PR titles as well.
5. Be sure to keep the PR description updated to reflect all changes.
-->
### What changes were proposed in this PR?
<!--
Please clarify what changes you are proposing. The purpose of this
section
is to outline the changes. Here are some tips for you:
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
3. If it is a refactoring, clarify what has been changed.
3. It would be helpful to include a before-and-after comparison using
screenshots or GIFs.
4. Please consider writing useful notes for better and faster reviews.
-->
This PR integrates the `texera-rudf` plugin and enables R UDF operators
when the plugin is installed in the runtime environment.
Previously, #4090 removed runtime support for R UDF operators due to
licensing constraints. Following the design proposed in #4155, we now
manage all R UDF–related code in a separate repository and have the
Texera main repository consume it as an optional plugin.
For prerequisites and installation instructions, please refer to the
`README.md` in the plugin repository:
https://github.com/kunwp1/texera-rudf.
Note: This repository is currently a prototype under my personal
account. Once this PR is merged, I plan to transfer it to the Texera
organization.
### Any related issues, documentation, discussions?
<!--
Please use this section to link other resources if not mentioned
already.
1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
#1234`
or `Closes #1234`. If it is only related, simply mention the issue
number.
2. If there is design documentation, please add the link.
3. If there is a discussion in the mailing list, please add the link.
-->
Discussion: #4155
PR: #4090, #4124
### How was this PR tested?
<!--
If tests were added, say they were added here. Or simply mention that if
the PR
is tested with existing test cases. Make sure to include/update test
cases that
check the changes thoroughly including negative and positive cases if
possible.
If it was tested in a way different from regular unit tests, please
clarify how
you tested step by step, ideally copy and paste-able, so that other
reviewers can
test and check, and descendants can verify in the future. If tests were
not added,
please describe why they were not added and/or why it was difficult to
add.
-->
Tested with this workflow
[Test.json](https://github.com/user-attachments/files/24662090/Test.json)
#### Plugin Uninstalled
<img width="1728" height="1117" alt="Screenshot 2026-01-15 at 9 27
17 PM"
src="https://github.com/user-attachments/assets/838ddfbc-507d-4961-8edf-8f6f2941effd"
/>
#### Plugin Installed
<img width="1728" height="1117" alt="Screenshot 2026-01-15 at 9 29
51 PM"
src="https://github.com/user-attachments/assets/57074c3a-fac4-4b3c-9578-9038e066d18d"
/>
### Was this PR authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
PR,
please include the phrase: 'Generated-by: ' followed by the name of the
tool
and its version. If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for
details.
-->
No
---
.../core/architecture/managers/executor_manager.py | 26 ++++--
.../architecture/managers/test_executor_manager.py | 92 +++++++++++++++++++---
2 files changed, 102 insertions(+), 16 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/managers/executor_manager.py
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 53e5a8903d..eb1363d0a6 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -132,13 +132,25 @@ class ExecutorManager:
:param language: The language of the operator code.
:return:
"""
- assert language not in [
- "r-tuple",
- "r-table",
- ], "R language is not supported by default. Please consult third party
plugin."
- executor: type(Operator) = self.load_executor_definition(code)
- self.executor = executor()
- self.executor.is_source = is_source
+ if language in ("r-tuple", "r-table"):
+ # R support is provided by an optional plugin (texera-rudf)
+ executor_type = "Tuple" if language == "r-tuple" else "Table"
+ try:
+ import texera_r
+
+ class_suffix = "SourceExecutor" if is_source else "Executor"
+ executor_class = getattr(texera_r,
f"R{executor_type}{class_suffix}")
+ except ImportError as e:
+ raise ImportError(
+ "R operators require the texera-rudf package.\n"
+ "Install with: pip install
git+https://github.com/Texera/texera-rudf.git\n"
+ f"Import error: {e}"
+ )
+ self.executor = executor_class(code)
+ else:
+ executor: type(Operator) = self.load_executor_definition(code)
+ self.executor = executor()
+ self.executor.is_source = is_source
assert isinstance(self.executor, SourceOperator) ==
self.executor.is_source, (
"Please use SourceOperator API for source operators."
)
diff --git
a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index 7728c509b4..901f768a21 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+import sys
import pytest
+from unittest.mock import MagicMock
from core.architecture.managers.executor_manager import ExecutorManager
@@ -39,7 +41,7 @@ class TestSourceOperator(UDFSourceOperator):
class TestExecutorManager:
- """Test suite for ExecutorManager, focusing on R UDF support removal."""
+ """Test suite for ExecutorManager, focusing on R UDF plugin support."""
@pytest.fixture
def executor_manager(self):
@@ -50,6 +52,34 @@ class TestExecutorManager:
if hasattr(manager, "_fs"):
manager.close()
+ def _mock_r_plugin(self, executor_class_name, is_source):
+ """
+ Helper to mock the texera_r plugin module.
+
+ :param executor_class_name: Name of the executor class (e.g.,
'RTupleExecutor')
+ :param is_source: Whether the executor is a source operator
+ :return: Tuple of (mock_texera_r, mock_executor_instance)
+ """
+ from core.models import SourceOperator, Operator
+
+ mock_texera_r = MagicMock()
+ mock_executor_class = MagicMock()
+ setattr(mock_texera_r, executor_class_name, mock_executor_class)
+
+ # Use appropriate spec based on operator type
+ spec_class = SourceOperator if is_source else Operator
+ mock_executor_instance = MagicMock(spec=spec_class)
+ mock_executor_instance.is_source = is_source
+ mock_executor_class.return_value = mock_executor_instance
+
+ sys.modules["texera_r"] = mock_texera_r
+ return mock_texera_r, mock_executor_instance
+
+ def _cleanup_r_plugin(self):
+ """Remove the mocked texera_r module from sys.modules."""
+ if "texera_r" in sys.modules:
+ del sys.modules["texera_r"]
+
def test_initialization(self, executor_manager):
"""Test that ExecutorManager initializes correctly."""
assert executor_manager.executor is None
@@ -57,29 +87,73 @@ class TestExecutorManager:
assert executor_manager.executor_version == 0
def test_reject_r_tuple_language(self, executor_manager):
- """Test that 'r-tuple' language is rejected with AssertionError."""
- with pytest.raises(AssertionError) as exc_info:
+ """Test that 'r-tuple' language is rejected with ImportError when
plugin is not available."""
+ with pytest.raises(ImportError) as exc_info:
executor_manager.initialize_executor(
code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple"
)
- # Verify the error message mentions R UDF support has been dropped
- assert "not supported" in str(exc_info.value) or "dropped" in str(
+ # Verify the error message mentions R operators require the
texera-rudf package
+ assert "texera-rudf" in str(exc_info.value) or "R operators require"
in str(
exc_info.value
)
def test_reject_r_table_language(self, executor_manager):
- """Test that 'r-table' language is rejected with AssertionError."""
- with pytest.raises(AssertionError) as exc_info:
+ """Test that 'r-table' language is rejected with ImportError when
plugin is not available."""
+ with pytest.raises(ImportError) as exc_info:
executor_manager.initialize_executor(
code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table"
)
- # Verify the error message mentions R UDF support has been dropped
- assert "not supported" in str(exc_info.value) or "dropped" in str(
+ # Verify the error message mentions R operators require the
texera-rudf package
+ assert "texera-rudf" in str(exc_info.value) or "R operators require"
in str(
exc_info.value
)
+ def test_accept_r_tuple_language_with_plugin(self, executor_manager):
+ """Test that 'r-tuple' language is accepted when plugin is
available."""
+ _, mock_executor = self._mock_r_plugin("RTupleExecutor",
is_source=False)
+ try:
+ executor_manager.initialize_executor(
+ code="# R code", is_source=False, language="r-tuple"
+ )
+ assert executor_manager.executor == mock_executor
+ finally:
+ self._cleanup_r_plugin()
+
+ def test_accept_r_table_language_with_plugin(self, executor_manager):
+ """Test that 'r-table' language is accepted when plugin is
available."""
+ _, mock_executor = self._mock_r_plugin("RTableExecutor",
is_source=False)
+ try:
+ executor_manager.initialize_executor(
+ code="# R code", is_source=False, language="r-table"
+ )
+ assert executor_manager.executor == mock_executor
+ finally:
+ self._cleanup_r_plugin()
+
+ def test_accept_r_tuple_source_with_plugin(self, executor_manager):
+ """Test that 'r-tuple' source operators work when plugin is
available."""
+ _, mock_executor = self._mock_r_plugin("RTupleSourceExecutor",
is_source=True)
+ try:
+ executor_manager.initialize_executor(
+ code="# R code", is_source=True, language="r-tuple"
+ )
+ assert executor_manager.executor == mock_executor
+ finally:
+ self._cleanup_r_plugin()
+
+ def test_accept_r_table_source_with_plugin(self, executor_manager):
+ """Test that 'r-table' source operators work when plugin is
available."""
+ _, mock_executor = self._mock_r_plugin("RTableSourceExecutor",
is_source=True)
+ try:
+ executor_manager.initialize_executor(
+ code="# R code", is_source=True, language="r-table"
+ )
+ assert executor_manager.executor == mock_executor
+ finally:
+ self._cleanup_r_plugin()
+
def test_accept_python_language_regular_operator(self, executor_manager):
"""Test that 'python' language is accepted for regular operators."""
# This should not raise any assertion error