This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 690e494672 feat(ovfs): export VirtioFs struct (#4983)
690e494672 is described below
commit 690e49467228602cccabe78ba8403fd9123146d4
Author: zjregee <[email protected]>
AuthorDate: Fri Aug 9 00:13:48 2024 +0800
feat(ovfs): export VirtioFs struct (#4983)
* feat: export VirtioFs struct
* feat: polish code
---
integrations/virtiofs/Cargo.toml | 2 +
integrations/virtiofs/src/filesystem.rs | 12 +-
integrations/virtiofs/src/lib.rs | 2 +
integrations/virtiofs/src/virtiofs.rs | 199 +++++++++++++++++++++--------
integrations/virtiofs/src/virtiofs_util.rs | 2 -
5 files changed, 156 insertions(+), 61 deletions(-)
diff --git a/integrations/virtiofs/Cargo.toml b/integrations/virtiofs/Cargo.toml
index b7de43038b..d3eca2361b 100644
--- a/integrations/virtiofs/Cargo.toml
+++ b/integrations/virtiofs/Cargo.toml
@@ -30,6 +30,8 @@ version = "0.0.0"
[dependencies]
anyhow = { version = "1.0.86", features = ["std"] }
libc = "0.2.139"
+log = "0.4.22"
+opendal = { version = "0.48.0", path = "../../core" }
snafu = "0.8.3"
vhost = "0.11.0"
vhost-user-backend = "0.14.0"
diff --git a/integrations/virtiofs/src/filesystem.rs
b/integrations/virtiofs/src/filesystem.rs
index ae3f1b8550..cdaedea6d0 100644
--- a/integrations/virtiofs/src/filesystem.rs
+++ b/integrations/virtiofs/src/filesystem.rs
@@ -18,6 +18,7 @@
use std::io::Write;
use std::mem::size_of;
+use opendal::Operator;
use vm_memory::ByteValued;
use crate::error::*;
@@ -37,12 +38,15 @@ const MAX_BUFFER_SIZE: u32 = 1 << 20;
/// Filesystem is a filesystem implementation with opendal backend,
/// and will decode and process messages from VMs.
-pub struct Filesystem {}
+pub struct Filesystem {
+ // FIXME: #[allow(dead_code)] here should be removed in the future.
+ #[allow(dead_code)]
+ core: Operator,
+}
-#[allow(dead_code)]
impl Filesystem {
- pub fn new() -> Filesystem {
- Filesystem {}
+ pub fn new(core: Operator) -> Filesystem {
+ Filesystem { core }
}
pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
diff --git a/integrations/virtiofs/src/lib.rs b/integrations/virtiofs/src/lib.rs
index a3de2697b2..4abea18c87 100644
--- a/integrations/virtiofs/src/lib.rs
+++ b/integrations/virtiofs/src/lib.rs
@@ -20,3 +20,5 @@ mod filesystem;
mod filesystem_message;
mod virtiofs;
mod virtiofs_util;
+
+pub use virtiofs::VirtioFs;
diff --git a/integrations/virtiofs/src/virtiofs.rs
b/integrations/virtiofs/src/virtiofs.rs
index 56f0fbd396..fbacaf1f67 100644
--- a/integrations/virtiofs/src/virtiofs.rs
+++ b/integrations/virtiofs/src/virtiofs.rs
@@ -16,33 +16,41 @@
// under the License.
use std::io;
+use std::sync::Arc;
use std::sync::RwLock;
+use log::warn;
+use opendal::Operator;
use vhost::vhost_user::message::VhostUserProtocolFeatures;
use vhost::vhost_user::message::VhostUserVirtioFeatures;
use vhost::vhost_user::Backend;
+use vhost::vhost_user::Listener;
use vhost_user_backend::VhostUserBackend;
+use vhost_user_backend::VhostUserDaemon;
use vhost_user_backend::VringMutex;
use vhost_user_backend::VringState;
use vhost_user_backend::VringT;
use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
-use vm_memory::ByteValued;
+use virtio_queue::DescriptorChain;
+use virtio_queue::QueueOwnedT;
+use vm_memory::GuestAddressSpace;
use vm_memory::GuestMemoryAtomic;
+use vm_memory::GuestMemoryLoadGuard;
use vm_memory::GuestMemoryMmap;
-use vm_memory::Le32;
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;
use crate::error::*;
+use crate::filesystem::Filesystem;
+use crate::virtiofs_util::Reader;
+use crate::virtiofs_util::Writer;
/// Marks an event from the high priority queue.
const HIPRIO_QUEUE_EVENT: u16 = 0;
/// Marks an event from the request queue.
const REQ_QUEUE_EVENT: u16 = 1;
-/// The maximum number of bytes in VirtioFsConfig tag field.
-const MAX_TAG_LEN: usize = 36;
/// The maximum queue size supported.
const QUEUE_SIZE: usize = 32768;
/// The number of request queues supported.
@@ -53,6 +61,7 @@ const NUM_QUEUES: usize = REQUEST_QUEUES + 1;
/// VhostUserFsThread represents the actual worker process used to handle file
system requests from VMs.
struct VhostUserFsThread {
+ core: Filesystem,
mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
vu_req: Option<Backend>,
event_idx: bool,
@@ -60,11 +69,12 @@ struct VhostUserFsThread {
}
impl VhostUserFsThread {
- fn new() -> Result<VhostUserFsThread> {
+ fn new(core: Filesystem) -> Result<VhostUserFsThread> {
let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| {
new_unexpected_error("failed to create kill eventfd",
Some(err.into()))
})?;
Ok(VhostUserFsThread {
+ core,
mem: None,
vu_req: None,
event_idx: false,
@@ -72,6 +82,35 @@ impl VhostUserFsThread {
})
}
+ /// This is used when the backend has processed a request and needs to
notify the frontend.
+ fn return_descriptor(
+ vring_state: &mut VringState,
+ head_index: u16,
+ event_idx: bool,
+ len: usize,
+ ) {
+ if vring_state.add_used(head_index, len as u32).is_err() {
+ warn!("Failed to add used to used queue.");
+ };
+ // Check if the used queue needs to be signaled.
+ if event_idx {
+ match vring_state.needs_notification() {
+ Ok(needs_notification) => {
+ if needs_notification &&
vring_state.signal_used_queue().is_err() {
+ warn!("Failed to signal used queue.");
+ }
+ }
+ Err(_) => {
+ if vring_state.signal_used_queue().is_err() {
+ warn!("Failed to signal used queue.");
+ };
+ }
+ }
+ } else if vring_state.signal_used_queue().is_err() {
+ warn!("Failed to signal used queue.");
+ }
+ }
+
/// Process filesystem requests one at a time in a serialized manner.
fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) ->
Result<()> {
let mut vring_state = match device_event {
@@ -83,10 +122,16 @@ impl VhostUserFsThread {
// If EVENT_IDX is enabled, we could keep calling process_queue()
// until it stops finding new request on the queue.
loop {
- vring_state.disable_notification().unwrap();
+ if vring_state.disable_notification().is_err() {
+ warn!("Failed to disable used queue notification.");
+ }
self.process_queue_serial(&mut vring_state)?;
- if !vring_state.enable_notification().unwrap() {
- break;
+ if let Ok(has_more) = vring_state.enable_notification() {
+ if !has_more {
+ break;
+ }
+ } else {
+ warn!("Failed to enable used queue notification.");
}
}
} else {
@@ -98,37 +143,61 @@ impl VhostUserFsThread {
/// Forwards filesystem messages to specific functions and
/// returns the filesystem request execution result.
- fn process_queue_serial(&self, _vring_state: &mut VringState) ->
Result<bool> {
- unimplemented!()
+ fn process_queue_serial(&self, vring_state: &mut VringState) ->
Result<bool> {
+ let mut used_any = false;
+ let mem = match &self.mem {
+ Some(m) => m.memory(),
+ None => return Err(new_unexpected_error("no memory configured",
None)),
+ };
+ let avail_chains:
Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
+ .get_queue_mut()
+ .iter(mem.clone())
+ .map_err(|_| new_unexpected_error("iterating through the queue
failed", None))?
+ .collect();
+ for chain in avail_chains {
+ used_any = true;
+ let head_index = chain.head_index();
+ let reader = Reader::new(&mem, chain.clone())
+ .map_err(|_| new_unexpected_error("creating a queue reader
failed", None))
+ .unwrap();
+ let writer = Writer::new(&mem, chain.clone())
+ .map_err(|_| new_unexpected_error("creating a queue writer
failed", None))
+ .unwrap();
+ let len = self
+ .core
+ .handle_message(reader, writer)
+ .map_err(|_| new_unexpected_error("processing a queue request
failed", None))
+ .unwrap();
+ VhostUserFsThread::return_descriptor(vring_state, head_index,
self.event_idx, len);
+ }
+ Ok(used_any)
}
}
/// VhostUserFsBackend is a structure that implements the VhostUserBackend
trait
/// and implements concrete services for the vhost user backend server.
-pub struct VhostUserFsBackend {
- tag: Option<String>,
+struct VhostUserFsBackend {
thread: RwLock<VhostUserFsThread>,
}
-#[allow(dead_code)]
impl VhostUserFsBackend {
- pub fn new(tag: Option<String>) -> Result<VhostUserFsBackend> {
- let thread = RwLock::new(VhostUserFsThread::new()?);
- Ok(VhostUserFsBackend { thread, tag })
+ fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
+ let thread = RwLock::new(VhostUserFsThread::new(core)?);
+ Ok(VhostUserFsBackend { thread })
}
-}
-/// VirtioFsConfig will be serialized and used as
-/// the return value of get_config function in the VhostUserBackend trait.
-#[repr(C)]
-#[derive(Clone, Copy)]
-struct VirtioFsConfig {
- tag: [u8; MAX_TAG_LEN],
- num_request_queues: Le32,
+ fn kill(&self) -> Result<()> {
+ self.thread
+ .read()
+ .unwrap()
+ .kill_event_fd
+ .write(1)
+ .map_err(|err| {
+ new_unexpected_error("failed to write to kill eventfd",
Some(err.into()))
+ })
+ }
}
-unsafe impl ByteValued for VirtioFsConfig {}
-
impl VhostUserBackend for VhostUserFsBackend {
type Bitmap = ();
type Vring = VringMutex;
@@ -155,15 +224,11 @@ impl VhostUserBackend for VhostUserFsBackend {
/// Get available vhost protocol features.
fn protocol_features(&self) -> VhostUserProtocolFeatures {
// Align to the virtiofsd's protocol features here.
- let mut protocol_features = VhostUserProtocolFeatures::MQ
+ VhostUserProtocolFeatures::MQ
| VhostUserProtocolFeatures::BACKEND_REQ
| VhostUserProtocolFeatures::BACKEND_SEND_FD
| VhostUserProtocolFeatures::REPLY_ACK
- | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS;
- if self.tag.is_some() {
- protocol_features |= VhostUserProtocolFeatures::CONFIG;
- }
- protocol_features
+ | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
}
/// Enable or disabled the virtio EVENT_IDX feature.
@@ -171,29 +236,6 @@ impl VhostUserBackend for VhostUserFsBackend {
self.thread.write().unwrap().event_idx = enabled;
}
- /// Get virtio device configuration.
- fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
- let tag = self
- .tag
- .as_ref()
- .expect("did not expect read of config if tag is not set.");
- let mut fixed_len_tag = [0; MAX_TAG_LEN];
- fixed_len_tag[0..tag.len()].copy_from_slice(tag.as_bytes());
- let config = VirtioFsConfig {
- tag: fixed_len_tag,
- num_request_queues: Le32::from(REQUEST_QUEUES as u32),
- };
- let mut result: Vec<_> = config
- .as_slice()
- .iter()
- .skip(offset as usize)
- .take(size as usize)
- .copied()
- .collect();
- result.resize(size as usize, 0);
- result
- }
-
/// Update guest memory regions.
fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) ->
io::Result<()> {
self.thread.write().unwrap().mem = Some(mem);
@@ -238,3 +280,50 @@ impl VhostUserBackend for VhostUserFsBackend {
.map_err(|err| err.into())
}
}
+
+/// VirtioFS is a structure that represents the virtiofs service.
+/// It is used to run the virtiofs service with the given operator and socket
path.
+/// The operator is used to interact with the backend storage system.
+/// The socket path is used to communicate with the QEMU and VMs.
+pub struct VirtioFs {
+ socket_path: String,
+ filesystem_backend: Arc<VhostUserFsBackend>,
+}
+
+impl VirtioFs {
+ pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
+ let filesystem_core = Filesystem::new(core);
+ let filesystem_backend =
Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap());
+ Ok(VirtioFs {
+ socket_path: socket_path.to_string(),
+ filesystem_backend,
+ })
+ }
+
+ // Run the virtiofs service.
+ pub fn run(&self) -> Result<()> {
+ let listener = Listener::new(&self.socket_path, true)
+ .map_err(|_| new_unexpected_error("failed to create listener",
None))?;
+ let mut daemon = VhostUserDaemon::new(
+ String::from("virtiofs-backend"),
+ self.filesystem_backend.clone(),
+ GuestMemoryAtomic::new(GuestMemoryMmap::new()),
+ )
+ .unwrap();
+ if daemon.start(listener).is_err() {
+ return Err(new_unexpected_error("failed to start daemon", None));
+ }
+ if daemon.wait().is_err() {
+ return Err(new_unexpected_error("failed to wait daemon", None));
+ }
+ Ok(())
+ }
+
+ // Kill the virtiofs service.
+ pub fn kill(&self) -> Result<()> {
+ if self.filesystem_backend.kill().is_err() {
+ return Err(new_unexpected_error("failed to kill backend", None));
+ }
+ Ok(())
+ }
+}
diff --git a/integrations/virtiofs/src/virtiofs_util.rs
b/integrations/virtiofs/src/virtiofs_util.rs
index f797c8dacb..bcbdf6ea91 100644
--- a/integrations/virtiofs/src/virtiofs_util.rs
+++ b/integrations/virtiofs/src/virtiofs_util.rs
@@ -96,7 +96,6 @@ pub struct Reader<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}
-#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,
@@ -174,7 +173,6 @@ pub struct Writer<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}
-#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,