Author: lstewart
Date: Thu Jul  8 03:28:25 2010
New Revision: 209788
URL: http://svn.freebsd.org/changeset/base/209788

Log:
  MFC r207223:
  
  - Rework the underlying ALQ storage to be a circular buffer, which amongst
    other things allows variable length messages to be easily supported.
  
  - Extend KPI with alq_writen() and alq_getn() to support variable length
    messages, which is enabled at ALQ creation time depending on the arguments
    passed to alq_open(). Also add variants of alq_open() and alq_post() that
    accept a flags argument. The KPI is still fully backwards compatible and
    shouldn't require any change in ALQ consumers unless they wish to utilise
    the new features.
  
  - Introduce the ALQ_NOACTIVATE and ALQ_ORDERED flags to allow ALQ consumers
    to have more control over IO scheduling and resource acquisition
    respectively.
  
  - Strengthen invariants checking.
  
  - Document ALQ changes in ALQ(9) man page.
  
  Sponsored by: FreeBSD Foundation
  Reviewed by:  gnn, jeff, rpaulo, rwatson

Modified:
  stable/8/share/man/man9/alq.9
  stable/8/sys/kern/kern_alq.c
  stable/8/sys/sys/alq.h
Directory Properties:
  stable/8/share/man/   (props changed)
  stable/8/share/man/man1/   (props changed)
  stable/8/share/man/man3/   (props changed)
  stable/8/share/man/man4/   (props changed)
  stable/8/share/man/man5/   (props changed)
  stable/8/share/man/man7/   (props changed)
  stable/8/share/man/man8/   (props changed)
  stable/8/share/man/man9/   (props changed)
  stable/8/sys/   (props changed)
  stable/8/sys/amd64/include/xen/   (props changed)
  stable/8/sys/cddl/contrib/opensolaris/   (props changed)
  stable/8/sys/contrib/dev/acpica/   (props changed)
  stable/8/sys/contrib/pf/   (props changed)
  stable/8/sys/dev/xen/xenpci/   (props changed)

Modified: stable/8/share/man/man9/alq.9
==============================================================================
--- stable/8/share/man/man9/alq.9       Thu Jul  8 03:28:25 2010        (r209787)
+++ stable/8/share/man/man9/alq.9       Thu Jul  8 03:28:25 2010        (r209788)
@@ -1,7 +1,13 @@
 .\"
 .\" Copyright (c) 2003 Hiten Pandya <h...@freebsd.org>
+.\" Copyright (c) 2009-2010 The FreeBSD Foundation
 .\" All rights reserved.
 .\"
+.\" Portions of this software were developed at the Centre for Advanced
+.\" Internet Architectures, Swinburne University of Technology, Melbourne,
+.\" Australia by Lawrence Stewart under sponsorship from the FreeBSD
+.\" Foundation.
+.\"
 .\" Redistribution and use in source and binary forms, with or without
 .\" modification, are permitted provided that the following conditions
 .\" are met:
@@ -25,21 +31,34 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 16, 2003
+.Dd April 26, 2010
 .Dt ALQ 9
 .Os
 .Sh NAME
 .Nm alq ,
+.Nm alq_open_flags ,
 .Nm alq_open ,
+.Nm alq_writen ,
 .Nm alq_write ,
 .Nm alq_flush ,
 .Nm alq_close ,
+.Nm alq_getn ,
 .Nm alq_get ,
+.Nm alq_post_flags ,
 .Nm alq_post
 .Nd Asynchronous Logging Queues
 .Sh SYNOPSIS
 .In sys/alq.h
 .Ft int
+.Fo alq_open_flags
+.Fa "struct alq **app"
+.Fa "const char *file"
+.Fa "struct ucred *cred"
+.Fa "int cmode"
+.Fa "int size"
+.Fa "int flags"
+.Fc
+.Ft int
 .Fo alq_open
 .Fa "struct alq **app"
 .Fa "const char *file"
