This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new d5bc8b7cf8 refactor: untrack generated python proto, regenerate in
build (#5077)
d5bc8b7cf8 is described below
commit d5bc8b7cf8d2e515280934a9c3e52c305fe3c002
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 26 20:39:27 2026 -0700
refactor: untrack generated python proto, regenerate in build (#5077)
### What changes were proposed in this PR?
Stop version-tracking the betterproto-generated Python files under
`amber/src/main/python/proto/`. Add a `genPythonProto` sbt task that
runs `bin/python-proto-gen.sh` as a dependency of
`WorkflowExecutionService/Compile/compile`, with a graceful skip when
`protoc` is missing. Wire `protoc` 3.19.4 (matching `PB.protocVersion`)
and `betterproto[compiler]==2.0.0b7` into the three image build stages
(worker,master, web-app) and the three CI jobs that touch Python proto
(`python`, `python-state-materialization-mac`, `amber-integration`). The
14th tracked file (`proto/org/apache/texera/web/__init__.py`) had no
corresponding `.proto` source and is gone after regen; grep confirms no
callers.
### Any related issues, documentation, or discussions?
Closes: #4102
### How was this PR tested?
locally: sbt clean WorkflowExecutionService/compile regenerates the tree
and pytest -m "not integration" passes; skip path verified with protoc
absent. All three Docker images build end-to-end. CI covers the workflow
YAML on
push.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
---
.github/workflows/build.yml | 30 +
.gitignore | 3 +
AGENTS.md | 2 +
amber/build.sbt | 2 +-
amber/dev-requirements.txt | 4 +
amber/src/main/python/proto/__init__.py | 0
amber/src/main/python/proto/org/__init__.py | 0
amber/src/main/python/proto/org/apache/__init__.py | 0
.../python/proto/org/apache/texera/__init__.py | 0
.../proto/org/apache/texera/amber/__init__.py | 0
.../proto/org/apache/texera/amber/core/__init__.py | 146 --
.../org/apache/texera/amber/engine/__init__.py | 0
.../texera/amber/engine/architecture/__init__.py | 0
.../amber/engine/architecture/rpc/__init__.py | 2204 --------------------
.../engine/architecture/sendsemantics/__init__.py | 66 -
.../amber/engine/architecture/worker/__init__.py | 49 -
.../apache/texera/amber/engine/common/__init__.py | 156 --
.../python/proto/org/apache/texera/web/__init__.py | 158 --
amber/src/main/python/proto/scalapb/__init__.py | 421 ----
bin/computing-unit-master.dockerfile | 21 +-
bin/computing-unit-worker.dockerfile | 21 +-
bin/protoc-version.txt | 1 +
bin/python-proto-gen.sh | 12 +-
bin/texera-web-application.dockerfile | 22 +-
24 files changed, 108 insertions(+), 3210 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 4fd056c7a7..2b93c55576 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -355,11 +355,21 @@ jobs:
# mirrors a subset of common deps (e.g. pillow); without this
# flag a dependabot bump to a version not yet mirrored there
# fails to resolve even though PyPI has it.
+ # dev-requirements.txt provides the betterproto plugin used by
+ # bin/python-proto-gen.sh.
run: |
python -m pip install uv
if [ -f amber/requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/requirements.txt; fi
if [ -f amber/operator-requirements.txt ]; then uv pip install
--system --index-strategy unsafe-best-match -r amber/operator-requirements.txt;
fi
if [ -f amber/dev-requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
+ - name: Install protoc
+ # Version pinned in bin/protoc-version.txt.
+ run: |
+ PROTOC_VERSION=$(cat bin/protoc-version.txt)
+ curl -fsSL -o /tmp/protoc.zip
"https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
+ sudo unzip -o /tmp/protoc.zip -d /usr/local
+ sudo chmod +x /usr/local/bin/protoc
+ sudo chmod -R a+rX /usr/local/include/google
- name: Create Databases
run: |
psql -h localhost -U postgres -f sql/texera_ddl.sql
@@ -467,6 +477,10 @@ jobs:
}
}
EOF
+ - name: Generate Python proto bindings
+ # Integration specs spawn Python UDF workers that import the
+ # generated betterproto bindings. Independent of sbt and the JDK.
+ run: bash bin/python-proto-gen.sh
- name: Lint and run amber integration tests
# AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
# keep only @org.apache.texera.amber.tags.IntegrationTest
@@ -685,6 +699,16 @@ jobs:
run: |
python -m pip install uv
if [ -f amber/dev-requirements.txt ]; then uv pip install --system
-r amber/dev-requirements.txt; fi
+ - name: Install protoc
+ # Version pinned in bin/protoc-version.txt.
+ run: |
+ PROTOC_VERSION=$(cat bin/protoc-version.txt)
+ curl -fsSL -o /tmp/protoc.zip
"https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
+ sudo unzip -o /tmp/protoc.zip -d /usr/local
+ sudo chmod +x /usr/local/bin/protoc
+ sudo chmod -R a+rX /usr/local/include/google
+ - name: Generate Python proto bindings
+ run: bash bin/python-proto-gen.sh
- name: Test with pytest
# --junit-xml emits a JUnit-XML report alongside the coverage XML
# so the Test Analytics upload below can feed Codecov's failing-
@@ -745,6 +769,12 @@ jobs:
if [ -f amber/requirements.txt ]; then uv pip install --system
--index-strategy unsafe-best-match -r amber/requirements.txt; fi
if [ -f amber/operator-requirements.txt ]; then uv pip install
--system --index-strategy unsafe-best-match -r amber/operator-requirements.txt;
fi
if [ -f amber/dev-requirements.txt ]; then uv pip install --system
-r amber/dev-requirements.txt; fi
+ - name: Install protoc
+ # Homebrew protoc; this job doesn't exercise scalapb so the
+ # bin/protoc-version.txt pin doesn't apply here.
+ run: brew install protobuf
+ - name: Generate Python proto bindings
+ run: bash bin/python-proto-gen.sh
- name: Run state-materialization integration tests
run: |
cd amber && pytest -sv \
diff --git a/.gitignore b/.gitignore
index d17fe084cf..6e772fee5a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,9 @@ coverage.xml
*.model
*.pkl
+# Regenerated by bin/python-proto-gen.sh.
+amber/src/main/python/proto/
+
# Ingoring user generated resources
user-resources/
diff --git a/AGENTS.md b/AGENTS.md
index ba40082d59..eef0aea460 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -83,6 +83,8 @@ One Python venv shared across worktrees, sibling of the
texera checkout:
```bash
python3.12 -m venv ../venv312 && source ../venv312/bin/activate
pip install -r amber/requirements.txt -r amber/operator-requirements.txt
+# For pytest or running bin/python-proto-gen.sh, also install dev deps:
+pip install -r amber/dev-requirements.txt
```
Tests that spawn Python workers need an interpreter path. Edit `python.path`
diff --git a/amber/build.sbt b/amber/build.sbt
index 1f363e73e9..dc9b5d8f30 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -179,7 +179,7 @@ libraryDependencies ++= hadoopDependencies
// protobuf related
// run the following with sbt to have protobuf codegen
-PB.protocVersion := "3.19.4"
+PB.protocVersion := IO.read((ThisBuild / baseDirectory).value / "bin" /
"protoc-version.txt").trim
enablePlugins(Fs2Grpc)
diff --git a/amber/dev-requirements.txt b/amber/dev-requirements.txt
index 1bbacb78d6..7ae68cf4a2 100644
--- a/amber/dev-requirements.txt
+++ b/amber/dev-requirements.txt
@@ -23,3 +23,7 @@
# Coverage instrumentation for pytest; emits coverage.xml consumed by
# Codecov's Phase 1 upload.
pytest-cov==5.0.0
+
+# protoc plugin for bin/python-proto-gen.sh; runtime needs only the
+# base `betterproto` in requirements.txt.
+betterproto[compiler]==2.0.0b7
diff --git a/amber/src/main/python/proto/__init__.py
b/amber/src/main/python/proto/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/__init__.py
b/amber/src/main/python/proto/org/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/__init__.py
b/amber/src/main/python/proto/org/apache/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/__init__.py
b/amber/src/main/python/proto/org/apache/texera/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py
deleted file mode 100644
index 2d21638c26..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources: org/apache/texera/amber/core/executor.proto,
org/apache/texera/amber/core/virtualidentity.proto,
org/apache/texera/amber/core/workflow.proto,
org/apache/texera/amber/core/workflowruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
- List,
-)
-
-import betterproto
-
-
-class OutputPortOutputMode(betterproto.Enum):
- SET_SNAPSHOT = 0
- """outputs complete result set snapshot for each update"""
-
- SET_DELTA = 1
- """outputs incremental result set delta for each update"""
-
- SINGLE_SNAPSHOT = 2
- """
- outputs a single snapshot for the entire execution,
- used explicitly to support visualization operators that may exceed the
memory limit
- TODO: remove this mode after we have a better solution for output size
limit
- """
-
-
-class FatalErrorType(betterproto.Enum):
- COMPILATION_ERROR = 0
- EXECUTION_FAILURE = 1
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowIdentity(betterproto.Message):
- id: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionIdentity(betterproto.Message):
- id: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ActorVirtualIdentity(betterproto.Message):
- name: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ChannelIdentity(betterproto.Message):
- from_worker_id: "ActorVirtualIdentity" = betterproto.message_field(1)
- to_worker_id: "ActorVirtualIdentity" = betterproto.message_field(2)
- is_control: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorIdentity(betterproto.Message):
- id: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PhysicalOpIdentity(betterproto.Message):
- logical_op_id: "OperatorIdentity" = betterproto.message_field(1)
- layer_name: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EmbeddedControlMessageIdentity(betterproto.Message):
- id: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PortIdentity(betterproto.Message):
- id: int = betterproto.int32_field(1)
- internal: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class GlobalPortIdentity(betterproto.Message):
- op_id: "PhysicalOpIdentity" = betterproto.message_field(1)
- port_id: "PortIdentity" = betterproto.message_field(2)
- input: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class InputPort(betterproto.Message):
- id: "PortIdentity" = betterproto.message_field(1)
- display_name: str = betterproto.string_field(2)
- disallow_multi_links: bool = betterproto.bool_field(3)
- dependencies: List["PortIdentity"] = betterproto.message_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class OutputPort(betterproto.Message):
- id: "PortIdentity" = betterproto.message_field(1)
- display_name: str = betterproto.string_field(2)
- blocking: bool = betterproto.bool_field(3)
- mode: "OutputPortOutputMode" = betterproto.enum_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class PhysicalLink(betterproto.Message):
- from_op_id: "PhysicalOpIdentity" = betterproto.message_field(1)
- from_port_id: "PortIdentity" = betterproto.message_field(2)
- to_op_id: "PhysicalOpIdentity" = betterproto.message_field(3)
- to_port_id: "PortIdentity" = betterproto.message_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecWithCode(betterproto.Message):
- code: str = betterproto.string_field(1)
- language: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecWithClassName(betterproto.Message):
- class_name: str = betterproto.string_field(1)
- desc_string: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecSource(betterproto.Message):
- storage_key: str = betterproto.string_field(1)
- workflow_identity: "WorkflowIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecInitInfo(betterproto.Message):
- op_exec_with_class_name: "OpExecWithClassName" = betterproto.message_field(
- 1, group="sealed_value"
- )
- op_exec_with_code: "OpExecWithCode" = betterproto.message_field(
- 2, group="sealed_value"
- )
- op_exec_source: "OpExecSource" = betterproto.message_field(3,
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowFatalError(betterproto.Message):
- type: "FatalErrorType" = betterproto.enum_field(1)
- timestamp: datetime = betterproto.message_field(2)
- message: str = betterproto.string_field(3)
- details: str = betterproto.string_field(4)
- operator_id: str = betterproto.string_field(5)
- worker_id: str = betterproto.string_field(6)
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
deleted file mode 100644
index 2bad2b0bfb..0000000000
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ /dev/null
@@ -1,2204 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources:
org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto,
org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto,
org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto,
org/apache/texera/amber/engine/architecture/rpc/testerservice.proto,
org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
- TYPE_CHECKING,
- Dict,
- List,
- Optional,
-)
-
-import betterproto
-import grpclib
-from betterproto.grpc.grpclib_server import ServiceBase
-
-from .... import core as ___core__
-from .. import (
- sendsemantics as _sendsemantics__,
- worker as _worker__,
-)
-
-
-if TYPE_CHECKING:
- import grpclib.server
- from betterproto.grpc.grpclib_client import MetadataLike
- from grpclib.metadata import Deadline
-
-
-class EmbeddedControlMessageType(betterproto.Enum):
- ALL_ALIGNMENT = 0
- NO_ALIGNMENT = 1
- PORT_ALIGNMENT = 2
-
-
-class ConsoleMessageType(betterproto.Enum):
- PRINT = 0
- ERROR = 1
- COMMAND = 2
- DEBUGGER = 3
-
-
-class StatisticsUpdateTarget(betterproto.Enum):
- BOTH_UI_AND_PERSISTENCE = 0
- UI_ONLY = 1
- PERSISTENCE_ONLY = 2
-
-
-class ErrorLanguage(betterproto.Enum):
- PYTHON = 0
- SCALA = 1
-
-
-class WorkflowAggregatedState(betterproto.Enum):
- UNINITIALIZED = 0
- READY = 1
- RUNNING = 2
- PAUSING = 3
- PAUSED = 4
- RESUMING = 5
- COMPLETED = 6
- FAILED = 7
- UNKNOWN = 8
- KILLED = 9
- TERMINATED = 10
-
-
-@dataclass(eq=False, repr=False)
-class ControlRequest(betterproto.Message):
- propagate_embedded_control_message_request: (
- "PropagateEmbeddedControlMessageRequest"
- ) = betterproto.message_field(1, group="sealed_value")
- """request for controller"""
-
- take_global_checkpoint_request: "TakeGlobalCheckpointRequest" = (
- betterproto.message_field(2, group="sealed_value")
- )
- debug_command_request: "DebugCommandRequest" = betterproto.message_field(
- 3, group="sealed_value"
- )
- evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = (
- betterproto.message_field(4, group="sealed_value")
- )
- retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field(
- 5, group="sealed_value"
- )
- console_message_triggered_request: "ConsoleMessageTriggeredRequest" = (
- betterproto.message_field(6, group="sealed_value")
- )
- port_completed_request: "PortCompletedRequest" = betterproto.message_field(
- 7, group="sealed_value"
- )
- worker_state_updated_request: "WorkerStateUpdatedRequest" = (
- betterproto.message_field(8, group="sealed_value")
- )
- link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
- 9, group="sealed_value"
- )
- workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
- betterproto.message_field(10, group="sealed_value")
- )
- jump_to_operator_region_request: "JumpToOperatorRegionRequest" =
betterproto.message_field(
- 11, group="sealed_value"
- )
- add_input_channel_request: "AddInputChannelRequest" =
betterproto.message_field(
- 50, group="sealed_value"
- )
- """request for worker"""
-
- add_partitioning_request: "AddPartitioningRequest" =
betterproto.message_field(
- 51, group="sealed_value"
- )
- assign_port_request: "AssignPortRequest" = betterproto.message_field(
- 52, group="sealed_value"
- )
- finalize_checkpoint_request: "FinalizeCheckpointRequest" = (
- betterproto.message_field(53, group="sealed_value")
- )
- initialize_executor_request: "InitializeExecutorRequest" = (
- betterproto.message_field(54, group="sealed_value")
- )
- update_executor_request: "UpdateExecutorRequest" =
betterproto.message_field(
- 55, group="sealed_value"
- )
- empty_request: "EmptyRequest" = betterproto.message_field(56,
group="sealed_value")
- prepare_checkpoint_request: "PrepareCheckpointRequest" =
betterproto.message_field(
- 57, group="sealed_value"
- )
- query_statistics_request: "QueryStatisticsRequest" =
betterproto.message_field(
- 58, group="sealed_value"
- )
- ping: "Ping" = betterproto.message_field(100, group="sealed_value")
- """request for testing"""
-
- pong: "Pong" = betterproto.message_field(101, group="sealed_value")
- nested: "Nested" = betterproto.message_field(102, group="sealed_value")
- pass_: "Pass" = betterproto.message_field(103, group="sealed_value")
- error_command: "ErrorCommand" = betterproto.message_field(104,
group="sealed_value")
- recursion: "Recursion" = betterproto.message_field(105,
group="sealed_value")
- collect: "Collect" = betterproto.message_field(106, group="sealed_value")
- generate_number: "GenerateNumber" = betterproto.message_field(
- 107, group="sealed_value"
- )
- multi_call: "MultiCall" = betterproto.message_field(108,
group="sealed_value")
- chain: "Chain" = betterproto.message_field(109, group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class EmptyRequest(betterproto.Message):
- pass
-
-
-@dataclass(eq=False, repr=False)
-class AsyncRpcContext(betterproto.Message):
- sender: "___core__.ActorVirtualIdentity" = betterproto.message_field(1)
- receiver: "___core__.ActorVirtualIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ControlInvocation(betterproto.Message):
- method_name: str = betterproto.string_field(1)
- command: "ControlRequest" = betterproto.message_field(2)
- context: "AsyncRpcContext" = betterproto.message_field(3)
- command_id: int = betterproto.int64_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class EmbeddedControlMessage(betterproto.Message):
- id: "___core__.EmbeddedControlMessageIdentity" =
betterproto.message_field(1)
- ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(2)
- scope: List["___core__.ChannelIdentity"] = betterproto.message_field(3)
- command_mapping: Dict[str, "ControlInvocation"] = betterproto.map_field(
- 4, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class PropagateEmbeddedControlMessageRequest(betterproto.Message):
- source_op_to_start_prop: List["___core__.PhysicalOpIdentity"] = (
- betterproto.message_field(1)
- )
- id: "___core__.EmbeddedControlMessageIdentity" =
betterproto.message_field(2)
- ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(3)
- scope: List["___core__.PhysicalOpIdentity"] = betterproto.message_field(4)
- target_ops: List["___core__.PhysicalOpIdentity"] =
betterproto.message_field(5)
- command: "ControlRequest" = betterproto.message_field(6)
- method_name: str = betterproto.string_field(7)
-
-
-@dataclass(eq=False, repr=False)
-class TakeGlobalCheckpointRequest(betterproto.Message):
- estimation_only: bool = betterproto.bool_field(1)
- checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
- betterproto.message_field(2)
- )
- destination: str = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowReconfigureRequest(betterproto.Message):
- reconfiguration: List["UpdateExecutorRequest"] =
betterproto.message_field(1)
- reconfiguration_id: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class DebugCommandRequest(betterproto.Message):
- worker_id: str = betterproto.string_field(1)
- cmd: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatePythonExpressionRequest(betterproto.Message):
- expression: str = betterproto.string_field(1)
- operator_id: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class RetryWorkflowRequest(betterproto.Message):
- workers: List["___core__.ActorVirtualIdentity"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ConsoleMessage(betterproto.Message):
- worker_id: str = betterproto.string_field(1)
- timestamp: datetime = betterproto.message_field(2)
- msg_type: "ConsoleMessageType" = betterproto.enum_field(3)
- source: str = betterproto.string_field(4)
- title: str = betterproto.string_field(5)
- message: str = betterproto.string_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class ConsoleMessageTriggeredRequest(betterproto.Message):
- console_message: "ConsoleMessage" = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PortCompletedRequest(betterproto.Message):
- port_id: "___core__.PortIdentity" = betterproto.message_field(1)
- input: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStateUpdatedRequest(betterproto.Message):
- state: "_worker__.WorkerState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class LinkWorkersRequest(betterproto.Message):
- link: "___core__.PhysicalLink" = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Ping(betterproto.Message):
- """Ping message"""
-
- i: int = betterproto.int32_field(1)
- end: int = betterproto.int32_field(2)
- to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class Pong(betterproto.Message):
- """Pong message"""
-
- i: int = betterproto.int32_field(1)
- end: int = betterproto.int32_field(2)
- to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class Pass(betterproto.Message):
- """Pass message"""
-
- value: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Nested(betterproto.Message):
- """Nested message"""
-
- k: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class MultiCall(betterproto.Message):
- """MultiCall message"""
-
- seq: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ErrorCommand(betterproto.Message):
- """ErrorCommand message"""
-
- pass
-
-
-@dataclass(eq=False, repr=False)
-class Collect(betterproto.Message):
- """Collect message"""
-
- workers: List["___core__.ActorVirtualIdentity"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class GenerateNumber(betterproto.Message):
- """GenerateNumber message"""
-
- pass
-
-
-@dataclass(eq=False, repr=False)
-class Chain(betterproto.Message):
- """Chain message"""
-
- nexts: List["___core__.ActorVirtualIdentity"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Recursion(betterproto.Message):
- """Recursion message"""
-
- i: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class AddInputChannelRequest(betterproto.Message):
- """Messages for the commands"""
-
- channel_id: "___core__.ChannelIdentity" = betterproto.message_field(1)
- port_id: "___core__.PortIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class AddPartitioningRequest(betterproto.Message):
- tag: "___core__.PhysicalLink" = betterproto.message_field(1)
- partitioning: "_sendsemantics__.Partitioning" =
betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class AssignPortRequest(betterproto.Message):
- port_id: "___core__.PortIdentity" = betterproto.message_field(1)
- input: bool = betterproto.bool_field(2)
- schema: Dict[str, str] = betterproto.map_field(
- 3, betterproto.TYPE_STRING, betterproto.TYPE_STRING
- )
- storage_uris: List[str] = betterproto.string_field(4)
- partitionings: List["_sendsemantics__.Partitioning"] =
betterproto.message_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class FinalizeCheckpointRequest(betterproto.Message):
- checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
- betterproto.message_field(1)
- )
- write_to: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class InitializeExecutorRequest(betterproto.Message):
- total_worker_count: int = betterproto.int32_field(1)
- op_exec_init_info: "___core__.OpExecInitInfo" =
betterproto.message_field(2)
- is_source: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class UpdateExecutorRequest(betterproto.Message):
- target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1)
- new_exec_init_info: "___core__.OpExecInitInfo" =
betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class PrepareCheckpointRequest(betterproto.Message):
- checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
- betterproto.message_field(1)
- )
- estimation_only: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class QueryStatisticsRequest(betterproto.Message):
- filter_by_workers: List["___core__.ActorVirtualIdentity"] = (
- betterproto.message_field(1)
- )
- update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class JumpToOperatorRegionRequest(betterproto.Message):
- target_operator_id: "___core__.OperatorIdentity" =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ControlReturn(betterproto.Message):
- """The generic return message"""
-
- retrieve_workflow_state_response: "RetrieveWorkflowStateResponse" = (
- betterproto.message_field(1, group="sealed_value")
- )
- """controller responses"""
-
- propagate_embedded_control_message_response: (
- "PropagateEmbeddedControlMessageResponse"
- ) = betterproto.message_field(2, group="sealed_value")
- take_global_checkpoint_response: "TakeGlobalCheckpointResponse" = (
- betterproto.message_field(3, group="sealed_value")
- )
- evaluate_python_expression_response: "EvaluatePythonExpressionResponse" = (
- betterproto.message_field(4, group="sealed_value")
- )
- start_workflow_response: "StartWorkflowResponse" =
betterproto.message_field(
- 5, group="sealed_value"
- )
- worker_state_response: "WorkerStateResponse" = betterproto.message_field(
- 50, group="sealed_value"
- )
- """worker responses"""
-
- worker_metrics_response: "WorkerMetricsResponse" =
betterproto.message_field(
- 51, group="sealed_value"
- )
- finalize_checkpoint_response: "FinalizeCheckpointResponse" = (
- betterproto.message_field(52, group="sealed_value")
- )
- control_error: "ControlError" = betterproto.message_field(101,
group="sealed_value")
- """common responses"""
-
- empty_return: "EmptyReturn" = betterproto.message_field(102,
group="sealed_value")
- string_response: "StringResponse" = betterproto.message_field(
- 103, group="sealed_value"
- )
- int_response: "IntResponse" = betterproto.message_field(104,
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class EmptyReturn(betterproto.Message):
- pass
-
-
-@dataclass(eq=False, repr=False)
-class ControlError(betterproto.Message):
- error_message: str = betterproto.string_field(1)
- error_details: str = betterproto.string_field(2)
- stack_trace: str = betterproto.string_field(3)
- language: "ErrorLanguage" = betterproto.enum_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class ReturnInvocation(betterproto.Message):
- command_id: int = betterproto.int64_field(1)
- return_value: "ControlReturn" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class StringResponse(betterproto.Message):
- value: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class IntResponse(betterproto.Message):
- value: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class RetrieveWorkflowStateResponse(betterproto.Message):
- state: Dict[str, str] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
- )
-
-
-@dataclass(eq=False, repr=False)
-class FinalizeCheckpointResponse(betterproto.Message):
- size: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PropagateEmbeddedControlMessageResponse(betterproto.Message):
- returns: Dict[str, "ControlReturn"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class TakeGlobalCheckpointResponse(betterproto.Message):
- total_size: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class TypedValue(betterproto.Message):
- expression: str = betterproto.string_field(1)
- value_ref: str = betterproto.string_field(2)
- value_str: str = betterproto.string_field(3)
- value_type: str = betterproto.string_field(4)
- expandable: bool = betterproto.bool_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValue(betterproto.Message):
- value: "TypedValue" = betterproto.message_field(1)
- attributes: List["TypedValue"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatePythonExpressionResponse(betterproto.Message):
- values: List["EvaluatedValue"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class StartWorkflowResponse(betterproto.Message):
- workflow_state: "WorkflowAggregatedState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStateResponse(betterproto.Message):
- state: "_worker__.WorkerState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerMetricsResponse(betterproto.Message):
- metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1)
-
-
-class RpcTesterStub(betterproto.ServiceStub):
- async def send_ping(
- self,
- ping: "Ping",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
- ping,
- IntResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_pong(
- self,
- pong: "Pong",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
- pong,
- IntResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_nested(
- self,
- nested: "Nested",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
- nested,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_pass(
- self,
- pass_: "Pass",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
- pass_,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_error_command(
- self,
- error_command: "ErrorCommand",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
- error_command,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_recursion(
- self,
- recursion: "Recursion",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
- recursion,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_collect(
- self,
- collect: "Collect",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
- collect,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_generate_number(
- self,
- generate_number: "GenerateNumber",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
- generate_number,
- IntResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_multi_call(
- self,
- multi_call: "MultiCall",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
- multi_call,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def send_chain(
- self,
- chain: "Chain",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
- chain,
- StringResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
-
-class WorkerServiceStub(betterproto.ServiceStub):
- async def add_input_channel(
- self,
- add_input_channel_request: "AddInputChannelRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
- add_input_channel_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def add_partitioning(
- self,
- add_partitioning_request: "AddPartitioningRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
- add_partitioning_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def assign_port(
- self,
- assign_port_request: "AssignPortRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
- assign_port_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def finalize_checkpoint(
- self,
- finalize_checkpoint_request: "FinalizeCheckpointRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "FinalizeCheckpointResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
- finalize_checkpoint_request,
- FinalizeCheckpointResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def flush_network_buffer(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def initialize_executor(
- self,
- initialize_executor_request: "InitializeExecutorRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
- initialize_executor_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def open_executor(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def pause_worker(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
- empty_request,
- WorkerStateResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def prepare_checkpoint(
- self,
- prepare_checkpoint_request: "PrepareCheckpointRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
- prepare_checkpoint_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def query_statistics(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "WorkerMetricsResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
- empty_request,
- WorkerMetricsResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def resume_worker(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
- empty_request,
- WorkerStateResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def retrieve_state(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def retry_current_tuple(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def start_worker(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
- empty_request,
- WorkerStateResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def end_worker(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def start_channel(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def end_channel(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def debug_command(
- self,
- debug_command_request: "DebugCommandRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
- debug_command_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def evaluate_python_expression(
- self,
- evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EvaluatedValue":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
- evaluate_python_expression_request,
- EvaluatedValue,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def no_operation(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def update_executor(
- self,
- update_executor_request: "UpdateExecutorRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
- update_executor_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
-
-class ControllerServiceStub(betterproto.ServiceStub):
- async def retrieve_workflow_state(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "RetrieveWorkflowStateResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
- empty_request,
- RetrieveWorkflowStateResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def propagate_embedded_control_message(
- self,
- propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "PropagateEmbeddedControlMessageResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
- propagate_embedded_control_message_request,
- PropagateEmbeddedControlMessageResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def take_global_checkpoint(
- self,
- take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "TakeGlobalCheckpointResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
- take_global_checkpoint_request,
- TakeGlobalCheckpointResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def debug_command(
- self,
- debug_command_request: "DebugCommandRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
- debug_command_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def evaluate_python_expression(
- self,
- evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EvaluatePythonExpressionResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
- evaluate_python_expression_request,
- EvaluatePythonExpressionResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def console_message_triggered(
- self,
- console_message_triggered_request: "ConsoleMessageTriggeredRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
- console_message_triggered_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def port_completed(
- self,
- port_completed_request: "PortCompletedRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
- port_completed_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def start_workflow(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "StartWorkflowResponse":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
- empty_request,
- StartWorkflowResponse,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def resume_workflow(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def pause_workflow(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def worker_state_updated(
- self,
- worker_state_updated_request: "WorkerStateUpdatedRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
- worker_state_updated_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def worker_execution_completed(
- self,
- empty_request: "EmptyRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
- empty_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def jump_to_operator_region(
- self,
- jump_to_operator_region_request: "JumpToOperatorRegionRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion",
- jump_to_operator_region_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def link_workers(
- self,
- link_workers_request: "LinkWorkersRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
- link_workers_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def controller_initiate_query_statistics(
- self,
- query_statistics_request: "QueryStatisticsRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
- query_statistics_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def retry_workflow(
- self,
- retry_workflow_request: "RetryWorkflowRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
- retry_workflow_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
- async def reconfigure_workflow(
- self,
- workflow_reconfigure_request: "WorkflowReconfigureRequest",
- *,
- timeout: Optional[float] = None,
- deadline: Optional["Deadline"] = None,
- metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
- return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
- workflow_reconfigure_request,
- EmptyReturn,
- timeout=timeout,
- deadline=deadline,
- metadata=metadata,
- )
-
-
-class RpcTesterBase(ServiceBase):
-
- async def send_ping(self, ping: "Ping") -> "IntResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_pong(self, pong: "Pong") -> "IntResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_nested(self, nested: "Nested") -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_pass(self, pass_: "Pass") -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_error_command(
- self, error_command: "ErrorCommand"
- ) -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_collect(self, collect: "Collect") -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_generate_number(
- self, generate_number: "GenerateNumber"
- ) -> "IntResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_multi_call(self, multi_call: "MultiCall") ->
"StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def send_chain(self, chain: "Chain") -> "StringResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def __rpc_send_ping(
- self, stream: "grpclib.server.Stream[Ping, IntResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_ping(request)
- await stream.send_message(response)
-
- async def __rpc_send_pong(
- self, stream: "grpclib.server.Stream[Pong, IntResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_pong(request)
- await stream.send_message(response)
-
- async def __rpc_send_nested(
- self, stream: "grpclib.server.Stream[Nested, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_nested(request)
- await stream.send_message(response)
-
- async def __rpc_send_pass(
- self, stream: "grpclib.server.Stream[Pass, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_pass(request)
- await stream.send_message(response)
-
- async def __rpc_send_error_command(
- self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_error_command(request)
- await stream.send_message(response)
-
- async def __rpc_send_recursion(
- self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_recursion(request)
- await stream.send_message(response)
-
- async def __rpc_send_collect(
- self, stream: "grpclib.server.Stream[Collect, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_collect(request)
- await stream.send_message(response)
-
- async def __rpc_send_generate_number(
- self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_generate_number(request)
- await stream.send_message(response)
-
- async def __rpc_send_multi_call(
- self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_multi_call(request)
- await stream.send_message(response)
-
- async def __rpc_send_chain(
- self, stream: "grpclib.server.Stream[Chain, StringResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.send_chain(request)
- await stream.send_message(response)
-
- def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
- return {
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing":
grpclib.const.Handler(
- self.__rpc_send_ping,
- grpclib.const.Cardinality.UNARY_UNARY,
- Ping,
- IntResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong":
grpclib.const.Handler(
- self.__rpc_send_pong,
- grpclib.const.Cardinality.UNARY_UNARY,
- Pong,
- IntResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested":
grpclib.const.Handler(
- self.__rpc_send_nested,
- grpclib.const.Cardinality.UNARY_UNARY,
- Nested,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass":
grpclib.const.Handler(
- self.__rpc_send_pass,
- grpclib.const.Cardinality.UNARY_UNARY,
- Pass,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand":
grpclib.const.Handler(
- self.__rpc_send_error_command,
- grpclib.const.Cardinality.UNARY_UNARY,
- ErrorCommand,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion":
grpclib.const.Handler(
- self.__rpc_send_recursion,
- grpclib.const.Cardinality.UNARY_UNARY,
- Recursion,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect":
grpclib.const.Handler(
- self.__rpc_send_collect,
- grpclib.const.Cardinality.UNARY_UNARY,
- Collect,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
grpclib.const.Handler(
- self.__rpc_send_generate_number,
- grpclib.const.Cardinality.UNARY_UNARY,
- GenerateNumber,
- IntResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall":
grpclib.const.Handler(
- self.__rpc_send_multi_call,
- grpclib.const.Cardinality.UNARY_UNARY,
- MultiCall,
- StringResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain":
grpclib.const.Handler(
- self.__rpc_send_chain,
- grpclib.const.Cardinality.UNARY_UNARY,
- Chain,
- StringResponse,
- ),
- }
-
-
-class WorkerServiceBase(ServiceBase):
-
- async def add_input_channel(
- self, add_input_channel_request: "AddInputChannelRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def add_partitioning(
- self, add_partitioning_request: "AddPartitioningRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def assign_port(
- self, assign_port_request: "AssignPortRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def finalize_checkpoint(
- self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
- ) -> "FinalizeCheckpointResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def flush_network_buffer(
- self, empty_request: "EmptyRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def initialize_executor(
- self, initialize_executor_request: "InitializeExecutorRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def open_executor(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def pause_worker(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def prepare_checkpoint(
- self, prepare_checkpoint_request: "PrepareCheckpointRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def query_statistics(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerMetricsResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def resume_worker(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def retrieve_state(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def retry_current_tuple(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def start_worker(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def start_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def end_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def debug_command(
- self, debug_command_request: "DebugCommandRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def evaluate_python_expression(
- self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
- ) -> "EvaluatedValue":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def no_operation(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def update_executor(
- self, update_executor_request: "UpdateExecutorRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def __rpc_add_input_channel(
- self, stream: "grpclib.server.Stream[AddInputChannelRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.add_input_channel(request)
- await stream.send_message(response)
-
- async def __rpc_add_partitioning(
- self, stream: "grpclib.server.Stream[AddPartitioningRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.add_partitioning(request)
- await stream.send_message(response)
-
- async def __rpc_assign_port(
- self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.assign_port(request)
- await stream.send_message(response)
-
- async def __rpc_finalize_checkpoint(
- self,
- stream: "grpclib.server.Stream[FinalizeCheckpointRequest,
FinalizeCheckpointResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.finalize_checkpoint(request)
- await stream.send_message(response)
-
- async def __rpc_flush_network_buffer(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.flush_network_buffer(request)
- await stream.send_message(response)
-
- async def __rpc_initialize_executor(
- self, stream: "grpclib.server.Stream[InitializeExecutorRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.initialize_executor(request)
- await stream.send_message(response)
-
- async def __rpc_open_executor(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.open_executor(request)
- await stream.send_message(response)
-
- async def __rpc_pause_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.pause_worker(request)
- await stream.send_message(response)
-
- async def __rpc_prepare_checkpoint(
- self, stream: "grpclib.server.Stream[PrepareCheckpointRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.prepare_checkpoint(request)
- await stream.send_message(response)
-
- async def __rpc_query_statistics(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerMetricsResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.query_statistics(request)
- await stream.send_message(response)
-
- async def __rpc_resume_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.resume_worker(request)
- await stream.send_message(response)
-
- async def __rpc_retrieve_state(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.retrieve_state(request)
- await stream.send_message(response)
-
- async def __rpc_retry_current_tuple(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.retry_current_tuple(request)
- await stream.send_message(response)
-
- async def __rpc_start_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.start_worker(request)
- await stream.send_message(response)
-
- async def __rpc_end_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.end_worker(request)
- await stream.send_message(response)
-
- async def __rpc_start_channel(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.start_channel(request)
- await stream.send_message(response)
-
- async def __rpc_end_channel(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.end_channel(request)
- await stream.send_message(response)
-
- async def __rpc_debug_command(
- self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.debug_command(request)
- await stream.send_message(response)
-
- async def __rpc_evaluate_python_expression(
- self,
- stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatedValue]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.evaluate_python_expression(request)
- await stream.send_message(response)
-
- async def __rpc_no_operation(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.no_operation(request)
- await stream.send_message(response)
-
- async def __rpc_update_executor(
- self, stream: "grpclib.server.Stream[UpdateExecutorRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.update_executor(request)
- await stream.send_message(response)
-
- def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
- return {
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
grpclib.const.Handler(
- self.__rpc_add_input_channel,
- grpclib.const.Cardinality.UNARY_UNARY,
- AddInputChannelRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
grpclib.const.Handler(
- self.__rpc_add_partitioning,
- grpclib.const.Cardinality.UNARY_UNARY,
- AddPartitioningRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort":
grpclib.const.Handler(
- self.__rpc_assign_port,
- grpclib.const.Cardinality.UNARY_UNARY,
- AssignPortRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
grpclib.const.Handler(
- self.__rpc_finalize_checkpoint,
- grpclib.const.Cardinality.UNARY_UNARY,
- FinalizeCheckpointRequest,
- FinalizeCheckpointResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
grpclib.const.Handler(
- self.__rpc_flush_network_buffer,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
grpclib.const.Handler(
- self.__rpc_initialize_executor,
- grpclib.const.Cardinality.UNARY_UNARY,
- InitializeExecutorRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor":
grpclib.const.Handler(
- self.__rpc_open_executor,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker":
grpclib.const.Handler(
- self.__rpc_pause_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
grpclib.const.Handler(
- self.__rpc_prepare_checkpoint,
- grpclib.const.Cardinality.UNARY_UNARY,
- PrepareCheckpointRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
grpclib.const.Handler(
- self.__rpc_query_statistics,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerMetricsResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker":
grpclib.const.Handler(
- self.__rpc_resume_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState":
grpclib.const.Handler(
- self.__rpc_retrieve_state,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
grpclib.const.Handler(
- self.__rpc_retry_current_tuple,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker":
grpclib.const.Handler(
- self.__rpc_start_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker":
grpclib.const.Handler(
- self.__rpc_end_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel":
grpclib.const.Handler(
- self.__rpc_start_channel,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel":
grpclib.const.Handler(
- self.__rpc_end_channel,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand":
grpclib.const.Handler(
- self.__rpc_debug_command,
- grpclib.const.Cardinality.UNARY_UNARY,
- DebugCommandRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
grpclib.const.Handler(
- self.__rpc_evaluate_python_expression,
- grpclib.const.Cardinality.UNARY_UNARY,
- EvaluatePythonExpressionRequest,
- EvaluatedValue,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation":
grpclib.const.Handler(
- self.__rpc_no_operation,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
grpclib.const.Handler(
- self.__rpc_update_executor,
- grpclib.const.Cardinality.UNARY_UNARY,
- UpdateExecutorRequest,
- EmptyReturn,
- ),
- }
-
-
-class ControllerServiceBase(ServiceBase):
-
- async def retrieve_workflow_state(
- self, empty_request: "EmptyRequest"
- ) -> "RetrieveWorkflowStateResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def propagate_embedded_control_message(
- self,
- propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
- ) -> "PropagateEmbeddedControlMessageResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def take_global_checkpoint(
- self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
- ) -> "TakeGlobalCheckpointResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def debug_command(
- self, debug_command_request: "DebugCommandRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def evaluate_python_expression(
- self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
- ) -> "EvaluatePythonExpressionResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def console_message_triggered(
- self, console_message_triggered_request:
"ConsoleMessageTriggeredRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def port_completed(
- self, port_completed_request: "PortCompletedRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def start_workflow(
- self, empty_request: "EmptyRequest"
- ) -> "StartWorkflowResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def resume_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def pause_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def worker_state_updated(
- self, worker_state_updated_request: "WorkerStateUpdatedRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def worker_execution_completed(
- self, empty_request: "EmptyRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def jump_to_operator_region(
- self, jump_to_operator_region_request: "JumpToOperatorRegionRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def link_workers(
- self, link_workers_request: "LinkWorkersRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def controller_initiate_query_statistics(
- self, query_statistics_request: "QueryStatisticsRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def retry_workflow(
- self, retry_workflow_request: "RetryWorkflowRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def reconfigure_workflow(
- self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def __rpc_retrieve_workflow_state(
- self,
- stream: "grpclib.server.Stream[EmptyRequest,
RetrieveWorkflowStateResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.retrieve_workflow_state(request)
- await stream.send_message(response)
-
- async def __rpc_propagate_embedded_control_message(
- self,
- stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest,
PropagateEmbeddedControlMessageResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.propagate_embedded_control_message(request)
- await stream.send_message(response)
-
- async def __rpc_take_global_checkpoint(
- self,
- stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest,
TakeGlobalCheckpointResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.take_global_checkpoint(request)
- await stream.send_message(response)
-
- async def __rpc_debug_command(
- self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.debug_command(request)
- await stream.send_message(response)
-
- async def __rpc_evaluate_python_expression(
- self,
- stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatePythonExpressionResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.evaluate_python_expression(request)
- await stream.send_message(response)
-
- async def __rpc_console_message_triggered(
- self,
- stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest,
EmptyReturn]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.console_message_triggered(request)
- await stream.send_message(response)
-
- async def __rpc_port_completed(
- self, stream: "grpclib.server.Stream[PortCompletedRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.port_completed(request)
- await stream.send_message(response)
-
- async def __rpc_start_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest,
StartWorkflowResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.start_workflow(request)
- await stream.send_message(response)
-
- async def __rpc_resume_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.resume_workflow(request)
- await stream.send_message(response)
-
- async def __rpc_pause_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.pause_workflow(request)
- await stream.send_message(response)
-
- async def __rpc_worker_state_updated(
- self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.worker_state_updated(request)
- await stream.send_message(response)
-
- async def __rpc_worker_execution_completed(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.worker_execution_completed(request)
- await stream.send_message(response)
-
- async def __rpc_jump_to_operator_region(
- self, stream: "grpclib.server.Stream[JumpToOperatorRegionRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.jump_to_operator_region(request)
- await stream.send_message(response)
-
- async def __rpc_link_workers(
- self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.link_workers(request)
- await stream.send_message(response)
-
- async def __rpc_controller_initiate_query_statistics(
- self, stream: "grpclib.server.Stream[QueryStatisticsRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.controller_initiate_query_statistics(request)
- await stream.send_message(response)
-
- async def __rpc_retry_workflow(
- self, stream: "grpclib.server.Stream[RetryWorkflowRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.retry_workflow(request)
- await stream.send_message(response)
-
- async def __rpc_reconfigure_workflow(
- self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.reconfigure_workflow(request)
- await stream.send_message(response)
-
- def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
- return {
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
grpclib.const.Handler(
- self.__rpc_retrieve_workflow_state,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- RetrieveWorkflowStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
grpclib.const.Handler(
- self.__rpc_propagate_embedded_control_message,
- grpclib.const.Cardinality.UNARY_UNARY,
- PropagateEmbeddedControlMessageRequest,
- PropagateEmbeddedControlMessageResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
grpclib.const.Handler(
- self.__rpc_take_global_checkpoint,
- grpclib.const.Cardinality.UNARY_UNARY,
- TakeGlobalCheckpointRequest,
- TakeGlobalCheckpointResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
grpclib.const.Handler(
- self.__rpc_debug_command,
- grpclib.const.Cardinality.UNARY_UNARY,
- DebugCommandRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
grpclib.const.Handler(
- self.__rpc_evaluate_python_expression,
- grpclib.const.Cardinality.UNARY_UNARY,
- EvaluatePythonExpressionRequest,
- EvaluatePythonExpressionResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
grpclib.const.Handler(
- self.__rpc_console_message_triggered,
- grpclib.const.Cardinality.UNARY_UNARY,
- ConsoleMessageTriggeredRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
grpclib.const.Handler(
- self.__rpc_port_completed,
- grpclib.const.Cardinality.UNARY_UNARY,
- PortCompletedRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
grpclib.const.Handler(
- self.__rpc_start_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- StartWorkflowResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
grpclib.const.Handler(
- self.__rpc_resume_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
grpclib.const.Handler(
- self.__rpc_pause_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
grpclib.const.Handler(
- self.__rpc_worker_state_updated,
- grpclib.const.Cardinality.UNARY_UNARY,
- WorkerStateUpdatedRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
grpclib.const.Handler(
- self.__rpc_worker_execution_completed,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion":
grpclib.const.Handler(
- self.__rpc_jump_to_operator_region,
- grpclib.const.Cardinality.UNARY_UNARY,
- JumpToOperatorRegionRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
grpclib.const.Handler(
- self.__rpc_link_workers,
- grpclib.const.Cardinality.UNARY_UNARY,
- LinkWorkersRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
grpclib.const.Handler(
- self.__rpc_controller_initiate_query_statistics,
- grpclib.const.Cardinality.UNARY_UNARY,
- QueryStatisticsRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
grpclib.const.Handler(
- self.__rpc_retry_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- RetryWorkflowRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
grpclib.const.Handler(
- self.__rpc_reconfigure_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- WorkflowReconfigureRequest,
- EmptyReturn,
- ),
- }
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py
deleted file mode 100644
index bc241806b5..0000000000
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources:
org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
- List,
-)
-
-import betterproto
-
-from .... import core as ___core__
-
-
-@dataclass(eq=False, repr=False)
-class Partitioning(betterproto.Message):
- one_to_one_partitioning: "OneToOnePartitioning" =
betterproto.message_field(
- 1, group="sealed_value"
- )
- round_robin_partitioning: "RoundRobinPartitioning" =
betterproto.message_field(
- 2, group="sealed_value"
- )
- hash_based_shuffle_partitioning: "HashBasedShufflePartitioning" = (
- betterproto.message_field(3, group="sealed_value")
- )
- range_based_shuffle_partitioning: "RangeBasedShufflePartitioning" = (
- betterproto.message_field(4, group="sealed_value")
- )
- broadcast_partitioning: "BroadcastPartitioning" =
betterproto.message_field(
- 5, group="sealed_value"
- )
-
-
-@dataclass(eq=False, repr=False)
-class OneToOnePartitioning(betterproto.Message):
- batch_size: int = betterproto.int32_field(1)
- channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class RoundRobinPartitioning(betterproto.Message):
- batch_size: int = betterproto.int32_field(1)
- channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class HashBasedShufflePartitioning(betterproto.Message):
- batch_size: int = betterproto.int32_field(1)
- channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
- hash_attribute_names: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class RangeBasedShufflePartitioning(betterproto.Message):
- batch_size: int = betterproto.int32_field(1)
- channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
- range_attribute_names: List[str] = betterproto.string_field(3)
- range_min: int = betterproto.int64_field(4)
- range_max: int = betterproto.int64_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class BroadcastPartitioning(betterproto.Message):
- batch_size: int = betterproto.int32_field(1)
- channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py
deleted file mode 100644
index 6a7b210e18..0000000000
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/architecture/worker/statistics.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
- List,
-)
-
-import betterproto
-
-from .... import core as ___core__
-
-
-class WorkerState(betterproto.Enum):
- UNINITIALIZED = 0
- READY = 1
- RUNNING = 2
- PAUSED = 3
- COMPLETED = 4
- TERMINATED = 5
-
-
-@dataclass(eq=False, repr=False)
-class PortTupleMetricsMapping(betterproto.Message):
- port_id: "___core__.PortIdentity" = betterproto.message_field(1)
- tuple_metrics: "TupleMetrics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class TupleMetrics(betterproto.Message):
- count: int = betterproto.int64_field(1)
- size: int = betterproto.int64_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStatistics(betterproto.Message):
- input_tuple_metrics: List["PortTupleMetricsMapping"] =
betterproto.message_field(1)
- output_tuple_metrics: List["PortTupleMetricsMapping"] =
betterproto.message_field(2)
- data_processing_time: int = betterproto.int64_field(3)
- control_processing_time: int = betterproto.int64_field(4)
- idle_time: int = betterproto.int64_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerMetrics(betterproto.Message):
- worker_state: "WorkerState" = betterproto.enum_field(1)
- worker_statistics: "WorkerStatistics" = betterproto.message_field(2)
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
deleted file mode 100644
index 55c789aa39..0000000000
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
+++ /dev/null
@@ -1,156 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/common/actormessage.proto,
org/apache/texera/amber/engine/common/ambermessage.proto,
org/apache/texera/amber/engine/common/executionruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
- Dict,
- List,
-)
-
-import betterproto
-
-from ... import core as __core__
-from ..architecture import (
- rpc as _architecture_rpc__,
- worker as _architecture_worker__,
-)
-
-
-@dataclass(eq=False, repr=False)
-class DirectControlMessagePayloadV2(betterproto.Message):
- control_invocation: "_architecture_rpc__.ControlInvocation" = (
- betterproto.message_field(1, group="value")
- )
- return_invocation: "_architecture_rpc__.ReturnInvocation" = (
- betterproto.message_field(2, group="value")
- )
-
-
-@dataclass(eq=False, repr=False)
-class PythonDataHeader(betterproto.Message):
- tag: "__core__.ChannelIdentity" = betterproto.message_field(1)
- payload_type: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class PythonControlMessage(betterproto.Message):
- tag: "__core__.ChannelIdentity" = betterproto.message_field(1)
- payload: "DirectControlMessagePayloadV2" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFault(betterproto.Message):
- worker_name: str = betterproto.string_field(1)
- faulted_tuple: "BreakpointFaultBreakpointTuple" =
betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFaultBreakpointTuple(betterproto.Message):
- id: int = betterproto.int64_field(1)
- is_input: bool = betterproto.bool_field(2)
- tuple: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorBreakpoints(betterproto.Message):
- unresolved_breakpoints: List["BreakpointFault"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionBreakpointStore(betterproto.Message):
- operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValueList(betterproto.Message):
- values: List["_architecture_rpc__.EvaluatedValue"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorConsole(betterproto.Message):
- console_messages: List["_architecture_rpc__.ConsoleMessage"] = (
- betterproto.message_field(1)
- )
- evaluate_expr_results: Dict[str, "EvaluatedValueList"] =
betterproto.map_field(
- 2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionConsoleStore(betterproto.Message):
- operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorWorkerMapping(betterproto.Message):
- operator_id: str = betterproto.string_field(1)
- worker_ids: List[str] = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorStatistics(betterproto.Message):
- input_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = (
- betterproto.message_field(1)
- )
- output_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = (
- betterproto.message_field(2)
- )
- num_workers: int = betterproto.int32_field(3)
- data_processing_time: int = betterproto.int64_field(4)
- control_processing_time: int = betterproto.int64_field(5)
- idle_time: int = betterproto.int64_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorMetrics(betterproto.Message):
- operator_state: "_architecture_rpc__.WorkflowAggregatedState" = (
- betterproto.enum_field(1)
- )
- operator_statistics: "OperatorStatistics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionStatsStore(betterproto.Message):
- start_time_stamp: int = betterproto.int64_field(1)
- end_time_stamp: int = betterproto.int64_field(2)
- operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field(
- 3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
- operator_worker_mapping: List["OperatorWorkerMapping"] =
betterproto.message_field(
- 4
- )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionMetadataStore(betterproto.Message):
- state: "_architecture_rpc__.WorkflowAggregatedState" =
betterproto.enum_field(1)
- fatal_errors: List["__core__.WorkflowFatalError"] =
betterproto.message_field(2)
- execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3)
- is_recovering: bool = betterproto.bool_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class Backpressure(betterproto.Message):
- enable_backpressure: bool = betterproto.bool_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class CreditUpdate(betterproto.Message):
- pass
-
-
-@dataclass(eq=False, repr=False)
-class ActorCommand(betterproto.Message):
- backpressure: "Backpressure" = betterproto.message_field(1,
group="sealed_value")
- credit_update: "CreditUpdate" = betterproto.message_field(2,
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class PythonActorMessage(betterproto.Message):
- payload: "ActorCommand" = betterproto.message_field(1)
diff --git a/amber/src/main/python/proto/org/apache/texera/web/__init__.py
b/amber/src/main/python/proto/org/apache/texera/web/__init__.py
deleted file mode 100644
index adb5848bb0..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/web/__init__.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources: org/apache/texera/workflowruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
- Dict,
- List,
-)
-
-import betterproto
-
-from ...amber.engine import common as __amber_engine_common__
-from ...amber.engine.architecture import worker as
__amber_engine_architecture_worker__
-
-
-class FatalErrorType(betterproto.Enum):
- COMPILATION_ERROR = 0
- EXECUTION_FAILURE = 1
-
-
-class WorkflowAggregatedState(betterproto.Enum):
- UNINITIALIZED = 0
- READY = 1
- RUNNING = 2
- PAUSING = 3
- PAUSED = 4
- RESUMING = 5
- COMPLETED = 6
- FAILED = 7
- UNKNOWN = 8
- KILLED = 9
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFault(betterproto.Message):
- worker_name: str = betterproto.string_field(1)
- faulted_tuple: "BreakpointFaultBreakpointTuple" =
betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFaultBreakpointTuple(betterproto.Message):
- id: int = betterproto.int64_field(1)
- is_input: bool = betterproto.bool_field(2)
- tuple: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorBreakpoints(betterproto.Message):
- unresolved_breakpoints: List["BreakpointFault"] =
betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionBreakpointStore(betterproto.Message):
- operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValueList(betterproto.Message):
- values: List["__amber_engine_architecture_worker__.EvaluatedValue"] = (
- betterproto.message_field(1)
- )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorConsole(betterproto.Message):
- console_messages:
List["__amber_engine_architecture_worker__.ConsoleMessage"] = (
- betterproto.message_field(1)
- )
- evaluate_expr_results: Dict[str, "EvaluatedValueList"] =
betterproto.map_field(
- 2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionConsoleStore(betterproto.Message):
- operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorWorkerMapping(betterproto.Message):
- operator_id: str = betterproto.string_field(1)
- worker_ids: List[str] = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorStatistics(betterproto.Message):
- input_count:
List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = (
- betterproto.message_field(1)
- )
- output_count:
List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = (
- betterproto.message_field(2)
- )
- num_workers: int = betterproto.int32_field(3)
- data_processing_time: int = betterproto.int64_field(4)
- control_processing_time: int = betterproto.int64_field(5)
- idle_time: int = betterproto.int64_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorMetrics(betterproto.Message):
- operator_state: "WorkflowAggregatedState" = betterproto.enum_field(1)
- operator_statistics: "OperatorStatistics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionStatsStore(betterproto.Message):
- start_time_stamp: int = betterproto.int64_field(1)
- end_time_stamp: int = betterproto.int64_field(2)
- operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field(
- 3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
- operator_worker_mapping: List["OperatorWorkerMapping"] =
betterproto.message_field(
- 4
- )
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowFatalError(betterproto.Message):
- type: "FatalErrorType" = betterproto.enum_field(1)
- timestamp: datetime = betterproto.message_field(2)
- message: str = betterproto.string_field(3)
- details: str = betterproto.string_field(4)
- operator_id: str = betterproto.string_field(5)
- worker_id: str = betterproto.string_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionMetadataStore(betterproto.Message):
- state: "WorkflowAggregatedState" = betterproto.enum_field(1)
- fatal_errors: List["WorkflowFatalError"] = betterproto.message_field(2)
- execution_id: "__amber_engine_common__.ExecutionIdentity" = (
- betterproto.message_field(3)
- )
- is_recovering: bool = betterproto.bool_field(4)
diff --git a/amber/src/main/python/proto/scalapb/__init__.py
b/amber/src/main/python/proto/scalapb/__init__.py
deleted file mode 100644
index 49c713815a..0000000000
--- a/amber/src/main/python/proto/scalapb/__init__.py
+++ /dev/null
@@ -1,421 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# sources: scalapb/scalapb.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
- Dict,
- List,
-)
-
-import betterproto
-import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf
-
-
-class MatchType(betterproto.Enum):
- CONTAINS = 0
- EXACT = 1
- PRESENCE = 2
-
-
-class ScalaPbOptionsOptionsScope(betterproto.Enum):
- """
- Whether to apply the options only to this file, or for the entire package
(and its subpackages)
- """
-
- FILE = 0
- """Apply the options for this file only (default)"""
-
- PACKAGE = 1
- """Apply the options for the entire package and its subpackages."""
-
-
-class ScalaPbOptionsEnumValueNaming(betterproto.Enum):
- """Naming convention for generated enum values"""
-
- AS_IN_PROTO = 0
- CAMEL_CASE = 1
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptions(betterproto.Message):
- package_name: str = betterproto.string_field(1)
- """If set then it overrides the java_package and package."""
-
- flat_package: bool = betterproto.bool_field(2)
- """
- If true, the compiler does not append the proto base file name
- into the generated package name. If false (the default), the
- generated scala package name is the package_name.basename where
- basename is the proto file name without the .proto extension.
- """
-
- import_: List[str] = betterproto.string_field(3)
- """
- Adds the following imports at the top of the file (this is meant
- to provide implicit TypeMappers)
- """
-
- preamble: List[str] = betterproto.string_field(4)
- """
- Text to add to the generated scala file. This can be used only
- when single_file is true.
- """
-
- single_file: bool = betterproto.bool_field(5)
- """
- If true, all messages and enums (but not services) will be written
- to a single Scala file.
- """
-
- no_primitive_wrappers: bool = betterproto.bool_field(7)
- """
- By default, wrappers defined at
-
https://github.com/google/protobuf/blob/master/src/google/protobuf/wrappers.proto,
- are mapped to an Option[T] where T is a primitive type. When this field
- is set to true, we do not perform this transformation.
- """
-
- primitive_wrappers: bool = betterproto.bool_field(6)
- """
- DEPRECATED. In ScalaPB <= 0.5.47, it was necessary to explicitly enable
- primitive_wrappers. This field remains here for backwards compatibility,
- but it has no effect on generated code. It is an error to set both
- `primitive_wrappers` and `no_primitive_wrappers`.
- """
-
- collection_type: str = betterproto.string_field(8)
- """
- Scala type to be used for repeated fields. If unspecified,
- `scala.collection.Seq` will be used.
- """
-
- preserve_unknown_fields: bool = betterproto.bool_field(9)
- """
- If set to true, all generated messages in this file will preserve unknown
- fields.
- """
-
- object_name: str = betterproto.string_field(10)
- """
- If defined, sets the name of the file-level object that would be
generated. This
- object extends `GeneratedFileObject` and contains descriptors, and list
of message
- and enum companions.
- """
-
- scope: "ScalaPbOptionsOptionsScope" = betterproto.enum_field(11)
- """Experimental: scope to apply the given options."""
-
- lenses: bool = betterproto.bool_field(12)
- """If true, lenses will be generated."""
-
- retain_source_code_info: bool = betterproto.bool_field(13)
- """
- If true, then source-code info information will be included in the
- generated code - normally the source code info is cleared out to reduce
- code size. The source code info is useful for extracting source code
- location from the descriptors as well as comments.
- """
-
- map_type: str = betterproto.string_field(14)
- """
- Scala type to be used for maps. If unspecified,
- `scala.collection.immutable.Map` will be used.
- """
-
- no_default_values_in_constructor: bool = betterproto.bool_field(15)
- """
- If true, no default values will be generated in message constructors.
- """
-
- enum_value_naming: "ScalaPbOptionsEnumValueNaming" =
betterproto.enum_field(16)
- enum_strip_prefix: bool = betterproto.bool_field(17)
- """
- Indicate if prefix (enum name + optional underscore) should be removed in
scala code
- Strip is applied before enum value naming changes.
- """
-
- bytes_type: str = betterproto.string_field(21)
- """Scala type to use for bytes fields."""
-
- java_conversions: bool = betterproto.bool_field(23)
- """Enable java conversions for this file."""
-
- aux_message_options: List["ScalaPbOptionsAuxMessageOptions"] = (
- betterproto.message_field(18)
- )
- """List of message options to apply to some messages."""
-
- aux_field_options: List["ScalaPbOptionsAuxFieldOptions"] = (
- betterproto.message_field(19)
- )
- """List of message options to apply to some fields."""
-
- aux_enum_options: List["ScalaPbOptionsAuxEnumOptions"] =
betterproto.message_field(
- 20
- )
- """List of message options to apply to some enums."""
-
- aux_enum_value_options: List["ScalaPbOptionsAuxEnumValueOptions"] = (
- betterproto.message_field(22)
- )
- """List of enum value options to apply to some enum values."""
-
- preprocessors: List[str] = betterproto.string_field(24)
- """List of preprocessors to apply."""
-
- field_transformations: List["FieldTransformation"] =
betterproto.message_field(25)
- ignore_all_transformations: bool = betterproto.bool_field(26)
- """
- Ignores all transformations for this file. This is meant to allow specific
files to
- opt out from transformations inherited through package-scoped options.
- """
-
- getters: bool = betterproto.bool_field(27)
- """If true, getters will be generated."""
-
- test_only_no_java_conversions: bool = betterproto.bool_field(999)
- """
- For use in tests only. Inhibit Java conversions even when when generator
parameters
- request for it.
- """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxMessageOptions(betterproto.Message):
- """
- AuxMessageOptions enables you to set message-level options through
package-scoped options.
- This is useful when you can't add a dependency on scalapb.proto from the
proto file that
- defines the message.
- """
-
- target: str = betterproto.string_field(1)
- """The fully-qualified name of the message in the proto name space."""
-
- options: "MessageOptions" = betterproto.message_field(2)
- """
- Options to apply to the message. If there are any options defined on the
target message
- they take precedence over the options.
- """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxFieldOptions(betterproto.Message):
- """
- AuxFieldOptions enables you to set field-level options through
package-scoped options.
- This is useful when you can't add a dependency on scalapb.proto from the
proto file that
- defines the field.
- """
-
- target: str = betterproto.string_field(1)
- """The fully-qualified name of the field in the proto name space."""
-
- options: "FieldOptions" = betterproto.message_field(2)
- """
- Options to apply to the field. If there are any options defined on the
target message
- they take precedence over the options.
- """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxEnumOptions(betterproto.Message):
- """
- AuxEnumOptions enables you to set enum-level options through
package-scoped options.
- This is useful when you can't add a dependency on scalapb.proto from the
proto file that
- defines the enum.
- """
-
- target: str = betterproto.string_field(1)
- """The fully-qualified name of the enum in the proto name space."""
-
- options: "EnumOptions" = betterproto.message_field(2)
- """
- Options to apply to the enum. If there are any options defined on the
target enum
- they take precedence over the options.
- """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxEnumValueOptions(betterproto.Message):
- """
- AuxEnumValueOptions enables you to set enum value level options through
package-scoped
- options. This is useful when you can't add a dependency on scalapb.proto
from the proto
- file that defines the enum.
- """
-
- target: str = betterproto.string_field(1)
- """The fully-qualified name of the enum value in the proto name space."""
-
- options: "EnumValueOptions" = betterproto.message_field(2)
- """
- Options to apply to the enum value. If there are any options defined on
- the target enum value they take precedence over the options.
- """
-
-
-@dataclass(eq=False, repr=False)
-class MessageOptions(betterproto.Message):
- extends: List[str] = betterproto.string_field(1)
- """Additional classes and traits to mix in to the case class."""
-
- companion_extends: List[str] = betterproto.string_field(2)
- """Additional classes and traits to mix in to the companion object."""
-
- annotations: List[str] = betterproto.string_field(3)
- """Custom annotations to add to the generated case class."""
-
- type: str = betterproto.string_field(4)
- """
- All instances of this message will be converted to this type. An implicit
TypeMapper
- must be present.
- """
-
- companion_annotations: List[str] = betterproto.string_field(5)
- """
- Custom annotations to add to the companion object of the generated class.
- """
-
- sealed_oneof_extends: List[str] = betterproto.string_field(6)
- """
- Additional classes and traits to mix in to generated sealed_oneof base
trait.
- """
-
- no_box: bool = betterproto.bool_field(7)
- """
- If true, when this message is used as an optional field, do not wrap it in
an `Option`.
- This is equivalent of setting `(field).no_box` to true on each field with
the message type.
- """
-
- unknown_fields_annotations: List[str] = betterproto.string_field(8)
- """
- Custom annotations to add to the generated `unknownFields` case class
field.
- """
-
-
-@dataclass(eq=False, repr=False)
-class Collection(betterproto.Message):
- """
- Represents a custom Collection type in Scala. This allows ScalaPB to
integrate with
- collection types that are different enough from the ones in the standard
library.
- """
-
- type: str = betterproto.string_field(1)
- """Type of the collection"""
-
- non_empty: bool = betterproto.bool_field(2)
- """
- Set to true if this collection type is not allowed to be empty, for example
- cats.data.NonEmptyList. When true, ScalaPB will not generate `clearX`
for the repeated
- field and not provide a default argument in the constructor.
- """
-
- adapter: str = betterproto.string_field(3)
- """
- An Adapter is a Scala object available at runtime that provides certain
static methods
- that can operate on this collection type.
- """
-
-
-@dataclass(eq=False, repr=False)
-class FieldOptions(betterproto.Message):
- type: str = betterproto.string_field(1)
- scala_name: str = betterproto.string_field(2)
- collection_type: str = betterproto.string_field(3)
- """
- Can be specified only if this field is repeated. If unspecified,
- it falls back to the file option named `collection_type`, which defaults
- to `scala.collection.Seq`.
- """
-
- collection: "Collection" = betterproto.message_field(8)
- key_type: str = betterproto.string_field(4)
- """
- If the field is a map, you can specify custom Scala types for the key
- or value.
- """
-
- value_type: str = betterproto.string_field(5)
- annotations: List[str] = betterproto.string_field(6)
- """Custom annotations to add to the field."""
-
- map_type: str = betterproto.string_field(7)
- """
- Can be specified only if this field is a map. If unspecified,
- it falls back to the file option named `map_type` which defaults to
- `scala.collection.immutable.Map`
- """
-
- no_box: bool = betterproto.bool_field(30)
- """
- Do not box this value in Option[T]. If set, this overrides
MessageOptions.no_box
- """
-
- required: bool = betterproto.bool_field(31)
- """
- Like no_box it does not box a value in Option[T], but also fails parsing
when a value
- is not provided. This enables to emulate required fields in proto3.
- """
-
-
-@dataclass(eq=False, repr=False)
-class EnumOptions(betterproto.Message):
- extends: List[str] = betterproto.string_field(1)
- """Additional classes and traits to mix in to the base trait"""
-
- companion_extends: List[str] = betterproto.string_field(2)
- """Additional classes and traits to mix in to the companion object."""
-
- type: str = betterproto.string_field(3)
- """
- All instances of this enum will be converted to this type. An implicit
TypeMapper
- must be present.
- """
-
- base_annotations: List[str] = betterproto.string_field(4)
- """Custom annotations to add to the generated enum's base class."""
-
- recognized_annotations: List[str] = betterproto.string_field(5)
- """Custom annotations to add to the generated trait."""
-
- unrecognized_annotations: List[str] = betterproto.string_field(6)
- """Custom annotations to add to the generated Unrecognized case class."""
-
-
-@dataclass(eq=False, repr=False)
-class EnumValueOptions(betterproto.Message):
- extends: List[str] = betterproto.string_field(1)
- """Additional classes and traits to mix in to an individual enum value."""
-
- scala_name: str = betterproto.string_field(2)
- """Name in Scala to use for this enum value."""
-
- annotations: List[str] = betterproto.string_field(3)
- """
- Custom annotations to add to the generated case object for this enum value.
- """
-
-
-@dataclass(eq=False, repr=False)
-class OneofOptions(betterproto.Message):
- extends: List[str] = betterproto.string_field(1)
- """Additional traits to mix in to a oneof."""
-
- scala_name: str = betterproto.string_field(2)
- """Name in Scala to use for this oneof field."""
-
-
-@dataclass(eq=False, repr=False)
-class FieldTransformation(betterproto.Message):
- when: "betterproto_lib_google_protobuf.FieldDescriptorProto" = (
- betterproto.message_field(1)
- )
- match_type: "MatchType" = betterproto.enum_field(2)
- set: "betterproto_lib_google_protobuf.FieldOptions" =
betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class PreprocessorOutput(betterproto.Message):
- options_by_file: Dict[str, "ScalaPbOptions"] = betterproto.map_field(
- 1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
- )
diff --git a/bin/computing-unit-master.dockerfile
b/bin/computing-unit-master.dockerfile
index 191d23f2dd..0d9d60b79f 100644
--- a/bin/computing-unit-master.dockerfile
+++ b/bin/computing-unit-master.dockerfile
@@ -36,15 +36,32 @@ COPY project/ project/
COPY build.sbt build.sbt
COPY .jvmopts .jvmopts
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
RUN apt-get update && apt-get install -y \
netcat \
unzip \
+ curl \
libpq-dev \
python3-minimal \
+ python3-pip \
&& apt-get clean
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before `sbt dist`.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+ && curl -fsSL -o /tmp/protoc.zip
"https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
\
+ && unzip -o /tmp/protoc.zip -d /usr/local \
+ && chmod +x /usr/local/bin/protoc \
+ && rm /tmp/protoc.zip \
+ && pip3 install --no-cache-dir -c amber/requirements.txt
'betterproto[compiler]' \
+ && bash bin/python-proto-gen.sh
+
# Add .git for runtime calls to jgit from OPversion
COPY .git .git
COPY LICENSE NOTICE DISCLAIMER ./
diff --git a/bin/computing-unit-worker.dockerfile
b/bin/computing-unit-worker.dockerfile
index 28d3b4cf0c..fc80998888 100644
--- a/bin/computing-unit-worker.dockerfile
+++ b/bin/computing-unit-worker.dockerfile
@@ -36,15 +36,32 @@ COPY project/ project/
COPY build.sbt build.sbt
COPY .jvmopts .jvmopts
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
RUN apt-get update && apt-get install -y \
netcat \
unzip \
+ curl \
libpq-dev \
python3-minimal \
+ python3-pip \
&& apt-get clean
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before `sbt dist`.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+ && curl -fsSL -o /tmp/protoc.zip
"https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
\
+ && unzip -o /tmp/protoc.zip -d /usr/local \
+ && chmod +x /usr/local/bin/protoc \
+ && rm /tmp/protoc.zip \
+ && pip3 install --no-cache-dir -c amber/requirements.txt
'betterproto[compiler]' \
+ && bash bin/python-proto-gen.sh
+
# Add .git for runtime calls to jgit from OPversion
COPY .git .git
COPY LICENSE NOTICE DISCLAIMER ./
diff --git a/bin/protoc-version.txt b/bin/protoc-version.txt
new file mode 100644
index 0000000000..de24deecf3
--- /dev/null
+++ b/bin/protoc-version.txt
@@ -0,0 +1 @@
+3.19.4
diff --git a/bin/python-proto-gen.sh b/bin/python-proto-gen.sh
index 0faf33eb9b..db51bb7626 100755
--- a/bin/python-proto-gen.sh
+++ b/bin/python-proto-gen.sh
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-# assuming inside the pytexera executing Python ENV
+set -euo pipefail
-# dirs
-TEXERA_HOME="$(git rev-parse --show-toplevel)"
+# Resolve repo root from this script's location (avoids git/CWD assumptions
+# so the script works inside Docker build stages before .git is copied).
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+TEXERA_HOME="$(cd "$SCRIPT_DIR/.." && pwd)"
AMBER_DIR="$TEXERA_HOME/amber"
PYAMBER_DIR="$AMBER_DIR/src/main/python"
PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf"
@@ -26,8 +28,12 @@ PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf"
CORE_DIR="$TEXERA_HOME/common/workflow-core"
PROTOBUF_CORE_DIR="$CORE_DIR/src/main/protobuf"
+PROTOC_INCLUDE_DIR="$(dirname "$(dirname "$(command -v protoc)")")/include"
+
# proto-gen
+mkdir -p "$PYAMBER_DIR/proto"
protoc --python_betterproto_out="$PYAMBER_DIR/proto" \
+ -I="$PROTOC_INCLUDE_DIR" \
-I="$PROTOBUF_AMBER_DIR" \
-I="$PROTOBUF_CORE_DIR" \
$(find "$PROTOBUF_AMBER_DIR" -iname "*.proto") \
diff --git a/bin/texera-web-application.dockerfile
b/bin/texera-web-application.dockerfile
index a829fb16aa..efaee5699a 100644
--- a/bin/texera-web-application.dockerfile
+++ b/bin/texera-web-application.dockerfile
@@ -50,15 +50,33 @@ COPY project/ project/
COPY build.sbt build.sbt
COPY .jvmopts .jvmopts
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
RUN apt-get update && apt-get install -y \
netcat \
unzip \
+ curl \
libpq-dev \
python3-minimal \
+ python3-pip \
&& apt-get clean
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before the WorkflowExecutionService dist
+# is packaged.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+ && curl -fsSL -o /tmp/protoc.zip
"https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
\
+ && unzip -o /tmp/protoc.zip -d /usr/local \
+ && chmod +x /usr/local/bin/protoc \
+ && rm /tmp/protoc.zip \
+ && pip3 install --no-cache-dir -c amber/requirements.txt
'betterproto[compiler]' \
+ && bash bin/python-proto-gen.sh
+
# Add .git for runtime calls to jgit from OPversion
COPY .git .git
COPY LICENSE NOTICE DISCLAIMER ./