This is an automated email from the ASF dual-hosted git repository. mssun pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/incubator-teaclave.git
commit 9073e05f505220b1da535f4edcec5b7a071a36f5 Author: Mingshen Sun <[email protected]> AuthorDate: Fri Mar 13 16:49:24 2020 -0700 [services] Rewrite and coordinate scheduler and execution serivces --- binder/src/macros.rs | 6 +- cmake/scripts/test.sh | 2 +- services/execution/enclave/src/lib.rs | 70 +++++----- services/execution/enclave/src/service.rs | 144 +++++++++++++++------ services/management/enclave/src/task.rs | 2 + services/proto/build.rs | 1 - services/proto/src/lib.rs | 5 - .../src/proto/teaclave_execution_service.proto | 32 ----- .../src/proto/teaclave_scheduler_service.proto | 18 ++- services/proto/src/teaclave_execution_service.rs | 139 -------------------- services/proto/src/teaclave_scheduler_service.rs | 110 +++++++++++++--- services/scheduler/enclave/src/service.rs | 52 +++++++- tests/functional/enclave/src/lib.rs | 2 - .../enclave/src/teaclave_execution_service.rs | 75 ----------- .../enclave/src/teaclave_frontend_service.rs | 46 ++----- .../enclave/src/teaclave_scheduler_service.rs | 20 ++- tests/unit/enclave/src/lib.rs | 3 - types/src/task.rs | 2 + types/src/worker.rs | 5 + utils/service_enclave_utils/src/lib.rs | 20 +++ 20 files changed, 355 insertions(+), 399 deletions(-) diff --git a/binder/src/macros.rs b/binder/src/macros.rs index effb9ee..d81fa1e 100644 --- a/binder/src/macros.rs +++ b/binder/src/macros.rs @@ -94,13 +94,13 @@ macro_rules! register_ecall_handler { // The last argument could be either * mut usize, or &mut usize let input_buf: &[u8] = unsafe { std::slice::from_raw_parts(in_buf, in_len) }; - trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf); + log::trace!("tee receive cmd: {:x}, input_buf = {:?}", cmd, input_buf); let inner_vec = unsafe { match ecall_ipc_lib_dispatcher(cmd, input_buf) { Ok(out) => out, Err(e) => { - error!("tee execute cmd: {:x}, error: {}", cmd, e); + log::error!("tee execute cmd: {:x}, error: {}", cmd, e); return teaclave_types::EnclaveStatus(1); } } @@ -112,7 +112,7 @@ macro_rules! register_ecall_handler { *out_len = inner_len; if inner_len > out_max { - debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len); + log::debug!("tee before copy out_buf check: out_max={:x} < inner={:x}", out_max, inner_len); return teaclave_types::EnclaveStatus(0x0000_000c); } diff --git a/cmake/scripts/test.sh b/cmake/scripts/test.sh index 6993e42..1b320c1 100755 --- a/cmake/scripts/test.sh +++ b/cmake/scripts/test.sh @@ -73,7 +73,7 @@ run_functional_tests() { sleep 3 # wait for management service and scheduler_service ./teaclave_access_control_service & ./teaclave_frontend_service & - ./teaclave_execution_service & + # ./teaclave_execution_service & popd sleep 3 # wait for other services ./teaclave_functional_tests diff --git a/services/execution/enclave/src/lib.rs b/services/execution/enclave/src/lib.rs index 3584f92..4df60bc 100644 --- a/services/execution/enclave/src/lib.rs +++ b/services/execution/enclave/src/lib.rs @@ -22,55 +22,49 @@ extern crate sgx_tstd as std; #[cfg(feature = "mesalock_sgx")] use std::prelude::v1::*; -#[macro_use] -extern crate log; - -use teaclave_attestation::{AttestationConfig, RemoteAttestation}; +use teaclave_attestation::verifier; use teaclave_binder::proto::{ ECallCommand, FinalizeEnclaveInput, FinalizeEnclaveOutput, InitEnclaveInput, InitEnclaveOutput, StartServiceInput, StartServiceOutput, }; use teaclave_binder::{handle_ecall, register_ecall_handler}; use teaclave_config::RuntimeConfig; -use teaclave_proto::teaclave_execution_service::{ - TeaclaveExecutionRequest, TeaclaveExecutionResponse, -}; -use teaclave_rpc::config::SgxTrustedTlsServerConfig; -use teaclave_rpc::server::SgxTrustedTlsServer; +use teaclave_config::BUILD_CONFIG; +use teaclave_service_enclave_utils::create_trusted_scheduler_endpoint; use teaclave_service_enclave_utils::ServiceEnclave; -use teaclave_types::{TeeServiceError, TeeServiceResult}; +use teaclave_types::{EnclaveInfo, TeeServiceError, TeeServiceResult}; mod service; +const AS_ROOT_CA_CERT: &[u8] = BUILD_CONFIG.as_root_ca_cert; +const AUDITOR_PUBLIC_KEYS_LEN: usize = BUILD_CONFIG.auditor_public_keys.len(); +const AUDITOR_PUBLIC_KEYS: &[&[u8]; AUDITOR_PUBLIC_KEYS_LEN] = BUILD_CONFIG.auditor_public_keys; + fn start_service(config: &RuntimeConfig) -> anyhow::Result<()> { - let listen_address = config.internal_endpoints.execution.listen_address; - let as_config = &config.attestation; - let attestation_config = AttestationConfig::new( - &as_config.algorithm, - &as_config.url, - &as_config.key, - &as_config.spid, + let enclave_info = EnclaveInfo::verify_and_new( + config + .audit + .enclave_info_bytes + .as_ref() + .expect("enclave_info"), + AUDITOR_PUBLIC_KEYS, + config + .audit + .auditor_signatures_bytes + .as_ref() + .expect("auditor signatures"), + )?; + let scheduler_service_address = &config.internal_endpoints.scheduler.advertised_address; + let scheduler_service_endpoint = create_trusted_scheduler_endpoint( + &scheduler_service_address, + &enclave_info, + AS_ROOT_CA_CERT, + verifier::universal_quote_verifier, ); - let attested_tls_config = RemoteAttestation::new() - .config(attestation_config) - .generate_and_endorse() - .unwrap() - .attested_tls_config() - .unwrap(); - let server_config = - SgxTrustedTlsServerConfig::from_attested_tls_config(attested_tls_config).unwrap(); - let mut server = - SgxTrustedTlsServer::<TeaclaveExecutionResponse, TeaclaveExecutionRequest>::new( - listen_address, - server_config, - ); - match server.start(service::TeaclaveExecutionService::new()) { - Ok(_) => (), - Err(e) => { - error!("Service exit, error: {}.", e); - } - } + let mut service = service::TeaclaveExecutionService::new(scheduler_service_endpoint).unwrap(); + let _ = service.start(); + Ok(()) } @@ -101,8 +95,10 @@ register_ecall_handler!( #[cfg(feature = "enclave_unit_test")] pub mod tests { + use super::*; + use teaclave_test_utils::*; pub fn run_tests() -> bool { - true + run_tests!(service::tests::test_invoke_function) } } diff --git a/services/execution/enclave/src/service.rs b/services/execution/enclave/src/service.rs index ce451ef..f939c15 100644 --- a/services/execution/enclave/src/service.rs +++ b/services/execution/enclave/src/service.rs @@ -18,64 +18,128 @@ #[cfg(feature = "mesalock_sgx")] use std::prelude::v1::*; -use std::sync::Arc; +use std::sync::{Arc, SgxMutex as Mutex}; -use teaclave_proto::teaclave_execution_service::{ - StagedFunctionExecuteRequest, StagedFunctionExecuteResponse, TeaclaveExecution, -}; -use teaclave_service_enclave_utils::teaclave_service; -use teaclave_types::{TeaclaveServiceResponseError, TeaclaveServiceResponseResult}; - -use teaclave_rpc::Request; +use teaclave_proto::teaclave_scheduler_service::*; +use teaclave_rpc::endpoint::Endpoint; +use teaclave_types::{StagedTask, WorkerInvocationResult}; use teaclave_worker::Worker; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum TeaclaveExecutionError { - #[error("woker running spec error")] - WorkerRunningSpecError, -} -impl From<TeaclaveExecutionError> for TeaclaveServiceResponseError { - fn from(error: TeaclaveExecutionError) -> Self { - TeaclaveServiceResponseError::RequestError(error.to_string()) - } -} +use anyhow::Result; -#[teaclave_service(teaclave_execution_service, TeaclaveExecution, TeaclaveExecutionError)] #[derive(Clone)] pub(crate) struct TeaclaveExecutionService { worker: Arc<Worker>, + scheduler_client: Arc<Mutex<TeaclaveSchedulerClient>>, } impl TeaclaveExecutionService { - pub(crate) fn new() -> Self { - TeaclaveExecutionService { + pub(crate) fn new(scheduler_service_endpoint: Endpoint) -> Result<Self> { + let mut i = 0; + let channel = loop { + match scheduler_service_endpoint.connect() { + Ok(channel) => break channel, + Err(_) => { + anyhow::ensure!(i < 3, "failed to connect to storage service"); + log::debug!("Failed to connect to storage service, retry {}", i); + i += 1; + } + } + std::thread::sleep(std::time::Duration::from_secs(1)); + }; + let scheduler_client = Arc::new(Mutex::new(TeaclaveSchedulerClient::new(channel)?)); + Ok(TeaclaveExecutionService { worker: Arc::new(Worker::default()), - } + scheduler_client, + }) } -} -impl TeaclaveExecution for TeaclaveExecutionService { - fn invoke_function( - &self, - request: Request<StagedFunctionExecuteRequest>, - ) -> TeaclaveServiceResponseResult<StagedFunctionExecuteResponse> { - let request = request.message; - match self.worker.invoke_function(request.into()) { - Ok(summary) => { - info!("[+] Invoking function ok: {}", summary); - Ok(summary.into()) - } - Err(e) => { - error!("[+] Invoking function failed: {}", e); - Err(TeaclaveExecutionError::WorkerRunningSpecError.into()) - } + pub(crate) fn start(&mut self) -> Result<()> { + loop { + std::thread::sleep(std::time::Duration::from_secs(3)); + let scheduler_client = self.scheduler_client.clone(); + let mut client = match scheduler_client.lock() { + Ok(client) => client, + Err(e) => { + log::error!("Error: {:?}", e); + continue; + } + }; + + let request = PullTaskRequest {}; + log::debug!("pull_task"); + let response = match client.pull_task(request) { + Ok(response) => response, + Err(e) => { + log::error!("Error: {:?}", e); + continue; + } + }; + log::debug!("response: {:?}", response); + let result = self.invoke_task(response.staged_task); + self.update_task(result); } } + + fn invoke_task(&mut self, _task: StagedTask) -> WorkerInvocationResult { + // TODO: convert task to function, i.e., needs help from agent + unimplemented!() + } + + fn update_task(&mut self, _result: WorkerInvocationResult) { + unimplemented!() + } } #[cfg(test_mode)] mod test_mode { use super::*; } + +#[cfg(feature = "enclave_unit_test")] +pub mod tests { + use super::*; + use std::convert::TryInto; + use teaclave_types::*; + + pub fn test_invoke_function() { + let function_args = TeaclaveFunctionArguments::new(&hashmap!( + "feature_size" => "4", + "max_depth" => "4", + "iterations" => "100", + "shrinkage" => "0.1", + "feature_sample_ratio" => "1.0", + "data_sample_ratio" => "1.0", + "min_leaf_size" => "1", + "loss" => "LAD", + "training_optimization_level" => "2" + )); + + let plain_input = "fixtures/functions/gbdt_training/train.txt"; + let enc_output = "fixtures/functions/gbdt_training/model.enc.out"; + + let input_info = + TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap(); + let input_files = TeaclaveWorkerFileRegistry::new(hashmap!( + "training_data".to_string() => input_info)); + + let output_info = + TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default()); + let output_files = TeaclaveWorkerFileRegistry::new(hashmap!( + "trained_model".to_string() => output_info)); + let invocation = WorkerInvocation { + runtime_name: "default".to_string(), + executor_type: "native".try_into().unwrap(), + function_name: "gbdt_training".to_string(), + function_payload: String::new(), + function_args, + input_files, + output_files, + }; + + let worker = Worker::default(); + let result = worker.invoke_function(invocation); + assert!(result.is_ok()); + log::debug!("summary: {:?}", result.unwrap()); + } +} diff --git a/services/management/enclave/src/task.rs b/services/management/enclave/src/task.rs index d6e3729..2d3a584 100644 --- a/services/management/enclave/src/task.rs +++ b/services/management/enclave/src/task.rs @@ -59,6 +59,8 @@ pub(crate) fn create_task( approved_user_list: HashSet::new(), input_map: HashMap::new(), output_map: HashMap::new(), + return_value: None, + output_file_hash: HashMap::new(), status: TaskStatus::Created, }; // check arguments diff --git a/services/proto/build.rs b/services/proto/build.rs index b1f4897..fc17b99 100644 --- a/services/proto/build.rs +++ b/services/proto/build.rs @@ -26,7 +26,6 @@ fn main() { "src/proto/teaclave_authentication_service.proto", "src/proto/teaclave_common.proto", "src/proto/teaclave_storage_service.proto", - "src/proto/teaclave_execution_service.proto", "src/proto/teaclave_frontend_service.proto", "src/proto/teaclave_management_service.proto", "src/proto/teaclave_scheduler_service.proto", diff --git a/services/proto/src/lib.rs b/services/proto/src/lib.rs index b7a3af2..18c285f 100644 --- a/services/proto/src/lib.rs +++ b/services/proto/src/lib.rs @@ -22,7 +22,6 @@ extern crate sgx_tstd as std; pub mod teaclave_access_control_service; pub mod teaclave_authentication_service; pub mod teaclave_common; -pub mod teaclave_execution_service; pub mod teaclave_frontend_service; pub mod teaclave_management_service; pub mod teaclave_scheduler_service; @@ -46,10 +45,6 @@ pub mod teaclave_storage_service_proto { include_proto!("teaclave_storage_service_proto"); } -pub mod teaclave_execution_service_proto { - include_proto!("teaclave_execution_service_proto"); -} - pub mod teaclave_frontend_service_proto { include_proto!("teaclave_frontend_service_proto"); } diff --git a/services/proto/src/proto/teaclave_execution_service.proto b/services/proto/src/proto/teaclave_execution_service.proto deleted file mode 100644 index f0a57ba..0000000 --- a/services/proto/src/proto/teaclave_execution_service.proto +++ /dev/null @@ -1,32 +0,0 @@ -syntax = "proto3"; -package teaclave_execution_service_proto; - -import "teaclave_common.proto"; - -message WorkerInputFileInfo { - string path = 1; - teaclave_common_proto.FileCryptoInfo crypto_info = 2; -} - -message WorkerOutputFileInfo { - string path = 1; - teaclave_common_proto.FileCryptoInfo crypto_info = 2; -} - -message StagedFunctionExecuteRequest { - string runtime_name = 1; - string executor_type = 2; - string function_name = 3; - string function_payload = 4; - map<string, string> function_args = 11; - map<string, WorkerInputFileInfo> input_files = 21; - map<string, WorkerOutputFileInfo> output_files = 22; -} - -message StagedFunctionExecuteResponse { - string summary = 1; -} - -service TeaclaveExecution { - rpc InvokeFunction(StagedFunctionExecuteRequest) returns (StagedFunctionExecuteResponse); -} \ No newline at end of file diff --git a/services/proto/src/proto/teaclave_scheduler_service.proto b/services/proto/src/proto/teaclave_scheduler_service.proto index ea34ec0..151efe7 100644 --- a/services/proto/src/proto/teaclave_scheduler_service.proto +++ b/services/proto/src/proto/teaclave_scheduler_service.proto @@ -13,10 +13,18 @@ message PullTaskResponse { bytes staged_task = 1; } -message UpdateTaskRequest { - string staged_task_id = 2; +message UpdateTaskStatusRequest { + string task_id = 1; + teaclave_common_proto.TaskStatus task_status = 2; } -message UpdateTaskResponse {} +message UpdateTaskStatusResponse {} + +message UpdateTaskResultRequest { + string task_id = 1; + bytes return_value = 2; + map<string, string> output_file_hash = 3; +} +message UpdateTaskResultResponse {} message PublishTaskRequest { bytes staged_task = 1; @@ -30,5 +38,7 @@ service TeaclaveScheduler { // Subscriber rpc Subscribe(SubscribeRequest) returns (SubscribeResponse); rpc PullTask(PullTaskRequest) returns (PullTaskResponse); - rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse); + + rpc UpdateTaskStatus(UpdateTaskStatusRequest) returns (UpdateTaskStatusResponse); + rpc UpdateTaskResult(UpdateTaskResultRequest) returns (UpdateTaskResultResponse); } diff --git a/services/proto/src/teaclave_execution_service.rs b/services/proto/src/teaclave_execution_service.rs deleted file mode 100644 index 4713c1b..0000000 --- a/services/proto/src/teaclave_execution_service.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::prelude::v1::*; - -use anyhow::{anyhow, Error, Result}; -use core::convert::TryInto; -use teaclave_rpc::into_request; -use teaclave_types::{ - TeaclaveFunctionArguments, TeaclaveWorkerInputFileInfo, TeaclaveWorkerOutputFileInfo, - WorkerInvocation, -}; - -use crate::teaclave_execution_service_proto as proto; -pub use proto::TeaclaveExecution; -pub use proto::TeaclaveExecutionClient; -pub use proto::TeaclaveExecutionRequest; -pub use proto::TeaclaveExecutionResponse; - -#[into_request(TeaclaveExecutionRequest::InvokeFunction)] -#[derive(Debug)] -pub struct StagedFunctionExecuteRequest { - pub invocation: WorkerInvocation, -} - -#[into_request(TeaclaveExecutionResponse::InvokeFunction)] -#[derive(Debug)] -pub struct StagedFunctionExecuteResponse { - pub summary: std::string::String, -} - -impl std::convert::TryFrom<proto::WorkerInputFileInfo> for TeaclaveWorkerInputFileInfo { - type Error = Error; - fn try_from(proto: proto::WorkerInputFileInfo) -> Result<Self> { - let path = std::path::Path::new(&proto.path).to_path_buf(); - let crypto_info = proto - .crypto_info - .ok_or_else(|| anyhow!("Missing field: crypto_info"))? - .try_into()?; - Ok(TeaclaveWorkerInputFileInfo { path, crypto_info }) - } -} - -impl std::convert::TryFrom<proto::WorkerOutputFileInfo> for TeaclaveWorkerOutputFileInfo { - type Error = Error; - fn try_from(proto: proto::WorkerOutputFileInfo) -> Result<Self> { - let path = std::path::Path::new(&proto.path).to_path_buf(); - let crypto_info = proto - .crypto_info - .ok_or_else(|| anyhow!("Missing field: crypto_info"))? - .try_into()?; - Ok(TeaclaveWorkerOutputFileInfo { path, crypto_info }) - } -} - -// For server side -impl std::convert::TryFrom<proto::StagedFunctionExecuteRequest> for StagedFunctionExecuteRequest { - type Error = Error; - - fn try_from(proto: proto::StagedFunctionExecuteRequest) -> Result<Self> { - let ret = Self { - invocation: WorkerInvocation { - runtime_name: proto.runtime_name, - executor_type: proto.executor_type.as_str().try_into()?, - function_name: proto.function_name, - function_payload: proto.function_payload, - function_args: TeaclaveFunctionArguments { - args: proto.function_args, - }, - input_files: proto.input_files.try_into()?, - output_files: proto.output_files.try_into()?, - }, - }; - - Ok(ret) - } -} - -impl std::convert::TryFrom<proto::StagedFunctionExecuteResponse> for StagedFunctionExecuteResponse { - type Error = Error; - - fn try_from(proto: proto::StagedFunctionExecuteResponse) -> Result<Self> { - let ret = Self { - summary: proto.summary, - }; - - Ok(ret) - } -} - -// For client side -impl std::convert::From<TeaclaveWorkerInputFileInfo> for proto::WorkerInputFileInfo { - fn from(info: TeaclaveWorkerInputFileInfo) -> Self { - proto::WorkerInputFileInfo { - path: info.path.to_string_lossy().to_string(), - crypto_info: Some(info.crypto_info.into()), - } - } -} - -impl std::convert::From<TeaclaveWorkerOutputFileInfo> for proto::WorkerOutputFileInfo { - fn from(info: TeaclaveWorkerOutputFileInfo) -> Self { - proto::WorkerOutputFileInfo { - path: info.path.to_string_lossy().to_string(), - crypto_info: Some(info.crypto_info.into()), - } - } -} - -impl From<StagedFunctionExecuteRequest> for WorkerInvocation { - fn from(request: StagedFunctionExecuteRequest) -> Self { - request.invocation - } -} - -impl From<StagedFunctionExecuteRequest> for proto::StagedFunctionExecuteRequest { - fn from(request: StagedFunctionExecuteRequest) -> Self { - Self { - runtime_name: request.invocation.runtime_name, - executor_type: request.invocation.executor_type.to_string(), - function_name: request.invocation.function_name, - function_payload: request.invocation.function_payload, - function_args: request.invocation.function_args.args, - input_files: request.invocation.input_files.into(), - output_files: request.invocation.output_files.into(), - } - } -} - -impl From<StagedFunctionExecuteResponse> for proto::StagedFunctionExecuteResponse { - fn from(response: StagedFunctionExecuteResponse) -> Self { - Self { - summary: response.summary, - } - } -} - -impl From<String> for StagedFunctionExecuteResponse { - fn from(summary: String) -> Self { - Self { summary } - } -} diff --git a/services/proto/src/teaclave_scheduler_service.rs b/services/proto/src/teaclave_scheduler_service.rs index 1d8870a..2677367 100644 --- a/services/proto/src/teaclave_scheduler_service.rs +++ b/services/proto/src/teaclave_scheduler_service.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::prelude::v1::*; -use crate::teaclave_execution_service::StagedFunctionExecuteRequest; +use crate::teaclave_common::{i32_from_task_status, i32_to_task_status}; use crate::teaclave_scheduler_service_proto as proto; use anyhow::{Error, Result}; use core::convert::TryInto; @@ -13,7 +13,7 @@ pub use proto::TeaclaveSchedulerClient; pub use proto::TeaclaveSchedulerRequest; pub use proto::TeaclaveSchedulerResponse; use teaclave_rpc::into_request; -use teaclave_types::StagedTask; +use teaclave_types::{StagedTask, TaskStatus}; #[into_request(TeaclaveSchedulerRequest::Subscribe)] pub struct SubscribeRequest {} @@ -38,13 +38,46 @@ impl PullTaskResponse { } } -#[into_request(TeaclaveSchedulerRequest::UpdateTask)] -pub struct UpdateTaskRequest { - pub staged_task_id: String, +#[into_request(TeaclaveSchedulerRequest::UpdateTaskResult)] +pub struct UpdateTaskResultRequest { + pub task_id: String, + pub return_value: Vec<u8>, + pub output_file_hash: HashMap<String, String>, } -#[into_request(TeaclaveSchedulerResponse::UpdateTask)] -pub struct UpdateTaskResponse {} +impl UpdateTaskResultRequest { + pub fn new( + task_id: impl Into<String>, + return_value: &[u8], + output_file_hash: HashMap<String, String>, + ) -> Self { + Self { + task_id: task_id.into(), + return_value: return_value.to_vec(), + output_file_hash, + } + } +} + +#[into_request(TeaclaveSchedulerResponse::UpdateTaskResult)] +pub struct UpdateTaskResultResponse {} + +#[into_request(TeaclaveSchedulerRequest::UpdateTaskStatus)] +pub struct UpdateTaskStatusRequest { + pub task_id: String, + pub task_status: TaskStatus, +} + +impl UpdateTaskStatusRequest { + pub fn new(task_id: impl Into<String>, task_status: TaskStatus) -> Self { + Self { + task_id: task_id.into(), + task_status, + } + } +} +#[into_request(TeaclaveSchedulerResponse::UpdateTaskStatus)] +pub struct UpdateTaskStatusResponse {} #[into_request(TeaclaveSchedulerRequest::PublishTask)] pub struct PublishTaskRequest { @@ -116,36 +149,75 @@ impl std::convert::From<PullTaskResponse> for proto::PullTaskResponse { } } } +impl std::convert::TryFrom<proto::UpdateTaskResultRequest> for UpdateTaskResultRequest { + type Error = Error; + fn try_from(proto: proto::UpdateTaskResultRequest) -> Result<Self> { + let ret = Self { + task_id: proto.task_id, + return_value: proto.return_value, + output_file_hash: proto.output_file_hash, + }; + Ok(ret) + } +} + +impl std::convert::From<UpdateTaskResultRequest> for proto::UpdateTaskResultRequest { + fn from(req: UpdateTaskResultRequest) -> Self { + proto::UpdateTaskResultRequest { + task_id: req.task_id, + return_value: req.return_value, + output_file_hash: req.output_file_hash, + } + } +} + +impl std::convert::TryFrom<proto::UpdateTaskResultResponse> for UpdateTaskResultResponse { + type Error = Error; + fn try_from(proto: proto::UpdateTaskResultResponse) -> Result<Self> { + let ret = Self {}; + Ok(ret) + } +} + +impl std::convert::From<UpdateTaskResultResponse> for proto::UpdateTaskResultResponse { + fn from(req: UpdateTaskResultResponse) -> Self { + proto::UpdateTaskResultResponse {} + } +} -impl std::convert::TryFrom<proto::UpdateTaskRequest> for UpdateTaskRequest { +impl std::convert::TryFrom<proto::UpdateTaskStatusRequest> for UpdateTaskStatusRequest { type Error = Error; - fn try_from(proto: proto::UpdateTaskRequest) -> Result<Self> { + fn try_from(proto: proto::UpdateTaskStatusRequest) -> Result<Self> { + let task_status = i32_to_task_status(proto.task_status)?; let ret = Self { - staged_task_id: proto.staged_task_id, + task_id: proto.task_id, + task_status, }; Ok(ret) } } -impl std::convert::From<UpdateTaskRequest> for proto::UpdateTaskRequest { - fn from(req: UpdateTaskRequest) -> Self { - proto::UpdateTaskRequest { - staged_task_id: req.staged_task_id, +impl std::convert::From<UpdateTaskStatusRequest> for proto::UpdateTaskStatusRequest { + fn from(req: UpdateTaskStatusRequest) -> Self { + let task_status = i32_from_task_status(req.task_status); + proto::UpdateTaskStatusRequest { + task_id: req.task_id, + task_status, } } } -impl std::convert::TryFrom<proto::UpdateTaskResponse> for UpdateTaskResponse { +impl std::convert::TryFrom<proto::UpdateTaskStatusResponse> for UpdateTaskStatusResponse { type Error = Error; - fn try_from(proto: proto::UpdateTaskResponse) -> Result<Self> { + fn try_from(proto: proto::UpdateTaskStatusResponse) -> Result<Self> { let ret = Self {}; Ok(ret) } } -impl std::convert::From<UpdateTaskResponse> for proto::UpdateTaskResponse { - fn from(req: UpdateTaskResponse) -> Self { - proto::UpdateTaskResponse {} +impl std::convert::From<UpdateTaskStatusResponse> for proto::UpdateTaskStatusResponse { + fn from(req: UpdateTaskStatusResponse) -> Self { + proto::UpdateTaskStatusResponse {} } } diff --git a/services/scheduler/enclave/src/service.rs b/services/scheduler/enclave/src/service.rs index 76c0b2c..5a843d7 100644 --- a/services/scheduler/enclave/src/service.rs +++ b/services/scheduler/enclave/src/service.rs @@ -29,7 +29,7 @@ use teaclave_rpc::endpoint::Endpoint; use teaclave_rpc::Request; use teaclave_service_enclave_utils::teaclave_service; use teaclave_types::{ - StagedTask, Storable, TeaclaveServiceResponseError, TeaclaveServiceResponseResult, + StagedTask, Storable, Task, TeaclaveServiceResponseError, TeaclaveServiceResponseResult, }; use anyhow::anyhow; @@ -94,6 +94,31 @@ impl TeaclaveSchedulerService { T::from_slice(dequeue_response.value.as_slice()) .map_err(|_| TeaclaveSchedulerError::DataError.into()) } + + fn get_task(&self, key: &str) -> Result<Task> { + let key = format!("{}-{}", Task::key_prefix(), key); + let get_request = GetRequest::new(key.into_bytes()); + let get_response = self + .storage_client + .clone() + .lock() + .map_err(|_| anyhow!("Cannot lock storage client"))? + .get(get_request)?; + Task::from_slice(get_response.value.as_slice()) + } + + fn put_task(&self, item: &impl Storable) -> Result<()> { + let k = item.key(); + let v = item.to_vec()?; + let put_request = PutRequest::new(k.as_slice(), v.as_slice()); + let _put_response = self + .storage_client + .clone() + .lock() + .map_err(|_| anyhow!("Cannot lock storage client"))? + .put(put_request)?; + Ok(()) + } } impl TeaclaveScheduler for TeaclaveSchedulerService { @@ -117,6 +142,7 @@ impl TeaclaveScheduler for TeaclaveSchedulerService { &self, request: Request<SubscribeRequest>, ) -> TeaclaveServiceResponseResult<SubscribeResponse> { + // TODO: subscribe a specific topic unimplemented!() } @@ -130,11 +156,27 @@ impl TeaclaveScheduler for TeaclaveSchedulerService { Ok(response) } - fn update_task( + fn update_task_status( &self, - request: Request<UpdateTaskRequest>, - ) -> TeaclaveServiceResponseResult<UpdateTaskResponse> { - unimplemented!() + request: Request<UpdateTaskStatusRequest>, + ) -> TeaclaveServiceResponseResult<UpdateTaskStatusResponse> { + let request = request.message; + let mut task = self.get_task(&request.task_id)?; + task.status = request.task_status; + self.put_task(&task)?; + Ok(UpdateTaskStatusResponse {}) + } + + fn update_task_result( + &self, + request: Request<UpdateTaskResultRequest>, + ) -> TeaclaveServiceResponseResult<UpdateTaskResultResponse> { + let request = request.message; + let mut task = self.get_task(&request.task_id)?; + task.return_value = Some(request.return_value); + task.output_file_hash = request.output_file_hash; + self.put_task(&task)?; + Ok(UpdateTaskResultResponse {}) } } diff --git a/tests/functional/enclave/src/lib.rs b/tests/functional/enclave/src/lib.rs index b768b3e..891c552 100644 --- a/tests/functional/enclave/src/lib.rs +++ b/tests/functional/enclave/src/lib.rs @@ -37,7 +37,6 @@ use teaclave_types::TeeServiceResult; mod teaclave_access_control_service; mod teaclave_authentication_service; -mod teaclave_execution_service; mod teaclave_frontend_service; mod teaclave_management_service; mod teaclave_scheduler_service; @@ -49,7 +48,6 @@ fn handle_run_test(_: &RunTestInput) -> TeeServiceResult<RunTestOutput> { teaclave_access_control_service::run_tests(), teaclave_authentication_service::run_tests(), teaclave_storage_service::run_tests(), - teaclave_execution_service::run_tests(), teaclave_frontend_service::run_tests(), teaclave_management_service::run_tests(), teaclave_scheduler_service::run_tests(), diff --git a/tests/functional/enclave/src/teaclave_execution_service.rs b/tests/functional/enclave/src/teaclave_execution_service.rs deleted file mode 100644 index e86019a..0000000 --- a/tests/functional/enclave/src/teaclave_execution_service.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::prelude::v1::*; - -use std::convert::TryInto; -use teaclave_proto::teaclave_execution_service::*; -use teaclave_rpc::endpoint::Endpoint; - -use teaclave_config::RuntimeConfig; -use teaclave_types::hashmap; -use teaclave_types::TeaclaveFileRootKey128; -use teaclave_types::TeaclaveFunctionArguments; -use teaclave_types::TeaclaveWorkerFileRegistry; -use teaclave_types::TeaclaveWorkerInputFileInfo; -use teaclave_types::TeaclaveWorkerOutputFileInfo; -use teaclave_types::WorkerInvocation; - -pub fn run_tests() -> bool { - use teaclave_test_utils::*; - run_tests!(test_invoke_success,) -} - -fn get_client() -> TeaclaveExecutionClient { - let runtime_config = RuntimeConfig::from_toml("runtime.config.toml").expect("runtime"); - let channel = Endpoint::new( - &runtime_config - .internal_endpoints - .execution - .advertised_address, - ) - .connect() - .expect("channel"); - TeaclaveExecutionClient::new(channel).expect("client") -} - -fn test_invoke_success() { - let mut client = get_client(); - - let function_args = TeaclaveFunctionArguments::new(&hashmap!( - "feature_size" => "4", - "max_depth" => "4", - "iterations" => "100", - "shrinkage" => "0.1", - "feature_sample_ratio" => "1.0", - "data_sample_ratio" => "1.0", - "min_leaf_size" => "1", - "loss" => "LAD", - "training_optimization_level" => "2" - )); - - let plain_input = "fixtures/functions/gbdt_training/train.txt"; - let enc_output = "fixtures/functions/gbdt_training/model.enc.out"; - - let input_info = TeaclaveWorkerInputFileInfo::create_with_plaintext_file(plain_input).unwrap(); - let input_files = TeaclaveWorkerFileRegistry::new(hashmap!( - "training_data".to_string() => input_info)); - - let output_info = - TeaclaveWorkerOutputFileInfo::new(enc_output, TeaclaveFileRootKey128::default()); - let output_files = TeaclaveWorkerFileRegistry::new(hashmap!( - "trained_model".to_string() => output_info)); - - let request = StagedFunctionExecuteRequest { - invocation: WorkerInvocation { - runtime_name: "default".to_string(), - executor_type: "native".try_into().unwrap(), - function_name: "gbdt_training".to_string(), - function_payload: String::new(), - function_args, - input_files, - output_files, - }, - }; - - let response_result = client.invoke_function(request); - assert!(response_result.is_ok()); -} diff --git a/tests/functional/enclave/src/teaclave_frontend_service.rs b/tests/functional/enclave/src/teaclave_frontend_service.rs index a67e75d..b670133 100644 --- a/tests/functional/enclave/src/teaclave_frontend_service.rs +++ b/tests/functional/enclave/src/teaclave_frontend_service.rs @@ -180,14 +180,12 @@ fn test_get_output_file() { let response = client.register_output_file(request); let data_id = response.unwrap().data_id; - let request = GetOutputFileRequest { - data_id: data_id.clone(), - }; + let request = GetOutputFileRequest::new(&data_id); let response = client.get_output_file(request); assert!(response.is_ok()); assert!(response.unwrap().hash.is_empty()); - let request = GetOutputFileRequest { data_id }; + let request = GetOutputFileRequest::new(&data_id); client .metadata_mut() .insert("token".to_string(), "wrong token".to_string()); @@ -206,14 +204,12 @@ fn test_get_input_file() { let response = client.register_input_file(request); let data_id = response.unwrap().data_id; - let request = GetInputFileRequest { - data_id: data_id.clone(), - }; + let request = GetInputFileRequest::new(&data_id); let response = client.get_input_file(request); assert!(response.is_ok()); assert!(!response.unwrap().hash.is_empty()); - let request = GetInputFileRequest { data_id }; + let request = GetInputFileRequest::new(&data_id); client .metadata_mut() .insert("token".to_string(), "wrong token".to_string()); @@ -256,16 +252,12 @@ fn test_register_function() { fn test_get_function() { let mut client = get_client(); - let request = GetFunctionRequest { - function_id: "function-00000000-0000-0000-0000-000000000001".to_string(), - }; + let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001"); let response = client.get_function(request); assert!(response.is_ok()); assert!(!response.unwrap().name.is_empty()); - let request = GetFunctionRequest { - function_id: "function-00000000-0000-0000-0000-000000000001".to_string(), - }; + let request = GetFunctionRequest::new("function-00000000-0000-0000-0000-000000000001"); client .metadata_mut() .insert("token".to_string(), "wrong token".to_string()); @@ -332,14 +324,12 @@ fn test_get_task() { let response = client.create_task(request); let task_id = response.unwrap().task_id; - let request = GetTaskRequest { - task_id: task_id.clone(), - }; + let request = GetTaskRequest::new(&task_id); let response = client.get_task(request); assert!(response.is_ok()); assert!(!response.unwrap().function_id.is_empty()); - let request = GetTaskRequest { task_id }; + let request = GetTaskRequest::new(task_id); client .metadata_mut() .insert("token".to_string(), "wrong token".to_string()); @@ -436,9 +426,7 @@ fn test_approve_task() { }; let _response = client.assign_data(request); - let request = ApproveTaskRequest { - task_id: task_id.clone(), - }; + let request = ApproveTaskRequest::new(&task_id); let correct_token = client.metadata().get("token").unwrap().to_string(); client .metadata_mut() @@ -446,7 +434,7 @@ fn test_approve_task() { let response = client.approve_task(request); assert!(response.is_err()); - let request = ApproveTaskRequest { task_id }; + let request = ApproveTaskRequest::new(&task_id); client .metadata_mut() .insert("token".to_string(), correct_token); @@ -489,14 +477,10 @@ fn test_invoke_task() { }; let _response = client.assign_data(request); - let request = ApproveTaskRequest { - task_id: task_id.clone(), - }; + let request = ApproveTaskRequest::new(&task_id); let _response = client.approve_task(request); - let request = InvokeTaskRequest { - task_id: task_id.clone(), - }; + let request = InvokeTaskRequest::new(&task_id); let correct_token = client.metadata().get("token").unwrap().to_string(); client .metadata_mut() @@ -504,16 +488,14 @@ fn test_invoke_task() { let response = client.invoke_task(request); assert!(response.is_err()); - let request = InvokeTaskRequest { - task_id: task_id.clone(), - }; + let request = InvokeTaskRequest::new(&task_id); client .metadata_mut() .insert("token".to_string(), correct_token); let response = client.invoke_task(request); assert!(response.is_ok()); - let request = GetTaskRequest { task_id }; + let request = GetTaskRequest::new(&task_id); let response = client.get_task(request); assert_eq!(response.unwrap().status, TaskStatus::Running); } diff --git a/tests/functional/enclave/src/teaclave_scheduler_service.rs b/tests/functional/enclave/src/teaclave_scheduler_service.rs index a4f0a22..5ef46d1 100644 --- a/tests/functional/enclave/src/teaclave_scheduler_service.rs +++ b/tests/functional/enclave/src/teaclave_scheduler_service.rs @@ -11,7 +11,7 @@ use teaclave_types::*; pub fn run_tests() -> bool { use teaclave_test_utils::*; - run_tests!(test_pull_task,) + run_tests!(test_pull_task, test_update_task_status_result) } fn get_client(user_id: &str) -> TeaclaveSchedulerClient { @@ -51,3 +51,21 @@ fn test_pull_task() { log::debug!("response: {:?}", response); assert!(response.is_ok()); } + +fn test_update_task_status_result() { + let mut client = get_client("mock_user"); + let request = PullTaskRequest {}; + let response = client.pull_task(request).unwrap(); + log::debug!("response: {:?}", response); + let task_id = response.staged_task.task_id.to_string(); + + let request = UpdateTaskStatusRequest::new(&task_id, TaskStatus::Finished); + let response = client.update_task_status(request); + assert!(response.is_ok()); + + let request = + UpdateTaskResultRequest::new(&task_id, "return".to_string().as_bytes(), HashMap::new()); + let response = client.update_task_result(request); + + assert!(response.is_ok()); +} diff --git a/tests/unit/enclave/src/lib.rs b/tests/unit/enclave/src/lib.rs index 7a12fa7..c498810 100644 --- a/tests/unit/enclave/src/lib.rs +++ b/tests/unit/enclave/src/lib.rs @@ -19,9 +19,6 @@ #[cfg(feature = "mesalock_sgx")] extern crate sgx_tstd as std; -#[macro_use] -extern crate log; - use std::prelude::v1::*; use teaclave_access_control_service_enclave; diff --git a/types/src/task.rs b/types/src/task.rs index 013cf71..b83af0c 100644 --- a/types/src/task.rs +++ b/types/src/task.rs @@ -34,6 +34,8 @@ pub struct Task { pub approved_user_list: HashSet<String>, pub input_map: HashMap<String, String>, pub output_map: HashMap<String, String>, + pub return_value: Option<Vec<u8>>, + pub output_file_hash: HashMap<String, String>, pub status: TaskStatus, } diff --git a/types/src/worker.rs b/types/src/worker.rs index dc60c02..2dea81d 100644 --- a/types/src/worker.rs +++ b/types/src/worker.rs @@ -267,6 +267,11 @@ pub struct WorkerInvocation { pub output_files: TeaclaveWorkerFileRegistry<TeaclaveWorkerOutputFileInfo>, } +pub struct WorkerInvocationResult { + pub return_value: Vec<u8>, + pub output_file_hash: HashMap<String, String>, +} + #[cfg(feature = "enclave_unit_test")] pub mod tests { use super::*; diff --git a/utils/service_enclave_utils/src/lib.rs b/utils/service_enclave_utils/src/lib.rs index da3ad2b..eb2bcd9 100644 --- a/utils/service_enclave_utils/src/lib.rs +++ b/utils/service_enclave_utils/src/lib.rs @@ -70,3 +70,23 @@ pub fn create_trusted_storage_endpoint( Endpoint::new(storage_service_address).config(storage_service_client_config) } + +pub fn create_trusted_scheduler_endpoint( + advertised_address: &str, + enclave_info: &EnclaveInfo, + as_root_ca_cert: &[u8], + verifier: AttestationReportVerificationFn, +) -> Endpoint { + let scheduler_service_enclave_attrs = enclave_info + .get_enclave_attr("teaclave_scheduler_service") + .expect("enclave_info"); + let scheduler_service_client_config = SgxTrustedTlsClientConfig::new() + .attestation_report_verifier( + vec![scheduler_service_enclave_attrs], + as_root_ca_cert, + verifier, + ); + let scheduler_service_address = &advertised_address; + + Endpoint::new(scheduler_service_address).config(scheduler_service_client_config) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