@@ -49,19 +68,25 @@
 .Fa "int count"
 .Fc
 .Ft int
-.Fn alq_write "struct alq *alq" "void *data" "int waitok"
+.Fn alq_writen "struct alq *alq" "void *data" "int len" "int flags"
+.Ft int
+.Fn alq_write "struct alq *alq" "void *data" "int flags"
 .Ft void
 .Fn alq_flush "struct alq *alq"
 .Ft void
 .Fn alq_close "struct alq *alq"
 .Ft struct ale *
-.Fn alq_get "struct alq *alq" "int waitok"
+.Fn alq_getn "struct alq *alq" "int len" "int flags"
+.Ft struct ale *
+.Fn alq_get "struct alq *alq" "int flags"
+.Ft void
+.Fn alq_post_flags "struct alq *alq" "struct ale *ale" "int flags"
 .Ft void
 .Fn alq_post "struct alq *alq" "struct ale *ale"
 .Sh DESCRIPTION
 The
 .Nm
-facility provides an asynchronous fixed length recording
+facility provides an asynchronous fixed or variable length recording
 mechanism, known as Asynchronous Logging Queues.
 It can record to any
 .Xr vnode 9 ,
@@ -81,26 +106,37 @@ is defined as
 which has the following members:
 .Bd -literal -offset indent
 struct ale {
-       struct ale      *ae_next;       /* Next Entry */
-       char            *ae_data;       /* Entry buffer */
-       int             ae_flags;       /* Entry flags */
+       intptr_t        ae_bytesused;   /* # bytes written to ALE. */
+       char            *ae_data;       /* Write ptr. */
+       int             ae_pad;         /* Unused, compat. */
 };
 .Ed
 .Pp
-The
-.Va ae_flags
-field is for internal use, clients of the
+An
 .Nm
-interface should not modify this field.
-Behaviour is undefined if this field is modified.
+can be created in either fixed or variable length mode.
+A variable length
+.Nm
+accommodates writes of varying length using
+.Fn alq_writen
+and
+.Fn alq_getn .
+A fixed length
+.Nm
+accommodates a fixed number of writes using
+.Fn alq_write
+and
+.Fn alq_get ,
+each of fixed size (set at queue creation time).
+Fixed length mode is deprecated in favour of variable length mode.
 .Sh FUNCTIONS
 The
-.Fn alq_open
-function creates a new logging queue.
+.Fn alq_open_flags
+function creates a new variable length asynchronous logging queue.
 The
 .Fa file
-argument is the name of the file to open for logging; if the file does not
-yet exist,
+argument is the name of the file to open for logging.
+If the file does not yet exist,
 .Fn alq_open
 will attempt to create it.
 The
@@ -112,33 +148,99 @@ as the requested creation mode, to be us
 Consumers of this API may wish to pass
 .Dv ALQ_DEFAULT_CMODE ,
 a default creation mode suitable for most applications.
-The argument
+The
 .Fa cred
-specifies the credentials to use when opening and performing I/O on the file.
-The size of each entry in the queue is determined by
-.Fa size .
+argument specifies the credentials to use when opening and performing I/O on the file.
 The
+.Fa size
+argument sets the size (in bytes) of the underlying queue.
+The ALQ_ORDERED flag may be passed in via
+.Fa flags
+to indicate that the ordering of writer threads waiting for a busy
+.Nm
+to free up resources should be preserved.
+.Pp
+The deprecated
+.Fn alq_open
+function is implemented as a wrapper around
+.Fn alq_open_flags
+to provide backwards compatibility to consumers that have not been updated to
+utilise the newer
+.Fn alq_open_flags
+function.
+It passes all arguments through to
+.Fn alq_open_flags
+untouched except for
+.Fa size
+and
+.Fa count ,
+and sets
+.Fa flags
+to 0.
+To create a variable length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of the underlying queue and the
+.Fa count
+argument should be set to 0.
+To create a fixed length mode
+.Nm ,
+the
+.Fa size
+argument should be set to the size (in bytes) of each write and the
 .Fa count
