zealchen commented on code in PR #1539: URL: https://github.com/apache/horaedb/pull/1539#discussion_r1709910209
########## src/components/fb_util/src/common.rs: ########## @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::Read; + +use bytes::{Buf, BufMut}; +use flatbuffers; +use tonic::{ + codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}, + Status, +}; + +pub struct FlatBufferBytes(Vec<u8>); + +impl FlatBufferBytes { + pub fn new(data: Vec<u8>) -> Self { + Self(data) + } + + pub fn serialize<'buf, T: flatbuffers::Follow<'buf> + 'buf>( + mut builder: flatbuffers::FlatBufferBuilder<'buf>, + root_offset: flatbuffers::WIPOffset<T>, + ) -> Self { + builder.finish(root_offset, None); + let (mut data, head) = builder.collapse(); + Self(data.drain(head..).collect()) + } + + pub fn deserialize<'buf, T: flatbuffers::Follow<'buf> + flatbuffers::Verifiable + 'buf>( + &'buf self, + ) -> Result<T::Inner, Box<dyn std::error::Error>> { + flatbuffers::root::<T>(self.0.as_slice()) + .map_err(|x| Box::new(x) as Box<dyn std::error::Error>) + } +} + +#[derive(Debug)] +pub struct FlatBufferEncoder(); + +impl Encoder for FlatBufferEncoder { + type Error = Status; + type Item = FlatBufferBytes; + + fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + 
buf.put_slice(item.0.as_slice()); + Ok(()) + } +} + +#[derive(Debug)] +pub struct FlatBufferDecoder(); + +impl Decoder for FlatBufferDecoder { + type Error = Status; + type Item = FlatBufferBytes; + + fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> { + if !buf.has_remaining() { + return Ok(None); + } + let mut data: Vec<u8> = Vec::new(); + buf.reader() + .read_to_end(&mut data) + .map_err(|e| Status::internal(e.to_string()))?; + let item = FlatBufferBytes::new(data); + Ok(Some(item)) + } +} + +/// A [`Codec`] that implements `application/grpc+json` via the serde library. Review Comment: Okay. ########## src/server/src/grpc/remote_engine_service/mod.rs: ########## @@ -571,17 +604,36 @@ impl RemoteEngineServiceImpl { async fn write_internal( &self, - request: Request<WriteRequest>, - ) -> std::result::Result<Response<WriteResponse>, Status> { + request: TonicWriteRequestExt, + ) -> std::result::Result<TonicWriteResponseExt, Status> { let begin_instant = Instant::now(); let ctx = self.handler_ctx(); - let handle = self.runtimes.write_runtime.spawn(async move { - let request = request.into_inner(); - handle_write(ctx, request).await.map_err(|e| { - error!("Handle write failed, err:{e}"); - e - }) - }); + let (handle, is_flatbuffer) = match request { Review Comment: Okay. ########## src/server/src/grpc/mod.rs: ########## @@ -165,6 +166,7 @@ pub struct RpcServices { rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>, meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>, remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>, + remote_engine_server_ext: RemoteEngineFbServiceServer<RemoteEngineServiceImpl>, Review Comment: Okay. 
########## src/server/src/grpc/remote_engine_service/mod.rs: ########## @@ -920,6 +1012,35 @@ struct HandlerContext { hotspot_recorder: Arc<HotspotRecorder>, } +#[async_trait] +impl RemoteEngineFbService for RemoteEngineServiceImpl { + async fn write( + &self, + request: tonic::Request<FlatBufferBytes>, + ) -> std::result::Result<tonic::Response<FlatBufferBytes>, tonic::Status> { + let result = self + .write_internal(TonicWriteRequestExt::Flatbuffer(request)) + .await?; + match result { + TonicWriteResponseExt::Flatbuffer(v) => Ok(v), + TonicWriteResponseExt::Proto(_) => Err(Status::new(Code::Internal, "logic error")), Review Comment: Okay. ########## src/remote_engine_client/src/client.rs: ########## @@ -140,33 +144,35 @@ impl Client { // Write to remote. let table_ident = request.table.clone(); let endpoint = route_context.endpoint.clone(); - let request_pb = request.convert_into_pb().box_err().context(Convert { - msg: "Failed to convert WriteRequest to pb", + let request_fb = request.convert_into_fb().box_err().context(Convert { + msg: "Failed to convert WriteRequest to fb", })?; - let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel); + + let mut rpc_client = RemoteEngineFbServiceClient::<Channel>::new(route_context.channel); let result = rpc_client - .write(Request::new(request_pb)) + .write(Request::new(request_fb)) .await .with_context(|| Rpc { table_idents: vec![table_ident.clone()], msg: "Failed to write to remote engine", }); let result = result.and_then(|response| { - let response = response.into_inner(); - if let Some(header) = &response.header - && !status_code::is_ok(header.code) + let fb_response = response.into_inner(); + let response = fb_response.deserialize::<WriteResponse>().unwrap(); + if let Some(header) = &response.header() + && !status_code::is_ok(header.code()) { Server { endpoint, table_idents: vec![table_ident.clone()], - code: header.code, - msg: header.error.clone(), + code: header.code(), + msg: 
header.error().unwrap().to_string(), Review Comment: Change to unwrap_or() and also update the server side to respond with None if the error message is empty. ########## src/components/fb_util/src/common.rs: ########## Review Comment: Okay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
