This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 690e494672 feat(ovfs): export VirtioFs struct (#4983)
690e494672 is described below

commit 690e49467228602cccabe78ba8403fd9123146d4
Author: zjregee <[email protected]>
AuthorDate: Fri Aug 9 00:13:48 2024 +0800

    feat(ovfs): export VirtioFs struct (#4983)
    
    * feat: export VirtioFs struct
    
    * feat: polish code
---
 integrations/virtiofs/Cargo.toml           |   2 +
 integrations/virtiofs/src/filesystem.rs    |  12 +-
 integrations/virtiofs/src/lib.rs           |   2 +
 integrations/virtiofs/src/virtiofs.rs      | 199 +++++++++++++++++++++--------
 integrations/virtiofs/src/virtiofs_util.rs |   2 -
 5 files changed, 156 insertions(+), 61 deletions(-)

diff --git a/integrations/virtiofs/Cargo.toml b/integrations/virtiofs/Cargo.toml
index b7de43038b..d3eca2361b 100644
--- a/integrations/virtiofs/Cargo.toml
+++ b/integrations/virtiofs/Cargo.toml
@@ -30,6 +30,8 @@ version = "0.0.0"
 [dependencies]
 anyhow = { version = "1.0.86", features = ["std"] }
 libc = "0.2.139"
+log = "0.4.22"
+opendal = { version = "0.48.0", path = "../../core" }
 snafu = "0.8.3"
 vhost = "0.11.0"
 vhost-user-backend = "0.14.0"
diff --git a/integrations/virtiofs/src/filesystem.rs 
b/integrations/virtiofs/src/filesystem.rs
index ae3f1b8550..cdaedea6d0 100644
--- a/integrations/virtiofs/src/filesystem.rs
+++ b/integrations/virtiofs/src/filesystem.rs
@@ -18,6 +18,7 @@
 use std::io::Write;
 use std::mem::size_of;
 
+use opendal::Operator;
 use vm_memory::ByteValued;
 
 use crate::error::*;
@@ -37,12 +38,15 @@ const MAX_BUFFER_SIZE: u32 = 1 << 20;
 
 /// Filesystem is a filesystem implementation with opendal backend,
 /// and will decode and process messages from VMs.
-pub struct Filesystem {}
+pub struct Filesystem {
+    // FIXME: #[allow(dead_code)] here should be removed in the future.
+    #[allow(dead_code)]
+    core: Operator,
+}
 
-#[allow(dead_code)]
 impl Filesystem {
-    pub fn new() -> Filesystem {
-        Filesystem {}
+    pub fn new(core: Operator) -> Filesystem {
+        Filesystem { core }
     }
 
     pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
diff --git a/integrations/virtiofs/src/lib.rs b/integrations/virtiofs/src/lib.rs
index a3de2697b2..4abea18c87 100644
--- a/integrations/virtiofs/src/lib.rs
+++ b/integrations/virtiofs/src/lib.rs
@@ -20,3 +20,5 @@ mod filesystem;
 mod filesystem_message;
 mod virtiofs;
 mod virtiofs_util;
+
+pub use virtiofs::VirtioFs;
diff --git a/integrations/virtiofs/src/virtiofs.rs 
b/integrations/virtiofs/src/virtiofs.rs
index 56f0fbd396..fbacaf1f67 100644
--- a/integrations/virtiofs/src/virtiofs.rs
+++ b/integrations/virtiofs/src/virtiofs.rs
@@ -16,33 +16,41 @@
 // under the License.
 
 use std::io;
+use std::sync::Arc;
 use std::sync::RwLock;
 
+use log::warn;
+use opendal::Operator;
 use vhost::vhost_user::message::VhostUserProtocolFeatures;
 use vhost::vhost_user::message::VhostUserVirtioFeatures;
 use vhost::vhost_user::Backend;
+use vhost::vhost_user::Listener;
 use vhost_user_backend::VhostUserBackend;
+use vhost_user_backend::VhostUserDaemon;
 use vhost_user_backend::VringMutex;
 use vhost_user_backend::VringState;
 use vhost_user_backend::VringT;
 use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
-use vm_memory::ByteValued;
+use virtio_queue::DescriptorChain;
+use virtio_queue::QueueOwnedT;
+use vm_memory::GuestAddressSpace;
 use vm_memory::GuestMemoryAtomic;
+use vm_memory::GuestMemoryLoadGuard;
 use vm_memory::GuestMemoryMmap;
-use vm_memory::Le32;
 use vmm_sys_util::epoll::EventSet;
 use vmm_sys_util::eventfd::EventFd;
 
 use crate::error::*;
+use crate::filesystem::Filesystem;
+use crate::virtiofs_util::Reader;
+use crate::virtiofs_util::Writer;
 
 /// Marks an event from the high priority queue.
 const HIPRIO_QUEUE_EVENT: u16 = 0;
 /// Marks an event from the request queue.
 const REQ_QUEUE_EVENT: u16 = 1;
-/// The maximum number of bytes in VirtioFsConfig tag field.
-const MAX_TAG_LEN: usize = 36;
 /// The maximum queue size supported.
 const QUEUE_SIZE: usize = 32768;
 /// The number of request queues supported.
@@ -53,6 +61,7 @@ const NUM_QUEUES: usize = REQUEST_QUEUES + 1;
 
 /// VhostUserFsThread represents the actual worker process used to handle file 
system requests from VMs.
 struct VhostUserFsThread {
+    core: Filesystem,
     mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
     vu_req: Option<Backend>,
     event_idx: bool,
@@ -60,11 +69,12 @@ struct VhostUserFsThread {
 }
 
 impl VhostUserFsThread {
-    fn new() -> Result<VhostUserFsThread> {
+    fn new(core: Filesystem) -> Result<VhostUserFsThread> {
         let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| {
             new_unexpected_error("failed to create kill eventfd", 
Some(err.into()))
         })?;
         Ok(VhostUserFsThread {
+            core,
             mem: None,
             vu_req: None,
             event_idx: false,
@@ -72,6 +82,35 @@ impl VhostUserFsThread {
         })
     }
 
+    /// This is used when the backend has processed a request and needs to 
notify the frontend.
+    fn return_descriptor(
+        vring_state: &mut VringState,
+        head_index: u16,
+        event_idx: bool,
+        len: usize,
+    ) {
+        if vring_state.add_used(head_index, len as u32).is_err() {
+            warn!("Failed to add used to used queue.");
+        };
+        // Check if the used queue needs to be signaled.
+        if event_idx {
+            match vring_state.needs_notification() {
+                Ok(needs_notification) => {
+                    if needs_notification && 
vring_state.signal_used_queue().is_err() {
+                        warn!("Failed to signal used queue.");
+                    }
+                }
+                Err(_) => {
+                    if vring_state.signal_used_queue().is_err() {
+                        warn!("Failed to signal used queue.");
+                    };
+                }
+            }
+        } else if vring_state.signal_used_queue().is_err() {
+            warn!("Failed to signal used queue.");
+        }
+    }
+
     /// Process filesystem requests one at a time in a serialized manner.
     fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> 
Result<()> {
         let mut vring_state = match device_event {
@@ -83,10 +122,16 @@ impl VhostUserFsThread {
             // If EVENT_IDX is enabled, we could keep calling process_queue()
             // until it stops finding new request on the queue.
             loop {
-                vring_state.disable_notification().unwrap();
+                if vring_state.disable_notification().is_err() {
+                    warn!("Failed to disable used queue notification.");
+                }
                 self.process_queue_serial(&mut vring_state)?;
-                if !vring_state.enable_notification().unwrap() {
-                    break;
+                if let Ok(has_more) = vring_state.enable_notification() {
+                    if !has_more {
+                        break;
+                    }
+                } else {
+                    warn!("Failed to enable used queue notification.");
                 }
             }
         } else {
@@ -98,37 +143,61 @@ impl VhostUserFsThread {
 
     /// Forwards filesystem messages to specific functions and
     /// returns the filesystem request execution result.
-    fn process_queue_serial(&self, _vring_state: &mut VringState) -> 
Result<bool> {
-        unimplemented!()
+    fn process_queue_serial(&self, vring_state: &mut VringState) -> 
Result<bool> {
+        let mut used_any = false;
+        let mem = match &self.mem {
+            Some(m) => m.memory(),
+            None => return Err(new_unexpected_error("no memory configured", 
None)),
+        };
+        let avail_chains: 
Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
+            .get_queue_mut()
+            .iter(mem.clone())
+            .map_err(|_| new_unexpected_error("iterating through the queue 
failed", None))?
+            .collect();
+        for chain in avail_chains {
+            used_any = true;
+            let head_index = chain.head_index();
+            let reader = Reader::new(&mem, chain.clone())
+                .map_err(|_| new_unexpected_error("creating a queue reader 
failed", None))
+                .unwrap();
+            let writer = Writer::new(&mem, chain.clone())
+                .map_err(|_| new_unexpected_error("creating a queue writer 
failed", None))
+                .unwrap();
+            let len = self
+                .core
+                .handle_message(reader, writer)
+                .map_err(|_| new_unexpected_error("processing a queue request 
failed", None))
+                .unwrap();
+            VhostUserFsThread::return_descriptor(vring_state, head_index, 
self.event_idx, len);
+        }
+        Ok(used_any)
     }
 }
 
 /// VhostUserFsBackend is a structure that implements the VhostUserBackend 
trait
 /// and implements concrete services for the vhost user backend server.
-pub struct VhostUserFsBackend {
-    tag: Option<String>,
+struct VhostUserFsBackend {
     thread: RwLock<VhostUserFsThread>,
 }
 
-#[allow(dead_code)]
 impl VhostUserFsBackend {
-    pub fn new(tag: Option<String>) -> Result<VhostUserFsBackend> {
-        let thread = RwLock::new(VhostUserFsThread::new()?);
-        Ok(VhostUserFsBackend { thread, tag })
+    fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
+        let thread = RwLock::new(VhostUserFsThread::new(core)?);
+        Ok(VhostUserFsBackend { thread })
     }
-}
 
-/// VirtioFsConfig will be serialized and used as
-/// the return value of get_config function in the VhostUserBackend trait.
-#[repr(C)]
-#[derive(Clone, Copy)]
-struct VirtioFsConfig {
-    tag: [u8; MAX_TAG_LEN],
-    num_request_queues: Le32,
+    fn kill(&self) -> Result<()> {
+        self.thread
+            .read()
+            .unwrap()
+            .kill_event_fd
+            .write(1)
+            .map_err(|err| {
+                new_unexpected_error("failed to write to kill eventfd", 
Some(err.into()))
+            })
+    }
 }
 
-unsafe impl ByteValued for VirtioFsConfig {}
-
 impl VhostUserBackend for VhostUserFsBackend {
     type Bitmap = ();
     type Vring = VringMutex;
@@ -155,15 +224,11 @@ impl VhostUserBackend for VhostUserFsBackend {
     /// Get available vhost protocol features.
     fn protocol_features(&self) -> VhostUserProtocolFeatures {
         // Align to the virtiofsd's protocol features here.
-        let mut protocol_features = VhostUserProtocolFeatures::MQ
+        VhostUserProtocolFeatures::MQ
             | VhostUserProtocolFeatures::BACKEND_REQ
             | VhostUserProtocolFeatures::BACKEND_SEND_FD
             | VhostUserProtocolFeatures::REPLY_ACK
-            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS;
-        if self.tag.is_some() {
-            protocol_features |= VhostUserProtocolFeatures::CONFIG;
-        }
-        protocol_features
+            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
     }
 
     /// Enable or disabled the virtio EVENT_IDX feature.
@@ -171,29 +236,6 @@ impl VhostUserBackend for VhostUserFsBackend {
         self.thread.write().unwrap().event_idx = enabled;
     }
 
-    /// Get virtio device configuration.
-    fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
-        let tag = self
-            .tag
-            .as_ref()
-            .expect("did not expect read of config if tag is not set.");
-        let mut fixed_len_tag = [0; MAX_TAG_LEN];
-        fixed_len_tag[0..tag.len()].copy_from_slice(tag.as_bytes());
-        let config = VirtioFsConfig {
-            tag: fixed_len_tag,
-            num_request_queues: Le32::from(REQUEST_QUEUES as u32),
-        };
-        let mut result: Vec<_> = config
-            .as_slice()
-            .iter()
-            .skip(offset as usize)
-            .take(size as usize)
-            .copied()
-            .collect();
-        result.resize(size as usize, 0);
-        result
-    }
-
     /// Update guest memory regions.
     fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) -> 
io::Result<()> {
         self.thread.write().unwrap().mem = Some(mem);
@@ -238,3 +280,50 @@ impl VhostUserBackend for VhostUserFsBackend {
             .map_err(|err| err.into())
     }
 }
+
+/// VirtioFS is a structure that represents the virtiofs service.
+/// It is used to run the virtiofs service with the given operator and socket 
path.
+/// The operator is used to interact with the backend storage system.
+/// The socket path is used to communicate with the QEMU and VMs.
+pub struct VirtioFs {
+    socket_path: String,
+    filesystem_backend: Arc<VhostUserFsBackend>,
+}
+
+impl VirtioFs {
+    pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
+        let filesystem_core = Filesystem::new(core);
+        let filesystem_backend = 
Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap());
+        Ok(VirtioFs {
+            socket_path: socket_path.to_string(),
+            filesystem_backend,
+        })
+    }
+
+    // Run the virtiofs service.
+    pub fn run(&self) -> Result<()> {
+        let listener = Listener::new(&self.socket_path, true)
+            .map_err(|_| new_unexpected_error("failed to create listener", 
None))?;
+        let mut daemon = VhostUserDaemon::new(
+            String::from("virtiofs-backend"),
+            self.filesystem_backend.clone(),
+            GuestMemoryAtomic::new(GuestMemoryMmap::new()),
+        )
+        .unwrap();
+        if daemon.start(listener).is_err() {
+            return Err(new_unexpected_error("failed to start daemon", None));
+        }
+        if daemon.wait().is_err() {
+            return Err(new_unexpected_error("failed to wait daemon", None));
+        }
+        Ok(())
+    }
+
+    // Kill the virtiofs service.
+    pub fn kill(&self) -> Result<()> {
+        if self.filesystem_backend.kill().is_err() {
+            return Err(new_unexpected_error("failed to kill backend", None));
+        }
+        Ok(())
+    }
+}
diff --git a/integrations/virtiofs/src/virtiofs_util.rs 
b/integrations/virtiofs/src/virtiofs_util.rs
index f797c8dacb..bcbdf6ea91 100644
--- a/integrations/virtiofs/src/virtiofs_util.rs
+++ b/integrations/virtiofs/src/virtiofs_util.rs
@@ -96,7 +96,6 @@ pub struct Reader<'a, B = ()> {
     buffer: DescriptorChainConsumer<'a, B>,
 }
 
-#[allow(dead_code)]
 impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> {
     pub fn new<M>(
         mem: &'a GuestMemoryMmap<B>,
@@ -174,7 +173,6 @@ pub struct Writer<'a, B = ()> {
     buffer: DescriptorChainConsumer<'a, B>,
 }
 
-#[allow(dead_code)]
 impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> {
     pub fn new<M>(
         mem: &'a GuestMemoryMmap<B>,

Reply via email to