-argument determines the number of items to be stored in the
-asynchronous queue over an approximate period of a disk
-write operation.
+argument should be set to the number of
+.Fa size
+byte chunks to reserve capacity for.
 .Pp
 The
-.Fn alq_write
+.Fn alq_writen
 function writes
+.Fa len
+bytes from
 .Fa data
-to the designated queue,
+to the designated variable length mode queue
 .Fa alq .
-In the event that
-.Fn alq_write
-could not write the entry immediately, and
+If
+.Fn alq_writen
+could not write the entry immediately and
 .Dv ALQ_WAITOK
-is passed to
-.Fa waitok ,
-then
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
+with the
+.Dq Li alqwnord
+or
+.Dq Li alqwnres
+wait message.
+A write will automatically schedule the queue
+.Fa alq
+to be flushed to disk.
+This behaviour can be controlled by passing ALQ_NOACTIVATE via
+.Fa flags
+to indicate that the write should not schedule
+.Fa alq
+to be flushed to disk.
+.Pp
+The deprecated
+.Fn alq_write
+function is implemented as a wrapper around
+.Fn alq_writen
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The function will write
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time) from the
+.Fa data
+buffer to the
+.Fa alq .
+Note that it is an error to call
 .Fn alq_write
-will be allowed to
-.Xr tsleep 9 .
+on a variable length mode queue.
 .Pp
 The
 .Fn alq_flush
@@ -146,61 +248,136 @@ function is used for flushing
 .Fa alq
 to the log medium that was passed to
 .Fn alq_open .
+If
+.Fa alq
+has data to flush and is not already in the process of being flushed, the
+function will block doing IO.
+Otherwise, the function will return immediately.
 .Pp
 The
 .Fn alq_close
-function will close the asynchronous logging queue,
-.Fa alq ,
+function will close the asynchronous logging queue
+.Fa alq
 and flush all pending write requests to the log medium.
 It will free all resources that were previously allocated.
 .Pp
 The
-.Fn alq_get
-function returns the next available asynchronous logging entry
-from the queue,
-.Fa alq .
-This function leaves the queue in a locked state, until a subsequent
+.Fn alq_getn
+function returns an asynchronous log entry from
+.Fa alq ,
+initialised to point at a buffer capable of receiving
+.Fa len
+bytes of data.
+This function leaves
+.Fa alq
+in a locked state, until a subsequent
 .Fn alq_post
+or
+.Fn alq_post_flags
 call is made.
-In the event that
-.Fn alq_get
-could not retrieve an entry immediately, it will
-.Xr tsleep 9
+If
+.Fn alq_getn
+could not obtain
+.Fa len
+bytes of buffer immediately and
+.Dv ALQ_WAITOK
+is set in
+.Fa flags ,
+the function will be allowed to
+.Xr msleep_spin 9
 with the
-.Dq Li alqget
+.Dq Li alqgnord
+or
+.Dq Li alqgnres
 wait message.
+The caller can choose to write less than
+.Fa len
+bytes of data to the returned asynchronous log entry by setting the entry's
+ae_bytesused field to the number of bytes actually written.
+This must be done prior to calling
+.Fn alq_post .
 .Pp
-The
-.Fn alq_post
-function schedules the asynchronous logging entry,
-.Fa ale ,
-which is retrieved using the
+The deprecated
 .Fn alq_get
-function,
-for writing to the asynchronous logging queue,
+function is implemented as a wrapper around
+.Fn alq_getn
+to provide backwards compatibility to consumers that have not been updated to
+utilise variable length mode queues.
+The asynchronous log entry returned will be initialised to point at a buffer
+capable of receiving
+.Fa size
+bytes of data (where
+.Fa size
+was specified at queue creation time).
+Note that it is an error to call
+.Fn alq_get
+on a variable length mode queue.
+.Pp
+The
+.Fn alq_post_flags
+function schedules the asynchronous log entry
+.Fa ale
+(obtained from
+.Fn alq_getn
+or
+.Fn alq_get )
+for writing to
 .Fa alq .
