This is an automated email from the ASF dual-hosted git repository.
shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/shengquan-add-reconfigration
by this push:
new 4e59c4d285 fix python proto
4e59c4d285 is described below
commit 4e59c4d285b28e0567e1041b9e55c3d594d873eb
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 19:56:52 2026 -0800
fix python proto
---
.../amber/engine/architecture/rpc/__init__.py | 1526 ++++++++++----------
.../apache/texera/amber/engine/common/__init__.py | 42 +-
2 files changed, 786 insertions(+), 782 deletions(-)
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index f7b89f13e6..b341f70081 100644
--- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -22,6 +22,7 @@ from .. import (
worker as _worker__,
)
+
if TYPE_CHECKING:
import grpclib.server
from betterproto.grpc.grpclib_client import MetadataLike
@@ -91,6 +92,9 @@ class ControlRequest(betterproto.Message):
link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
9, group="sealed_value"
)
+ workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
+ betterproto.message_field(10, group="sealed_value")
+ )
add_input_channel_request: "AddInputChannelRequest" =
betterproto.message_field(
50, group="sealed_value"
)
@@ -504,692 +508,692 @@ class WorkerMetricsResponse(betterproto.Message):
metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1)
-class ControllerServiceStub(betterproto.ServiceStub):
- async def retrieve_workflow_state(
+class RpcTesterStub(betterproto.ServiceStub):
+ async def send_ping(
self,
- empty_request: "EmptyRequest",
+ ping: "Ping",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "RetrieveWorkflowStateResponse":
+ ) -> "IntResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
- empty_request,
- RetrieveWorkflowStateResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
+ ping,
+ IntResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def propagate_embedded_control_message(
+ async def send_pong(
self,
- propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
+ pong: "Pong",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "PropagateEmbeddedControlMessageResponse":
+ ) -> "IntResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
- propagate_embedded_control_message_request,
- PropagateEmbeddedControlMessageResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
+ pong,
+ IntResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def take_global_checkpoint(
+ async def send_nested(
self,
- take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
+ nested: "Nested",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "TakeGlobalCheckpointResponse":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
- take_global_checkpoint_request,
- TakeGlobalCheckpointResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
+ nested,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def debug_command(
+ async def send_pass(
self,
- debug_command_request: "DebugCommandRequest",
+ pass_: "Pass",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
- debug_command_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
+ pass_,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def evaluate_python_expression(
+ async def send_error_command(
self,
- evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+ error_command: "ErrorCommand",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EvaluatePythonExpressionResponse":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
- evaluate_python_expression_request,
- EvaluatePythonExpressionResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
+ error_command,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def console_message_triggered(
+ async def send_recursion(
self,
- console_message_triggered_request: "ConsoleMessageTriggeredRequest",
+ recursion: "Recursion",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
- console_message_triggered_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
+ recursion,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def port_completed(
+ async def send_collect(
self,
- port_completed_request: "PortCompletedRequest",
+ collect: "Collect",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
- port_completed_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
+ collect,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def start_workflow(
+ async def send_generate_number(
self,
- empty_request: "EmptyRequest",
+ generate_number: "GenerateNumber",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StartWorkflowResponse":
+ ) -> "IntResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
- empty_request,
- StartWorkflowResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
+ generate_number,
+ IntResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def resume_workflow(
+ async def send_multi_call(
self,
- empty_request: "EmptyRequest",
+ multi_call: "MultiCall",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
- empty_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
+ multi_call,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def pause_workflow(
+ async def send_chain(
self,
- empty_request: "EmptyRequest",
+ chain: "Chain",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StringResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
- empty_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
+ chain,
+ StringResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def worker_state_updated(
+
+class WorkerServiceStub(betterproto.ServiceStub):
+ async def add_input_channel(
self,
- worker_state_updated_request: "WorkerStateUpdatedRequest",
+ add_input_channel_request: "AddInputChannelRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
- worker_state_updated_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
+ add_input_channel_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def worker_execution_completed(
+ async def add_partitioning(
self,
- empty_request: "EmptyRequest",
+ add_partitioning_request: "AddPartitioningRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
- empty_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
+ add_partitioning_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def link_workers(
+ async def assign_port(
self,
- link_workers_request: "LinkWorkersRequest",
+ assign_port_request: "AssignPortRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
- link_workers_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
+ assign_port_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def controller_initiate_query_statistics(
+ async def finalize_checkpoint(
self,
- query_statistics_request: "QueryStatisticsRequest",
+ finalize_checkpoint_request: "FinalizeCheckpointRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "FinalizeCheckpointResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
- query_statistics_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
+ finalize_checkpoint_request,
+ FinalizeCheckpointResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def retry_workflow(
+ async def flush_network_buffer(
self,
- retry_workflow_request: "RetryWorkflowRequest",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
- retry_workflow_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
+ empty_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def reconfigure_workflow(
+ async def initialize_executor(
self,
- workflow_reconfigure_request: "WorkflowReconfigureRequest",
+ initialize_executor_request: "InitializeExecutorRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
- workflow_reconfigure_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
+ initialize_executor_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
-
-class RpcTesterStub(betterproto.ServiceStub):
- async def send_ping(
+ async def open_executor(
self,
- ping: "Ping",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
- ping,
- IntResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_pong(
+ async def pause_worker(
self,
- pong: "Pong",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
+ ) -> "WorkerStateResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
- pong,
- IntResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
+ empty_request,
+ WorkerStateResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_nested(
+ async def prepare_checkpoint(
self,
- nested: "Nested",
+ prepare_checkpoint_request: "PrepareCheckpointRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
- nested,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
+ prepare_checkpoint_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_pass(
+ async def query_statistics(
self,
- pass_: "Pass",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "WorkerMetricsResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
- pass_,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
+ empty_request,
+ WorkerMetricsResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_error_command(
+ async def resume_worker(
self,
- error_command: "ErrorCommand",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "WorkerStateResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
- error_command,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
+ empty_request,
+ WorkerStateResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_recursion(
+ async def retrieve_state(
self,
- recursion: "Recursion",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
- recursion,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_collect(
+ async def retry_current_tuple(
self,
- collect: "Collect",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
- collect,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_generate_number(
+ async def start_worker(
self,
- generate_number: "GenerateNumber",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "IntResponse":
+ ) -> "WorkerStateResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
- generate_number,
- IntResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
+ empty_request,
+ WorkerStateResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_multi_call(
+ async def end_worker(
self,
- multi_call: "MultiCall",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
- multi_call,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def send_chain(
+ async def start_channel(
self,
- chain: "Chain",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "StringResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
- chain,
- StringResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
-
-class WorkerServiceStub(betterproto.ServiceStub):
- async def add_input_channel(
+ async def end_channel(
self,
- add_input_channel_request: "AddInputChannelRequest",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
- add_input_channel_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
+ empty_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def add_partitioning(
+ async def debug_command(
self,
- add_partitioning_request: "AddPartitioningRequest",
+ debug_command_request: "DebugCommandRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
- add_partitioning_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
+ debug_command_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def assign_port(
+ async def evaluate_python_expression(
self,
- assign_port_request: "AssignPortRequest",
+ evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "EvaluatedValue":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
- assign_port_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
+ evaluate_python_expression_request,
+ EvaluatedValue,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def finalize_checkpoint(
+ async def no_operation(
self,
- finalize_checkpoint_request: "FinalizeCheckpointRequest",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "FinalizeCheckpointResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
- finalize_checkpoint_request,
- FinalizeCheckpointResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
+ empty_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def flush_network_buffer(
+ async def update_executor(
self,
- empty_request: "EmptyRequest",
+ update_executor_request: "UpdateExecutorRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
- empty_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
+ update_executor_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def initialize_executor(
+
+class ControllerServiceStub(betterproto.ServiceStub):
+ async def retrieve_workflow_state(
self,
- initialize_executor_request: "InitializeExecutorRequest",
+ empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "RetrieveWorkflowStateResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
- initialize_executor_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
+ empty_request,
+ RetrieveWorkflowStateResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def open_executor(
+ async def propagate_embedded_control_message(
self,
- empty_request: "EmptyRequest",
+ propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "PropagateEmbeddedControlMessageResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
- empty_request,
- EmptyReturn,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
+ propagate_embedded_control_message_request,
+ PropagateEmbeddedControlMessageResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def pause_worker(
+ async def take_global_checkpoint(
self,
- empty_request: "EmptyRequest",
+ take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
+ ) -> "TakeGlobalCheckpointResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
- empty_request,
- WorkerStateResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
+ take_global_checkpoint_request,
+ TakeGlobalCheckpointResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def prepare_checkpoint(
+ async def debug_command(
self,
- prepare_checkpoint_request: "PrepareCheckpointRequest",
+ debug_command_request: "DebugCommandRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
- prepare_checkpoint_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
+ debug_command_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def query_statistics(
+ async def evaluate_python_expression(
self,
- empty_request: "EmptyRequest",
+ evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "WorkerMetricsResponse":
+ ) -> "EvaluatePythonExpressionResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
- empty_request,
- WorkerMetricsResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
+ evaluate_python_expression_request,
+ EvaluatePythonExpressionResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def resume_worker(
+ async def console_message_triggered(
self,
- empty_request: "EmptyRequest",
+ console_message_triggered_request: "ConsoleMessageTriggeredRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
- empty_request,
- WorkerStateResponse,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
+ console_message_triggered_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def retrieve_state(
+ async def port_completed(
self,
- empty_request: "EmptyRequest",
+ port_completed_request: "PortCompletedRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
- empty_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
+ port_completed_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def retry_current_tuple(
+ async def start_workflow(
self,
empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EmptyReturn":
+ ) -> "StartWorkflowResponse":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
empty_request,
- EmptyReturn,
+ StartWorkflowResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def start_worker(
+ async def resume_workflow(
self,
empty_request: "EmptyRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "WorkerStateResponse":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
empty_request,
- WorkerStateResponse,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def end_worker(
+ async def pause_workflow(
self,
empty_request: "EmptyRequest",
*,
@@ -1198,7 +1202,7 @@ class WorkerServiceStub(betterproto.ServiceStub):
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
empty_request,
EmptyReturn,
timeout=timeout,
@@ -1206,24 +1210,24 @@ class WorkerServiceStub(betterproto.ServiceStub):
metadata=metadata,
)
- async def start_channel(
+ async def worker_state_updated(
self,
- empty_request: "EmptyRequest",
+ worker_state_updated_request: "WorkerStateUpdatedRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
- empty_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
+ worker_state_updated_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def end_channel(
+ async def worker_execution_completed(
self,
empty_request: "EmptyRequest",
*,
@@ -1232,7 +1236,7 @@ class WorkerServiceStub(betterproto.ServiceStub):
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
empty_request,
EmptyReturn,
timeout=timeout,
@@ -1240,68 +1244,68 @@ class WorkerServiceStub(betterproto.ServiceStub):
metadata=metadata,
)
- async def debug_command(
+ async def link_workers(
self,
- debug_command_request: "DebugCommandRequest",
+ link_workers_request: "LinkWorkersRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
- debug_command_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
+ link_workers_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def evaluate_python_expression(
+ async def controller_initiate_query_statistics(
self,
- evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+ query_statistics_request: "QueryStatisticsRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
- ) -> "EvaluatedValue":
+ ) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
- evaluate_python_expression_request,
- EvaluatedValue,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
+ query_statistics_request,
+ EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def no_operation(
+ async def retry_workflow(
self,
- empty_request: "EmptyRequest",
+ retry_workflow_request: "RetryWorkflowRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
- empty_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
+ retry_workflow_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
- async def update_executor(
+ async def reconfigure_workflow(
self,
- update_executor_request: "UpdateExecutorRequest",
+ workflow_reconfigure_request: "WorkflowReconfigureRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None
) -> "EmptyReturn":
return await self._unary_unary(
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
- update_executor_request,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
+ workflow_reconfigure_request,
EmptyReturn,
timeout=timeout,
deadline=deadline,
@@ -1309,842 +1313,842 @@ class WorkerServiceStub(betterproto.ServiceStub):
)
-class ControllerServiceBase(ServiceBase):
+class RpcTesterBase(ServiceBase):
- async def retrieve_workflow_state(
- self, empty_request: "EmptyRequest"
- ) -> "RetrieveWorkflowStateResponse":
+ async def send_ping(self, ping: "Ping") -> "IntResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def propagate_embedded_control_message(
- self,
- propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
- ) -> "PropagateEmbeddedControlMessageResponse":
+ async def send_pong(self, pong: "Pong") -> "IntResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def take_global_checkpoint(
- self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
- ) -> "TakeGlobalCheckpointResponse":
+ async def send_nested(self, nested: "Nested") -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def debug_command(
- self, debug_command_request: "DebugCommandRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def evaluate_python_expression(
- self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
- ) -> "EvaluatePythonExpressionResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def console_message_triggered(
- self, console_message_triggered_request:
"ConsoleMessageTriggeredRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def port_completed(
- self, port_completed_request: "PortCompletedRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def start_workflow(
- self, empty_request: "EmptyRequest"
- ) -> "StartWorkflowResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def resume_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def pause_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ async def send_pass(self, pass_: "Pass") -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def worker_state_updated(
- self, worker_state_updated_request: "WorkerStateUpdatedRequest"
- ) -> "EmptyReturn":
+ async def send_error_command(
+ self, error_command: "ErrorCommand"
+ ) -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def worker_execution_completed(
- self, empty_request: "EmptyRequest"
- ) -> "EmptyReturn":
+ async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def link_workers(
- self, link_workers_request: "LinkWorkersRequest"
- ) -> "EmptyReturn":
+ async def send_collect(self, collect: "Collect") -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def controller_initiate_query_statistics(
- self, query_statistics_request: "QueryStatisticsRequest"
- ) -> "EmptyReturn":
+ async def send_generate_number(
+ self, generate_number: "GenerateNumber"
+ ) -> "IntResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def retry_workflow(
- self, retry_workflow_request: "RetryWorkflowRequest"
- ) -> "EmptyReturn":
+ async def send_multi_call(self, multi_call: "MultiCall") ->
"StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def reconfigure_workflow(
- self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
- ) -> "EmptyReturn":
+ async def send_chain(self, chain: "Chain") -> "StringResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_retrieve_workflow_state(
- self,
- stream: "grpclib.server.Stream[EmptyRequest,
RetrieveWorkflowStateResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.retrieve_workflow_state(request)
- await stream.send_message(response)
-
- async def __rpc_propagate_embedded_control_message(
- self,
- stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest,
PropagateEmbeddedControlMessageResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.propagate_embedded_control_message(request)
- await stream.send_message(response)
-
- async def __rpc_take_global_checkpoint(
- self,
- stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest,
TakeGlobalCheckpointResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.take_global_checkpoint(request)
- await stream.send_message(response)
-
- async def __rpc_debug_command(
- self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.debug_command(request)
- await stream.send_message(response)
-
- async def __rpc_evaluate_python_expression(
- self,
- stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatePythonExpressionResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.evaluate_python_expression(request)
- await stream.send_message(response)
-
- async def __rpc_console_message_triggered(
- self,
- stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest,
EmptyReturn]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.console_message_triggered(request)
- await stream.send_message(response)
-
- async def __rpc_port_completed(
- self, stream: "grpclib.server.Stream[PortCompletedRequest,
EmptyReturn]"
+ async def __rpc_send_ping(
+ self, stream: "grpclib.server.Stream[Ping, IntResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.port_completed(request)
+ response = await self.send_ping(request)
await stream.send_message(response)
- async def __rpc_start_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest,
StartWorkflowResponse]"
+ async def __rpc_send_pong(
+ self, stream: "grpclib.server.Stream[Pong, IntResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.start_workflow(request)
+ response = await self.send_pong(request)
await stream.send_message(response)
- async def __rpc_resume_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_send_nested(
+ self, stream: "grpclib.server.Stream[Nested, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.resume_workflow(request)
+ response = await self.send_nested(request)
await stream.send_message(response)
- async def __rpc_pause_workflow(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_send_pass(
+ self, stream: "grpclib.server.Stream[Pass, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.pause_workflow(request)
+ response = await self.send_pass(request)
await stream.send_message(response)
- async def __rpc_worker_state_updated(
- self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest,
EmptyReturn]"
+ async def __rpc_send_error_command(
+ self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.worker_state_updated(request)
+ response = await self.send_error_command(request)
await stream.send_message(response)
- async def __rpc_worker_execution_completed(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_send_recursion(
+ self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.worker_execution_completed(request)
+ response = await self.send_recursion(request)
await stream.send_message(response)
- async def __rpc_link_workers(
- self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
+ async def __rpc_send_collect(
+ self, stream: "grpclib.server.Stream[Collect, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.link_workers(request)
+ response = await self.send_collect(request)
await stream.send_message(response)
- async def __rpc_controller_initiate_query_statistics(
- self, stream: "grpclib.server.Stream[QueryStatisticsRequest,
EmptyReturn]"
+ async def __rpc_send_generate_number(
+ self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.controller_initiate_query_statistics(request)
+ response = await self.send_generate_number(request)
await stream.send_message(response)
- async def __rpc_retry_workflow(
- self, stream: "grpclib.server.Stream[RetryWorkflowRequest,
EmptyReturn]"
+ async def __rpc_send_multi_call(
+ self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.retry_workflow(request)
+ response = await self.send_multi_call(request)
await stream.send_message(response)
- async def __rpc_reconfigure_workflow(
- self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest,
EmptyReturn]"
+ async def __rpc_send_chain(
+ self, stream: "grpclib.server.Stream[Chain, StringResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.reconfigure_workflow(request)
+ response = await self.send_chain(request)
await stream.send_message(response)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
grpclib.const.Handler(
- self.__rpc_retrieve_workflow_state,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- RetrieveWorkflowStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
grpclib.const.Handler(
- self.__rpc_propagate_embedded_control_message,
- grpclib.const.Cardinality.UNARY_UNARY,
- PropagateEmbeddedControlMessageRequest,
- PropagateEmbeddedControlMessageResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
grpclib.const.Handler(
- self.__rpc_take_global_checkpoint,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing":
grpclib.const.Handler(
+ self.__rpc_send_ping,
grpclib.const.Cardinality.UNARY_UNARY,
- TakeGlobalCheckpointRequest,
- TakeGlobalCheckpointResponse,
+ Ping,
+ IntResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
grpclib.const.Handler(
- self.__rpc_debug_command,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong":
grpclib.const.Handler(
+ self.__rpc_send_pong,
grpclib.const.Cardinality.UNARY_UNARY,
- DebugCommandRequest,
- EmptyReturn,
+ Pong,
+ IntResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
grpclib.const.Handler(
- self.__rpc_evaluate_python_expression,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested":
grpclib.const.Handler(
+ self.__rpc_send_nested,
grpclib.const.Cardinality.UNARY_UNARY,
- EvaluatePythonExpressionRequest,
- EvaluatePythonExpressionResponse,
+ Nested,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
grpclib.const.Handler(
- self.__rpc_console_message_triggered,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass":
grpclib.const.Handler(
+ self.__rpc_send_pass,
grpclib.const.Cardinality.UNARY_UNARY,
- ConsoleMessageTriggeredRequest,
- EmptyReturn,
+ Pass,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
grpclib.const.Handler(
- self.__rpc_port_completed,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand":
grpclib.const.Handler(
+ self.__rpc_send_error_command,
grpclib.const.Cardinality.UNARY_UNARY,
- PortCompletedRequest,
- EmptyReturn,
+ ErrorCommand,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
grpclib.const.Handler(
- self.__rpc_start_workflow,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion":
grpclib.const.Handler(
+ self.__rpc_send_recursion,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- StartWorkflowResponse,
+ Recursion,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
grpclib.const.Handler(
- self.__rpc_resume_workflow,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect":
grpclib.const.Handler(
+ self.__rpc_send_collect,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
+ Collect,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
grpclib.const.Handler(
- self.__rpc_pause_workflow,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
grpclib.const.Handler(
+ self.__rpc_send_generate_number,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
+ GenerateNumber,
+ IntResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
grpclib.const.Handler(
- self.__rpc_worker_state_updated,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall":
grpclib.const.Handler(
+ self.__rpc_send_multi_call,
grpclib.const.Cardinality.UNARY_UNARY,
- WorkerStateUpdatedRequest,
- EmptyReturn,
+ MultiCall,
+ StringResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
grpclib.const.Handler(
- self.__rpc_worker_execution_completed,
+
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain":
grpclib.const.Handler(
+ self.__rpc_send_chain,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
grpclib.const.Handler(
- self.__rpc_link_workers,
- grpclib.const.Cardinality.UNARY_UNARY,
- LinkWorkersRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
grpclib.const.Handler(
- self.__rpc_controller_initiate_query_statistics,
- grpclib.const.Cardinality.UNARY_UNARY,
- QueryStatisticsRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
grpclib.const.Handler(
- self.__rpc_retry_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- RetryWorkflowRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
grpclib.const.Handler(
- self.__rpc_reconfigure_workflow,
- grpclib.const.Cardinality.UNARY_UNARY,
- WorkflowReconfigureRequest,
- EmptyReturn,
+ Chain,
+ StringResponse,
),
}
-class RpcTesterBase(ServiceBase):
+class WorkerServiceBase(ServiceBase):
- async def send_ping(self, ping: "Ping") -> "IntResponse":
+ async def add_input_channel(
+ self, add_input_channel_request: "AddInputChannelRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_pong(self, pong: "Pong") -> "IntResponse":
+ async def add_partitioning(
+ self, add_partitioning_request: "AddPartitioningRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_nested(self, nested: "Nested") -> "StringResponse":
+ async def assign_port(
+ self, assign_port_request: "AssignPortRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_pass(self, pass_: "Pass") -> "StringResponse":
+ async def finalize_checkpoint(
+ self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
+ ) -> "FinalizeCheckpointResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_error_command(
- self, error_command: "ErrorCommand"
- ) -> "StringResponse":
+ async def flush_network_buffer(
+ self, empty_request: "EmptyRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
+ async def initialize_executor(
+ self, initialize_executor_request: "InitializeExecutorRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_collect(self, collect: "Collect") -> "StringResponse":
+ async def open_executor(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_generate_number(
- self, generate_number: "GenerateNumber"
- ) -> "IntResponse":
+ async def pause_worker(
+ self, empty_request: "EmptyRequest"
+ ) -> "WorkerStateResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_multi_call(self, multi_call: "MultiCall") ->
"StringResponse":
+ async def prepare_checkpoint(
+ self, prepare_checkpoint_request: "PrepareCheckpointRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def send_chain(self, chain: "Chain") -> "StringResponse":
+ async def query_statistics(
+ self, empty_request: "EmptyRequest"
+ ) -> "WorkerMetricsResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_send_ping(
- self, stream: "grpclib.server.Stream[Ping, IntResponse]"
+ async def resume_worker(
+ self, empty_request: "EmptyRequest"
+ ) -> "WorkerStateResponse":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def retrieve_state(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def retry_current_tuple(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def start_worker(
+ self, empty_request: "EmptyRequest"
+ ) -> "WorkerStateResponse":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def start_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def end_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def debug_command(
+ self, debug_command_request: "DebugCommandRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def evaluate_python_expression(
+ self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
+ ) -> "EvaluatedValue":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def no_operation(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def update_executor(
+ self, update_executor_request: "UpdateExecutorRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+ async def __rpc_add_input_channel(
+ self, stream: "grpclib.server.Stream[AddInputChannelRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_ping(request)
+ response = await self.add_input_channel(request)
await stream.send_message(response)
- async def __rpc_send_pong(
- self, stream: "grpclib.server.Stream[Pong, IntResponse]"
+ async def __rpc_add_partitioning(
+ self, stream: "grpclib.server.Stream[AddPartitioningRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_pong(request)
+ response = await self.add_partitioning(request)
await stream.send_message(response)
- async def __rpc_send_nested(
- self, stream: "grpclib.server.Stream[Nested, StringResponse]"
+ async def __rpc_assign_port(
+ self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_nested(request)
+ response = await self.assign_port(request)
await stream.send_message(response)
- async def __rpc_send_pass(
- self, stream: "grpclib.server.Stream[Pass, StringResponse]"
+ async def __rpc_finalize_checkpoint(
+ self,
+ stream: "grpclib.server.Stream[FinalizeCheckpointRequest,
FinalizeCheckpointResponse]",
) -> None:
request = await stream.recv_message()
- response = await self.send_pass(request)
+ response = await self.finalize_checkpoint(request)
await stream.send_message(response)
- async def __rpc_send_error_command(
- self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
+ async def __rpc_flush_network_buffer(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_error_command(request)
+ response = await self.flush_network_buffer(request)
await stream.send_message(response)
- async def __rpc_send_recursion(
- self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
+ async def __rpc_initialize_executor(
+ self, stream: "grpclib.server.Stream[InitializeExecutorRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_recursion(request)
+ response = await self.initialize_executor(request)
await stream.send_message(response)
- async def __rpc_send_collect(
- self, stream: "grpclib.server.Stream[Collect, StringResponse]"
+ async def __rpc_open_executor(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_collect(request)
+ response = await self.open_executor(request)
await stream.send_message(response)
- async def __rpc_send_generate_number(
- self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
+ async def __rpc_pause_worker(
+ self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.send_generate_number(request)
+ response = await self.pause_worker(request)
await stream.send_message(response)
- async def __rpc_send_multi_call(
- self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
+ async def __rpc_prepare_checkpoint(
+ self, stream: "grpclib.server.Stream[PrepareCheckpointRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.send_multi_call(request)
+ response = await self.prepare_checkpoint(request)
await stream.send_message(response)
- async def __rpc_send_chain(
- self, stream: "grpclib.server.Stream[Chain, StringResponse]"
+ async def __rpc_query_statistics(
+ self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerMetricsResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.send_chain(request)
+ response = await self.query_statistics(request)
+ await stream.send_message(response)
+
+ async def __rpc_resume_worker(
+ self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.resume_worker(request)
+ await stream.send_message(response)
+
+ async def __rpc_retrieve_state(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.retrieve_state(request)
+ await stream.send_message(response)
+
+ async def __rpc_retry_current_tuple(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.retry_current_tuple(request)
+ await stream.send_message(response)
+
+ async def __rpc_start_worker(
+ self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.start_worker(request)
+ await stream.send_message(response)
+
+ async def __rpc_end_worker(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.end_worker(request)
+ await stream.send_message(response)
+
+ async def __rpc_start_channel(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.start_channel(request)
+ await stream.send_message(response)
+
+ async def __rpc_end_channel(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.end_channel(request)
+ await stream.send_message(response)
+
+ async def __rpc_debug_command(
+ self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.debug_command(request)
+ await stream.send_message(response)
+
+ async def __rpc_evaluate_python_expression(
+ self,
+ stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatedValue]",
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.evaluate_python_expression(request)
+ await stream.send_message(response)
+
+ async def __rpc_no_operation(
+ self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.no_operation(request)
+ await stream.send_message(response)
+
+ async def __rpc_update_executor(
+ self, stream: "grpclib.server.Stream[UpdateExecutorRequest,
EmptyReturn]"
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.update_executor(request)
await stream.send_message(response)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing":
grpclib.const.Handler(
- self.__rpc_send_ping,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
grpclib.const.Handler(
+ self.__rpc_add_input_channel,
grpclib.const.Cardinality.UNARY_UNARY,
- Ping,
- IntResponse,
+ AddInputChannelRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong":
grpclib.const.Handler(
- self.__rpc_send_pong,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
grpclib.const.Handler(
+ self.__rpc_add_partitioning,
grpclib.const.Cardinality.UNARY_UNARY,
- Pong,
- IntResponse,
+ AddPartitioningRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested":
grpclib.const.Handler(
- self.__rpc_send_nested,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort":
grpclib.const.Handler(
+ self.__rpc_assign_port,
grpclib.const.Cardinality.UNARY_UNARY,
- Nested,
- StringResponse,
+ AssignPortRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass":
grpclib.const.Handler(
- self.__rpc_send_pass,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
grpclib.const.Handler(
+ self.__rpc_finalize_checkpoint,
grpclib.const.Cardinality.UNARY_UNARY,
- Pass,
- StringResponse,
+ FinalizeCheckpointRequest,
+ FinalizeCheckpointResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand":
grpclib.const.Handler(
- self.__rpc_send_error_command,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
grpclib.const.Handler(
+ self.__rpc_flush_network_buffer,
grpclib.const.Cardinality.UNARY_UNARY,
- ErrorCommand,
- StringResponse,
+ EmptyRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion":
grpclib.const.Handler(
- self.__rpc_send_recursion,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
grpclib.const.Handler(
+ self.__rpc_initialize_executor,
grpclib.const.Cardinality.UNARY_UNARY,
- Recursion,
- StringResponse,
+ InitializeExecutorRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect":
grpclib.const.Handler(
- self.__rpc_send_collect,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor":
grpclib.const.Handler(
+ self.__rpc_open_executor,
grpclib.const.Cardinality.UNARY_UNARY,
- Collect,
- StringResponse,
+ EmptyRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
grpclib.const.Handler(
- self.__rpc_send_generate_number,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker":
grpclib.const.Handler(
+ self.__rpc_pause_worker,
grpclib.const.Cardinality.UNARY_UNARY,
- GenerateNumber,
- IntResponse,
+ EmptyRequest,
+ WorkerStateResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall":
grpclib.const.Handler(
- self.__rpc_send_multi_call,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
grpclib.const.Handler(
+ self.__rpc_prepare_checkpoint,
grpclib.const.Cardinality.UNARY_UNARY,
- MultiCall,
- StringResponse,
+ PrepareCheckpointRequest,
+ EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain":
grpclib.const.Handler(
- self.__rpc_send_chain,
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
grpclib.const.Handler(
+ self.__rpc_query_statistics,
grpclib.const.Cardinality.UNARY_UNARY,
- Chain,
- StringResponse,
+ EmptyRequest,
+ WorkerMetricsResponse,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker":
grpclib.const.Handler(
+ self.__rpc_resume_worker,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ WorkerStateResponse,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState":
grpclib.const.Handler(
+ self.__rpc_retrieve_state,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
grpclib.const.Handler(
+ self.__rpc_retry_current_tuple,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker":
grpclib.const.Handler(
+ self.__rpc_start_worker,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ WorkerStateResponse,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker":
grpclib.const.Handler(
+ self.__rpc_end_worker,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel":
grpclib.const.Handler(
+ self.__rpc_start_channel,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel":
grpclib.const.Handler(
+ self.__rpc_end_channel,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand":
grpclib.const.Handler(
+ self.__rpc_debug_command,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ DebugCommandRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
grpclib.const.Handler(
+ self.__rpc_evaluate_python_expression,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EvaluatePythonExpressionRequest,
+ EvaluatedValue,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation":
grpclib.const.Handler(
+ self.__rpc_no_operation,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ EmptyRequest,
+ EmptyReturn,
+ ),
+
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
grpclib.const.Handler(
+ self.__rpc_update_executor,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ UpdateExecutorRequest,
+ EmptyReturn,
),
}
-class WorkerServiceBase(ServiceBase):
-
- async def add_input_channel(
- self, add_input_channel_request: "AddInputChannelRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def add_partitioning(
- self, add_partitioning_request: "AddPartitioningRequest"
- ) -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+class ControllerServiceBase(ServiceBase):
- async def assign_port(
- self, assign_port_request: "AssignPortRequest"
- ) -> "EmptyReturn":
+ async def retrieve_workflow_state(
+ self, empty_request: "EmptyRequest"
+ ) -> "RetrieveWorkflowStateResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def finalize_checkpoint(
- self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
- ) -> "FinalizeCheckpointResponse":
+ async def propagate_embedded_control_message(
+ self,
+ propagate_embedded_control_message_request:
"PropagateEmbeddedControlMessageRequest",
+ ) -> "PropagateEmbeddedControlMessageResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def flush_network_buffer(
- self, empty_request: "EmptyRequest"
- ) -> "EmptyReturn":
+ async def take_global_checkpoint(
+ self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
+ ) -> "TakeGlobalCheckpointResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def initialize_executor(
- self, initialize_executor_request: "InitializeExecutorRequest"
+ async def debug_command(
+ self, debug_command_request: "DebugCommandRequest"
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def open_executor(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ async def evaluate_python_expression(
+ self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
+ ) -> "EvaluatePythonExpressionResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def pause_worker(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
+ async def console_message_triggered(
+ self, console_message_triggered_request:
"ConsoleMessageTriggeredRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def prepare_checkpoint(
- self, prepare_checkpoint_request: "PrepareCheckpointRequest"
+ async def port_completed(
+ self, port_completed_request: "PortCompletedRequest"
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def query_statistics(
+ async def start_workflow(
self, empty_request: "EmptyRequest"
- ) -> "WorkerMetricsResponse":
+ ) -> "StartWorkflowResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def resume_worker(
- self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
+ async def resume_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def retrieve_state(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ async def pause_workflow(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def retry_current_tuple(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
+ async def worker_state_updated(
+ self, worker_state_updated_request: "WorkerStateUpdatedRequest"
+ ) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def start_worker(
+ async def worker_execution_completed(
self, empty_request: "EmptyRequest"
- ) -> "WorkerStateResponse":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def start_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def end_channel(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def debug_command(
- self, debug_command_request: "DebugCommandRequest"
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def evaluate_python_expression(
- self, evaluate_python_expression_request:
"EvaluatePythonExpressionRequest"
- ) -> "EvaluatedValue":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def no_operation(self, empty_request: "EmptyRequest") ->
"EmptyReturn":
- raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
- async def update_executor(
- self, update_executor_request: "UpdateExecutorRequest"
+ async def link_workers(
+ self, link_workers_request: "LinkWorkersRequest"
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_add_input_channel(
- self, stream: "grpclib.server.Stream[AddInputChannelRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.add_input_channel(request)
- await stream.send_message(response)
-
- async def __rpc_add_partitioning(
- self, stream: "grpclib.server.Stream[AddPartitioningRequest,
EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.add_partitioning(request)
- await stream.send_message(response)
+ async def controller_initiate_query_statistics(
+ self, query_statistics_request: "QueryStatisticsRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_assign_port(
- self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.assign_port(request)
- await stream.send_message(response)
+ async def retry_workflow(
+ self, retry_workflow_request: "RetryWorkflowRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_finalize_checkpoint(
- self,
- stream: "grpclib.server.Stream[FinalizeCheckpointRequest,
FinalizeCheckpointResponse]",
- ) -> None:
- request = await stream.recv_message()
- response = await self.finalize_checkpoint(request)
- await stream.send_message(response)
+ async def reconfigure_workflow(
+ self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
- async def __rpc_flush_network_buffer(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_retrieve_workflow_state(
+ self,
+ stream: "grpclib.server.Stream[EmptyRequest,
RetrieveWorkflowStateResponse]",
) -> None:
request = await stream.recv_message()
- response = await self.flush_network_buffer(request)
+ response = await self.retrieve_workflow_state(request)
await stream.send_message(response)
- async def __rpc_initialize_executor(
- self, stream: "grpclib.server.Stream[InitializeExecutorRequest,
EmptyReturn]"
+ async def __rpc_propagate_embedded_control_message(
+ self,
+ stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest,
PropagateEmbeddedControlMessageResponse]",
) -> None:
request = await stream.recv_message()
- response = await self.initialize_executor(request)
+ response = await self.propagate_embedded_control_message(request)
await stream.send_message(response)
- async def __rpc_open_executor(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_take_global_checkpoint(
+ self,
+ stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest,
TakeGlobalCheckpointResponse]",
) -> None:
request = await stream.recv_message()
- response = await self.open_executor(request)
+ response = await self.take_global_checkpoint(request)
await stream.send_message(response)
- async def __rpc_pause_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
+ async def __rpc_debug_command(
+ self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.pause_worker(request)
+ response = await self.debug_command(request)
await stream.send_message(response)
- async def __rpc_prepare_checkpoint(
- self, stream: "grpclib.server.Stream[PrepareCheckpointRequest,
EmptyReturn]"
+ async def __rpc_evaluate_python_expression(
+ self,
+ stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatePythonExpressionResponse]",
) -> None:
request = await stream.recv_message()
- response = await self.prepare_checkpoint(request)
+ response = await self.evaluate_python_expression(request)
await stream.send_message(response)
- async def __rpc_query_statistics(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerMetricsResponse]"
+ async def __rpc_console_message_triggered(
+ self,
+ stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest,
EmptyReturn]",
) -> None:
request = await stream.recv_message()
- response = await self.query_statistics(request)
+ response = await self.console_message_triggered(request)
await stream.send_message(response)
- async def __rpc_resume_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
+ async def __rpc_port_completed(
+ self, stream: "grpclib.server.Stream[PortCompletedRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.resume_worker(request)
+ response = await self.port_completed(request)
await stream.send_message(response)
- async def __rpc_retrieve_state(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_start_workflow(
+ self, stream: "grpclib.server.Stream[EmptyRequest,
StartWorkflowResponse]"
) -> None:
request = await stream.recv_message()
- response = await self.retrieve_state(request)
+ response = await self.start_workflow(request)
await stream.send_message(response)
- async def __rpc_retry_current_tuple(
+ async def __rpc_resume_workflow(
self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.retry_current_tuple(request)
- await stream.send_message(response)
-
- async def __rpc_start_worker(
- self, stream: "grpclib.server.Stream[EmptyRequest,
WorkerStateResponse]"
- ) -> None:
- request = await stream.recv_message()
- response = await self.start_worker(request)
+ response = await self.resume_workflow(request)
await stream.send_message(response)
- async def __rpc_end_worker(
+ async def __rpc_pause_workflow(
self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.end_worker(request)
+ response = await self.pause_workflow(request)
await stream.send_message(response)
- async def __rpc_start_channel(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_worker_state_updated(
+ self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.start_channel(request)
+ response = await self.worker_state_updated(request)
await stream.send_message(response)
- async def __rpc_end_channel(
+ async def __rpc_worker_execution_completed(
self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.end_channel(request)
+ response = await self.worker_execution_completed(request)
await stream.send_message(response)
- async def __rpc_debug_command(
- self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+ async def __rpc_link_workers(
+ self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.debug_command(request)
+ response = await self.link_workers(request)
await stream.send_message(response)
- async def __rpc_evaluate_python_expression(
- self,
- stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest,
EvaluatedValue]",
+ async def __rpc_controller_initiate_query_statistics(
+ self, stream: "grpclib.server.Stream[QueryStatisticsRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.evaluate_python_expression(request)
+ response = await self.controller_initiate_query_statistics(request)
await stream.send_message(response)
- async def __rpc_no_operation(
- self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+ async def __rpc_retry_workflow(
+ self, stream: "grpclib.server.Stream[RetryWorkflowRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.no_operation(request)
+ response = await self.retry_workflow(request)
await stream.send_message(response)
- async def __rpc_update_executor(
- self, stream: "grpclib.server.Stream[UpdateExecutorRequest,
EmptyReturn]"
+ async def __rpc_reconfigure_workflow(
+ self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest,
EmptyReturn]"
) -> None:
request = await stream.recv_message()
- response = await self.update_executor(request)
+ response = await self.reconfigure_workflow(request)
await stream.send_message(response)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
grpclib.const.Handler(
- self.__rpc_add_input_channel,
- grpclib.const.Cardinality.UNARY_UNARY,
- AddInputChannelRequest,
- EmptyReturn,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
grpclib.const.Handler(
- self.__rpc_add_partitioning,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
grpclib.const.Handler(
+ self.__rpc_retrieve_workflow_state,
grpclib.const.Cardinality.UNARY_UNARY,
- AddPartitioningRequest,
- EmptyReturn,
+ EmptyRequest,
+ RetrieveWorkflowStateResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort":
grpclib.const.Handler(
- self.__rpc_assign_port,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
grpclib.const.Handler(
+ self.__rpc_propagate_embedded_control_message,
grpclib.const.Cardinality.UNARY_UNARY,
- AssignPortRequest,
- EmptyReturn,
+ PropagateEmbeddedControlMessageRequest,
+ PropagateEmbeddedControlMessageResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
grpclib.const.Handler(
- self.__rpc_finalize_checkpoint,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
grpclib.const.Handler(
+ self.__rpc_take_global_checkpoint,
grpclib.const.Cardinality.UNARY_UNARY,
- FinalizeCheckpointRequest,
- FinalizeCheckpointResponse,
+ TakeGlobalCheckpointRequest,
+ TakeGlobalCheckpointResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
grpclib.const.Handler(
- self.__rpc_flush_network_buffer,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
grpclib.const.Handler(
+ self.__rpc_debug_command,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
+ DebugCommandRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
grpclib.const.Handler(
- self.__rpc_initialize_executor,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
grpclib.const.Handler(
+ self.__rpc_evaluate_python_expression,
grpclib.const.Cardinality.UNARY_UNARY,
- InitializeExecutorRequest,
- EmptyReturn,
+ EvaluatePythonExpressionRequest,
+ EvaluatePythonExpressionResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor":
grpclib.const.Handler(
- self.__rpc_open_executor,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
grpclib.const.Handler(
+ self.__rpc_console_message_triggered,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
+ ConsoleMessageTriggeredRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker":
grpclib.const.Handler(
- self.__rpc_pause_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
grpclib.const.Handler(
- self.__rpc_prepare_checkpoint,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
grpclib.const.Handler(
+ self.__rpc_port_completed,
grpclib.const.Cardinality.UNARY_UNARY,
- PrepareCheckpointRequest,
+ PortCompletedRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
grpclib.const.Handler(
- self.__rpc_query_statistics,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerMetricsResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker":
grpclib.const.Handler(
- self.__rpc_resume_worker,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
grpclib.const.Handler(
+ self.__rpc_start_workflow,
grpclib.const.Cardinality.UNARY_UNARY,
EmptyRequest,
- WorkerStateResponse,
+ StartWorkflowResponse,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState":
grpclib.const.Handler(
- self.__rpc_retrieve_state,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
grpclib.const.Handler(
+ self.__rpc_resume_workflow,
grpclib.const.Cardinality.UNARY_UNARY,
EmptyRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
grpclib.const.Handler(
- self.__rpc_retry_current_tuple,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
grpclib.const.Handler(
+ self.__rpc_pause_workflow,
grpclib.const.Cardinality.UNARY_UNARY,
EmptyRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker":
grpclib.const.Handler(
- self.__rpc_start_worker,
- grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
- WorkerStateResponse,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker":
grpclib.const.Handler(
- self.__rpc_end_worker,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
grpclib.const.Handler(
+ self.__rpc_worker_state_updated,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
+ WorkerStateUpdatedRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel":
grpclib.const.Handler(
- self.__rpc_start_channel,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
grpclib.const.Handler(
+ self.__rpc_worker_execution_completed,
grpclib.const.Cardinality.UNARY_UNARY,
EmptyRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel":
grpclib.const.Handler(
- self.__rpc_end_channel,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
grpclib.const.Handler(
+ self.__rpc_link_workers,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
+ LinkWorkersRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand":
grpclib.const.Handler(
- self.__rpc_debug_command,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
grpclib.const.Handler(
+ self.__rpc_controller_initiate_query_statistics,
grpclib.const.Cardinality.UNARY_UNARY,
- DebugCommandRequest,
+ QueryStatisticsRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
grpclib.const.Handler(
- self.__rpc_evaluate_python_expression,
- grpclib.const.Cardinality.UNARY_UNARY,
- EvaluatePythonExpressionRequest,
- EvaluatedValue,
- ),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation":
grpclib.const.Handler(
- self.__rpc_no_operation,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
grpclib.const.Handler(
+ self.__rpc_retry_workflow,
grpclib.const.Cardinality.UNARY_UNARY,
- EmptyRequest,
+ RetryWorkflowRequest,
EmptyReturn,
),
-
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
grpclib.const.Handler(
- self.__rpc_update_executor,
+
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
grpclib.const.Handler(
+ self.__rpc_reconfigure_workflow,
grpclib.const.Cardinality.UNARY_UNARY,
- UpdateExecutorRequest,
+ WorkflowReconfigureRequest,
EmptyReturn,
),
}
diff --git
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
index 8c1464cc76..55c789aa39 100644
---
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
+++
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
@@ -18,27 +18,6 @@ from ..architecture import (
)
-@dataclass(eq=False, repr=False)
-class Backpressure(betterproto.Message):
- enable_backpressure: bool = betterproto.bool_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class CreditUpdate(betterproto.Message):
- pass
-
-
-@dataclass(eq=False, repr=False)
-class ActorCommand(betterproto.Message):
- backpressure: "Backpressure" = betterproto.message_field(1,
group="sealed_value")
- credit_update: "CreditUpdate" = betterproto.message_field(2,
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class PythonActorMessage(betterproto.Message):
- payload: "ActorCommand" = betterproto.message_field(1)
-
-
@dataclass(eq=False, repr=False)
class DirectControlMessagePayloadV2(betterproto.Message):
control_invocation: "_architecture_rpc__.ControlInvocation" = (
@@ -154,3 +133,24 @@ class ExecutionMetadataStore(betterproto.Message):
fatal_errors: List["__core__.WorkflowFatalError"] =
betterproto.message_field(2)
execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3)
is_recovering: bool = betterproto.bool_field(4)
+
+
+@dataclass(eq=False, repr=False)
+class Backpressure(betterproto.Message):
+ enable_backpressure: bool = betterproto.bool_field(1)
+
+
+@dataclass(eq=False, repr=False)
+class CreditUpdate(betterproto.Message):
+ pass
+
+
+@dataclass(eq=False, repr=False)
+class ActorCommand(betterproto.Message):
+ backpressure: "Backpressure" = betterproto.message_field(1,
group="sealed_value")
+ credit_update: "CreditUpdate" = betterproto.message_field(2,
group="sealed_value")
+
+
+@dataclass(eq=False, repr=False)
+class PythonActorMessage(betterproto.Message):
+ payload: "ActorCommand" = betterproto.message_field(1)