-This function leaves the queue,
-.Fa alq ,
+The ALQ_NOACTIVATE flag may be passed in via
+.Fa flags
+to indicate that the queue should not be immediately scheduled to be flushed to
+disk.
+This function leaves
+.Fa alq
 in an unlocked state.
+.Pp
+The
+.Fn alq_post
+function is implemented as a wrapper around
+.Fn alq_post_flags
+to provide backwards compatibility to consumers that have not been updated to
+utilise the newer
+.Fn alq_post_flags
+function.
+It simply passes all arguments through to
+.Fn alq_post_flags
+untouched, and sets
+.Fa flags
+to 0.
 .Sh IMPLEMENTATION NOTES
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function is a wrapper around the
+functions both perform a
+.Xr bcopy 3
+from the supplied
+.Fa data
+buffer into the underlying
+.Nm
+buffer.
+Performance critical code paths may wish to consider using
+.Fn alq_getn
+(variable length queues) or
+.Fn alq_get
+(fixed length queues) to avoid the extra memory copy. Note that a queue
+remains locked between calls to
+.Fn alq_getn
+or
 .Fn alq_get
 and
 .Fn alq_post
-functions; by using these functions separately, a call
-to
-.Fn bcopy
-can be avoided for performance critical code paths.
+or
+.Fn alq_post_flags ,
+so this method of writing to a queue is unsuitable for situations where the
+time between calls may be substantial.
 .Sh LOCKING
-Each asynchronous queue is protected by a spin mutex.
+Each asynchronous logging queue is protected by a spin mutex.
 .Pp
 Functions
-.Fn alq_flush ,
-.Fn alq_open
+.Fn alq_flush
 and
-.Fn alq_post
+.Fn alq_open
 may attempt to acquire an internal sleep mutex, and should
 consequently not be used in contexts where sleeping is
 not allowed.
@@ -214,32 +391,36 @@ if it fails to open
 or else it returns 0.
 .Pp
 The
+.Fn alq_writen
+and
 .Fn alq_write
-function returns
+functions return
 .Er EWOULDBLOCK
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 The
+.Fn alq_getn
+and
 .Fn alq_get
-function returns
-.Dv NULL ,
+functions return
+.Dv NULL
 if
 .Dv ALQ_NOWAIT
-was provided as a value to
-.Fa waitok
-and either the queue is full, or when the system is shutting down.
+was set in
+.Fa flags
+and either the queue is full or the system is shutting down.
 .Pp
 NOTE: invalid arguments to non-void functions will result in
 undefined behaviour.
 .Sh SEE ALSO
-.Xr syslog 3 ,
-.Xr kthread 9 ,
+.Xr kproc 9 ,
 .Xr ktr 9 ,
-.Xr tsleep 9 ,
+.Xr msleep_spin 9 ,
+.Xr syslog 3 ,
 .Xr vnode 9
 .Sh HISTORY
 The
@@ -250,7 +431,11 @@ Asynchronous Logging Queues (ALQ) facili
 The
 .Nm
 facility was written by
-.An Jeffrey Roberson Aq j...@freebsd.org .
+.An Jeffrey Roberson Aq j...@freebsd.org
+and extended by
+.An Lawrence Stewart Aq lstew...@freebsd.org .
 .Pp
 This manual page was written by
-.An Hiten Pandya Aq h...@freebsd.org .
+.An Hiten Pandya Aq h...@freebsd.org
+and revised by
+.An Lawrence Stewart Aq lstew...@freebsd.org .

Modified: stable/8/sys/kern/kern_alq.c
==============================================================================
--- stable/8/sys/kern/kern_alq.c        Thu Jul  8 03:28:25 2010        (r209787)
+++ stable/8/sys/kern/kern_alq.c        Thu Jul  8 03:28:25 2010        (r209788)
@@ -55,16 +55,23 @@ __FBSDID("$FreeBSD$");
 
 /* Async. Logging Queue */
 struct alq {
+       char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
-       char    *aq_entbuf;             /* Buffer for stored entries */
+       int     aq_freebytes;           /* Bytes available in buffer */
+       int     aq_buflen;              /* Total length of our buffer */
+       int     aq_writehead;           /* Location for next write */
+       int     aq_writetail;           /* Flush starts at this location */
+       int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
+       int     aq_waiters;             /* Num threads waiting for resources
+                                        * NB: Used as a wait channel so must
+                                        * not be first field in the alq struct
+                                        */
+       struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
-       struct ale      *aq_first;      /* First ent */
-       struct ale      *aq_entfree;    /* First free ent */
-       struct ale      *aq_entvalid;   /* First ent valid for writing */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
 };
@@ -73,10 +80,14 @@ struct alq {
 #define        AQ_ACTIVE       0x0002          /* on the active list */
 #define        AQ_FLUSHING     0x0004          /* doing IO */
 #define        AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
+#define        AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
+#define        AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */
 
 #define        ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
 #define        ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
 
+#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
+
 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
 
 /*
@@ -205,7 +216,7 @@ ald_daemon(void)
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
-                       wakeup(alq);
+                       wakeup_one(alq);
                ALD_LOCK();
        }
 
@@ -252,6 +263,20 @@ alq_shutdown(struct alq *alq)
        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;
 
+       /*
+        * If the ALQ isn't active but has unwritten data (possible if
+        * the ALQ_NOACTIVATE flag has been used), explicitly activate the
+        * ALQ here so that the pending data gets flushed by the ald_daemon.
+        */
+       if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
+               alq->aq_flags |= AQ_ACTIVE;
+               ALQ_UNLOCK(alq);
+               ALD_LOCK();
+               ald_activate(alq);
+               ALD_UNLOCK();
+               ALQ_LOCK(alq);
+       }
+
        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
@@ -271,7 +296,6 @@ alq_destroy(struct alq *alq)
        alq_shutdown(alq);
 
        mtx_destroy(&alq->aq_mtx);
-       free(alq->aq_first, M_ALD);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
 }
@@ -287,46 +311,54 @@ alq_doio(struct alq *alq)
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
-       struct ale *ale;
-       struct ale *alstart;
        int totlen;
        int iov;
        int vfslocked;
+       int wrapearly;
+
+       KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
 
        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
-       iov = 0;
-
-       alstart = ale = alq->aq_entvalid;
-       alq->aq_entvalid = NULL;
+       iov = 1;
+       wrapearly = alq->aq_wrapearly;
 
        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));
 
-       do {
-               if (aiov[iov].iov_base == NULL)
-                       aiov[iov].iov_base = ale->ae_data;
-               aiov[iov].iov_len += alq->aq_entlen;
-               totlen += alq->aq_entlen;
-               /* Check to see if we're wrapping the buffer */
-               if (ale->ae_data + alq->aq_entlen != ale->ae_next->ae_data)
-                       iov++;
-               ale->ae_flags &= ~AE_VALID;
-               ale = ale->ae_next;
-       } while (ale->ae_flags & AE_VALID);
+       /* Start the write from the location of our buffer tail pointer. */
+       aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
+
+       if (alq->aq_writetail < alq->aq_writehead) {
+               /* Buffer not wrapped. */
+               totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
+       } else if (alq->aq_writehead == 0) {
+               /* Buffer not wrapped (special case to avoid an empty iov). */
+               totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+                   wrapearly;
+       } else {
+               /*
+                * Buffer wrapped, requires 2 aiov entries:
+                * - first is from writetail to end of buffer
+                * - second is from start of buffer to writehead
+                */
+               aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
+                   wrapearly;
+               iov++;
+               aiov[1].iov_base = alq->aq_entbuf;
+               aiov[1].iov_len =  alq->aq_writehead;
+               totlen = aiov[0].iov_len + aiov[1].iov_len;
+       }
 
        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);
 
-       if (iov == 2 || aiov[iov].iov_base == NULL)
-               iov--;
-
        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
-       auio.uio_iovcnt = iov + 1;
+       auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;
 
@@ -350,8 +382,28 @@ alq_doio(struct alq *alq)
        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;
 
-       if (alq->aq_entfree == NULL)
-               alq->aq_entfree = alstart;
+       /* Adjust writetail as required, taking into account wrapping. */
+       alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
+           alq->aq_buflen;
+       alq->aq_freebytes += totlen + wrapearly;
+
+       /*
+        * If we just flushed part of the buffer which wrapped, reset the
+        * wrapearly indicator.
+        */
+       if (wrapearly)
+               alq->aq_wrapearly = 0;
+
+       /*
+        * If we just flushed the buffer completely, reset indexes to 0 to
+        * minimise buffer wraps.
+        * This is also required to ensure alq_getn() can't wedge itself.
+        */
+       if (!HAS_PENDING_DATA(alq))
+               alq->aq_writehead = alq->aq_writetail = 0;
+
+       KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
+           ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
 
        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
@@ -376,27 +428,27 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, 
 /*
  * Create the queue data structure, allocate the buffer, and open the file.
  */
+
 int
-alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
-    int size, int count)
+alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+    int size, int flags)
 {
        struct thread *td;
        struct nameidata nd;
-       struct ale *ale;
-       struct ale *alp;
        struct alq *alq;
-       char *bufp;
-       int flags;
+       int oflags;
        int error;
-       int i, vfslocked;
+       int vfslocked;
+
+       KASSERT((size > 0), ("%s: size <= 0", __func__));
 
        *alqp = NULL;
        td = curthread;
 
        NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
-       flags = FWRITE | O_NOFOLLOW | O_CREAT;
+       oflags = FWRITE | O_NOFOLLOW | O_CREAT;
 
-       error = vn_open_cred(&nd, &flags, cmode, 0, cred, NULL);
+       error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);
 
@@ -407,31 +459,20 @@ alq_open(struct alq **alqp, const char *
        VFS_UNLOCK_GIANT(vfslocked);
 
        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
-       alq->aq_entbuf = malloc(count * size, M_ALD, M_WAITOK|M_ZERO);
-       alq->aq_first = malloc(sizeof(*ale) * count, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);
-       alq->aq_entmax = count;
-       alq->aq_entlen = size;
-       alq->aq_entfree = alq->aq_first;
 
        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
 
-       bufp = alq->aq_entbuf;
-       ale = alq->aq_first;
-       alp = NULL;
-
-       /* Match up entries with buffers */
-       for (i = 0; i < count; i++) {
-               if (alp)
-                       alp->ae_next = ale;
-               ale->ae_data = bufp;
-               alp = ale;
-               ale++;
-               bufp += size;
-       }
-
-       alp->ae_next = alq->aq_first;
+       alq->aq_buflen = size;
+       alq->aq_entmax = 0;
+       alq->aq_entlen = 0;
+
+       alq->aq_freebytes = alq->aq_buflen;
+       alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
+       alq->aq_writehead = alq->aq_writetail = 0;
+       if (flags & ALQ_ORDERED)
+               alq->aq_flags |= AQ_ORDERED;
 
        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
@@ -443,77 +484,405 @@ alq_open(struct alq **alqp, const char *
        return (0);
 }
 
+int
+alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+    int size, int count)
+{
+       int ret;
+
+       KASSERT((count >= 0), ("%s: count < 0", __func__));
+
+       if (count > 0) {
+               ret = alq_open_flags(alqp, file, cred, cmode, size*count, 0);
+               (*alqp)->aq_flags |= AQ_LEGACY;
+               (*alqp)->aq_entmax = count;
+               (*alqp)->aq_entlen = size;
+       } else
+               ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
+
+       return (ret);
+}
+
+
 /*
  * Copy a new entry into the queue.  If the operation would block either
  * wait or return an error depending on the value of waitok.
  */
 int
-alq_write(struct alq *alq, void *data, int waitok)
+alq_writen(struct alq *alq, void *data, int len, int flags)
 {
-       struct ale *ale;
+       int activate, copy, ret;
+       void *waitchan;
+
+       KASSERT((len > 0 && len <= alq->aq_buflen),
+           ("%s: len <= 0 || len > aq_buflen", __func__));
 
-       if ((ale = alq_get(alq, waitok)) == NULL)
+       activate = ret = 0;
+       copy = len;
+       waitchan = NULL;
+
+       ALQ_LOCK(alq);
+
+       /*
+        * Fail to perform the write and return EWOULDBLOCK if:
+        * - The message is larger than our underlying buffer.
+        * - The ALQ is being shutdown.
+        * - There is insufficient free space in our underlying buffer
+        *   to accept the message and the user can't wait for space.
+        * - There is insufficient free space in our underlying buffer
+        *   to accept the message and the alq is inactive due to prior
+        *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
+        */
+       if (len > alq->aq_buflen ||
+           alq->aq_flags & AQ_SHUTDOWN ||
+           (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
+           HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
+               ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
+       }
 
-       bcopy(data, ale->ae_data, alq->aq_entlen);
-       alq_post(alq, ale);
+       /*
+        * If we want ordered writes and there is already at least one thread
+        * waiting for resources to become available, sleep until we're woken.
+        */
+       if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+               KASSERT(!(flags & ALQ_NOWAIT),
+                   ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
+               alq->aq_waiters++;
+               msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
+               alq->aq_waiters--;
+       }
 
-       return (0);
+       /*
+        * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
+        * enter while loop and sleep until we have enough free bytes (former)
+        * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
+        * be in this loop. Otherwise, multiple threads may be sleeping here
+        * competing for ALQ resources.
+        */
+       while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
+               KASSERT(!(flags & ALQ_NOWAIT),
+                   ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
+               alq->aq_flags |= AQ_WANTED;
+               alq->aq_waiters++;
+               if (waitchan)
+                       wakeup(waitchan);
+               msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
+               alq->aq_waiters--;
+
+               /*
+                * If we're the first thread to wake after an AQ_WANTED wakeup
+                * but there isn't enough free space for us, we're going to loop
+                * and sleep again. If there are other threads waiting in this
+                * loop, schedule a wakeup so that they can see if the space
+                * they require is available.
+                */
+               if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
+                   alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
+                       waitchan = alq;
+               else
+                       waitchan = NULL;
+       }
+
+       /*
+        * If there are waiters, we need to signal the waiting threads after we
+        * complete our work. The alq ptr is used as a wait channel for threads
+        * requiring resources to be freed up. In the AQ_ORDERED case, threads
+        * are not allowed to concurrently compete for resources in the above
+        * while loop, so we use a different wait channel in this case.
+        */
+       if (alq->aq_waiters > 0) {
+               if (alq->aq_flags & AQ_ORDERED)
+                       waitchan = &alq->aq_waiters;
+               else
+                       waitchan = alq;
+       } else
+               waitchan = NULL;
+
+       /* Bail if we're shutting down. */
+       if (alq->aq_flags & AQ_SHUTDOWN) {
+               ret = EWOULDBLOCK;
+               goto unlock;
+       }
+
+       /*
+        * If we need to wrap the buffer to accommodate the write,
+        * we'll need 2 calls to bcopy.
+        */
+       if ((alq->aq_buflen - alq->aq_writehead) < len)
+               copy = alq->aq_buflen - alq->aq_writehead;
+
+       /* Copy message (or part thereof if wrap required) to the buffer. */
+       bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
+       alq->aq_writehead += copy;
+
+       if (alq->aq_writehead >= alq->aq_buflen) {
+               KASSERT((alq->aq_writehead == alq->aq_buflen),
+                   ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
+                   __func__,
+                   alq->aq_writehead,
+                   alq->aq_buflen));
+               alq->aq_writehead = 0;
+       }
+
+       if (copy != len) {
+               /*
+                * Wrap the buffer by copying the remainder of our message
+                * to the start of the buffer and resetting aq_writehead.
+                */
+               bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
+               alq->aq_writehead = len - copy;
+       }
+
+       KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
+           ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
+
+       alq->aq_freebytes -= len;
+
+       if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
+               alq->aq_flags |= AQ_ACTIVE;
+               activate = 1;
+       }
+
+       KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
+
+unlock:
+       ALQ_UNLOCK(alq);
+
+       if (activate) {
+               ALD_LOCK();
+               ald_activate(alq);
+               ALD_UNLOCK();
+       }
+
+       /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
+       if (waitchan != NULL)
+               wakeup_one(waitchan);
+
+       return (ret);
+}
+
+int
+alq_write(struct alq *alq, void *data, int flags)
+{
+       /* Should only be called in fixed length message (legacy) mode. */
+       KASSERT((alq->aq_flags & AQ_LEGACY),
+           ("%s: fixed length write on variable length queue", __func__));
+       return (alq_writen(alq, data, alq->aq_entlen, flags));
 }
 
+/*
+ * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
+ */
 struct ale *
-alq_get(struct alq *alq, int waitok)
+alq_getn(struct alq *alq, int len, int flags)
 {
-       struct ale *ale;
-       struct ale *aln;
+       int contigbytes;
+       void *waitchan;
+
+       KASSERT((len > 0 && len <= alq->aq_buflen),
+           ("%s: len <= 0 || len > alq->aq_buflen", __func__));
 
-       ale = NULL;
+       waitchan = NULL;
 
        ALQ_LOCK(alq);
 
-       /* Loop until we get an entry or we're shutting down */
-       while ((alq->aq_flags & AQ_SHUTDOWN) == 0 && 
-           (ale = alq->aq_entfree) == NULL &&
-           (waitok & ALQ_WAITOK)) {
+       /*
+        * Determine the number of free contiguous bytes.
+        * We ensure elsewhere that if aq_writehead == aq_writetail because
+        * the buffer is empty, they will both be set to 0 and therefore
+        * aq_freebytes == aq_buflen and is fully contiguous.
+        * If they are equal and the buffer is not empty, aq_freebytes will
+        * be 0 indicating the buffer is full.
+        */
+       if (alq->aq_writehead <= alq->aq_writetail)
+               contigbytes = alq->aq_freebytes;
+       else {
+               contigbytes = alq->aq_buflen - alq->aq_writehead;
+
+               if (contigbytes < len) {
+                       /*
+                        * Insufficient space at end of buffer to handle a
+                        * contiguous write. Wrap early if there's space at
+                        * the beginning. This will leave a hole at the end
+                        * of the buffer which we will have to skip over when
+                        * flushing the buffer to disk.
+                        */
+                       if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
+                               /* Keep track of # bytes left blank. */
+                               alq->aq_wrapearly = contigbytes;
+                               /* Do the wrap and adjust counters. */
+                               contigbytes = alq->aq_freebytes =
+                                   alq->aq_writetail;
+                               alq->aq_writehead = 0;
+                       }
+               }
+       }
+
+       /*

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***
_______________________________________________
svn-src-all@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/svn-src-all
To unsubscribe, send any mail to "svn-src-all-unsubscr...@freebsd.org"

Reply via email to