On Sun, 15 Jul 2018 at 13:20, Thomas Koenig <[email protected]> wrote:
> So, here is the final version. I would really like to get this
> into trunk, and out of the way, so Nicolas and I can focus on
> other things.
>
> So, OK?
[I know I'm late, as it was already applied.]
For me it would be easier to read the locking if struct async_unit had
its queue_lock named q_lock/qlock instead of plain lock.
The io_lock is named nicely already.
Furthermore there is a mixture of (correctly wrapped) __gthread_ in
struct adv_cond versus unwrapped pthread_mutex_t in struct async_unit
where i'd have expected the latter to also use the __gthread wrappers.
struct adv_cond member pending should not be an int but an unsigned
int or, even better, a bool, i'd say.
transfer_array_inner () is named unintuitively IMHO. Maybe
transfer_array_now, transfer_array_scalar or transfer_array_1.
Index: libgfortran/io/async.c
===================================================================
--- libgfortran/io/async.c (nicht existent)
+++ libgfortran/io/async.c (Arbeitskopie)
[]
+static void
+update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
+ st_parameter_dt *temp;
+ NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
+ temp = *old;
+ *old = new;
+ if (temp)
+ free (temp);
+}
free (NULL) is perfectly valid, please remove the if.
+static void *
+async_io (void *arg)
+{
[]
+ while (true)
+ {
+ /* Main loop. At this point, au->lock is always held. */
dot space space at the end of a sentence please.
[]
+ while (ctq)
+ {
+ if (prev)
+ free (prev);
Likewise, drop if.
+ prev = ctq;
+ if (!au->error.has_error)
I'd flag that as likely.
Likewise, I'd flag finish_thread as unlikely; since it is a label, you can
hint to the predictor that it is rather unlikely to be jumped to by flagging
it cold:
finish_thread: __attribute__((cold));
+/* Initialize an asyncronous unit, returning zero on success,
+ nonzero on failure. It also sets u->au. */
+
+void
+init_async_unit (gfc_unit *u)
s/asyncronous/asynchronous/
+{
+ async_unit *au;
+ if (!__gthread_active_p ())
+ {
+ u->au = NULL;
+ return;
+ }
+
Excess horizontal space on the empty line above.
+ au = (async_unit *) xmalloc (sizeof (async_unit));
I'd XCNEW (async_unit) and omit all those NULL and 0 stores.
You should use the scalar allocators provided in include/libiberty.h
throughout, so s/xmalloc/XNEW/ and s/free/XDELETE/ and so on.
+/* Enqueue a transfer statement. */
+
+void
+enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
+{
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+ tq->arg = *arg;
boom on OOM. XCNEW (transfer_queue), please.
+ tq->arg = *arg;
+ tq->type = type;
+ tq->has_id = 0;
redundant store to has_id.
+ LOCK (&au->lock);
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = false;
+ UNLOCK (&au->lock);
+ SIGNAL (&au->work);
+}
+/* Enqueue an st_write_done or st_read_done which contains an ID. */
+
+int
+enqueue_done_id (async_unit *au, enum aio_do type)
+{
+ int ret;
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
XCNEW.
+/* Enqueue an st_write_done or st_read_done without an ID. */
+
+void
+enqueue_done (async_unit *au, enum aio_do type)
+{
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
XCNEW.
+ tq->type = type;
+ tq->has_id = 0;
Redundant store to has_id. Maybe just comment it out if you do want to
emphasize this side-effect of zeroing.
/* tq->has_id = 0; already done by XCNEW */
or the like.
+/* Enqueue a CLOSE statement. */
+
+void
+enqueue_close (async_unit *au)
+{
+ transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
XCNEW.
And i think enqueue_close does not need internal_proto but could be
and should be static.
Or, even better, remove it completely and call
enqueue_done (au, AIO_CLOSE)
in async_close directly.
+/* The asynchronous unit keeps the currently active PDT around.
+ This function changes that to the current one. */
+
+void
+enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
+{
+ st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
XNEW (st_parameter_dt);
+ transfer_queue *tq = xmalloc (sizeof (transfer_queue));
XNEW (transfer_queue);
+
+ memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
+
+ NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
+ NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
+ tq->next = NULL;
+ tq->type = AIO_DATA_TRANSFER_INIT;
+ tq->read_flag = read_flag;
+ tq->has_id = 0;
ah, that should be bool, not _Bool and hence s/0/false/ and s/1/true/
here and elsewhere when storing to has_id.
since read_flag seems to be a boolean too, i'd
unsigned has_id : 1;
unsigned read_flag : 1;
fwiw.
+ tq->new_pdt = new;
+ LOCK (&au->lock);
+
+ if (!au->tail)
+ au->head = tq;
+ else
+ au->tail->next = tq;
+ au->tail = tq;
+ REVOKE_SIGNAL (&(au->emptysignal));
+ au->empty = 0;
s/0/false/ please.
+ UNLOCK (&au->lock);
+ SIGNAL (&au->work);
+}
+/* Perform a wait operation on an asynchronous unit with an ID specified,
+ which means collecting the errors that may have happened asynchronously.
+ Return true if an error has been encountered. */
+
+bool
+async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
I'd rename the parameter i to id for clarity.
+ WAIT_SIGNAL_MUTEX (&(au->id.done),
+ (au->id.low >= au->id.waiting || au->empty), &au->lock);
I'd test au->empty first.
Not sure why it's ok to clear has_error in collect_async_errors --
especially without locking -- but i guess you tested such async
failure conditions in both async_wait_id and async_wait.
Index: libgfortran/io/async.h
===================================================================
--- libgfortran/io/async.h (nicht existent)
+++ libgfortran/io/async.h (Arbeitskopie)
@@ -0,0 +1,378 @@
+/* Thread - local storage of the current unit we are looking at. Needed for
+ error reporting. */
dot space space at the end of a sentence.
The layout of struct async_unit does not look too good on a 64bit box.
+bool collect_async_errors (st_parameter_common *, async_unit *);
+internal_proto (collect_async_errors);
superfluous trailing space
Index: libgfortran/io/file_pos.c
===================================================================
--- libgfortran/io/file_pos.c (Revision 259739)
+++ libgfortran/io/file_pos.c (Arbeitskopie)
@@ -267,8 +280,13 @@ st_backspace (st_parameter_filepos *fpp)
done:
if (u != NULL)
- unlock_unit (u);
+ {
+ unlock_unit (u);
+ if (u->au && needs_unlock)
+ UNLOCK (&u->au->io_lock);
+ }
+
library_end ();
}
in st_backspace you first unlock the unit and only afterwards unlock
the async io_lock.
I would settle on first unlocking the async io_lock and only then
unlocking the unit, no?
@@ -376,9 +406,12 @@ st_endfile (st_parameter_filepos *fpp)
}
}
- done:
- unlock_unit (u);
+ done:
+ if (u->au && needs_unlock)
+ UNLOCK (&u->au->io_lock);
+ unlock_unit (u);
+
library_end ();
}
like you do here, in st_endfile.
Here in st_endfile, why do you async_wait before the
LIBERROR_OPTION_CONFLICT handling by
if (u->flags.access == ACCESS_SEQUENTIAL
&& u->endfile == AFTER_ENDFILE)
and not afterwards?
@@ -450,6 +499,7 @@ void
st_flush (st_parameter_filepos *fpp)
{
gfc_unit *u;
+ bool needs_unlock = false;
library_start (&fpp->common);
@@ -456,6 +506,17 @@ st_flush (st_parameter_filepos *fpp)
u = find_unit (fpp->common.unit);
if (u != NULL)
{
+ if (u->au)
+ {
+ if (async_wait (&(fpp->common), u->au))
+ return;
+ else
+ {
+ needs_unlock = true;
+ LOCK (&u->au->io_lock);
+ }
+ }
+
/* Make sure format buffer is flushed. */
if (u->flags.form == FORM_FORMATTED)
fbuf_flush (u, u->mode);
@@ -469,5 +530,8 @@ st_flush (st_parameter_filepos *fpp)
generate_error (&fpp->common, LIBERROR_BAD_OPTION,
"Specified UNIT in FLUSH is not connected");
+ if (needs_unlock)
+ UNLOCK (&u->au->io_lock);
I would change the condition to if (ASYNC_IO && needs_unlock) for consistency.
+
library_end ();
}
Index: libgfortran/io/inquire.c
===================================================================
--- libgfortran/io/inquire.c (Revision 259739)
+++ libgfortran/io/inquire.c (Arbeitskopie)
@@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
/* Implement the non-IOLENGTH variant of the INQUIRY statement */
#include "io.h"
+#include "async.h"
#include "unix.h"
#include <string.h>
please include async.h *after* unix.h like you do everywhere else.
Index: libgfortran/io/read.c
===================================================================
--- libgfortran/io/read.c (Revision 259739)
+++ libgfortran/io/read.c (Arbeitskopie)
@@ -30,6 +30,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
#include <string.h>
#include <ctype.h>
#include <assert.h>
+#include "async.h"
typedef unsigned char uchar;
@@ -42,6 +43,7 @@ typedef unsigned char uchar;
void
set_integer (void *dest, GFC_INTEGER_LARGEST value, int length)
{
+ NOTE ("set_integer: %lld %p", (long long int) value, dest);
switch (length)
{
#ifdef HAVE_GFC_INTEGER_16
Debugging leftover? Please remove the include and the NOTE.
--- libgfortran/io/transfer.c (Revision 259739)
+++ libgfortran/io/transfer.c (Arbeitskopie)
+/* Wrapper function for I/O of scalar types. If this should be an async I/O
+ request, queue it. For a synchronous write on an async unit, perform the
+ wait operation and return an error. For all synchronous writes, call the
+ right transfer function. */
+static void
+wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind,
+ size_t size, size_t n_elem)
+{
+ if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+ {
+ if (dtp->u.p.async)
+ {
Please move this second nested if into the first one.
+ transfer_args args;
+ args.scalar.transfer = dtp->u.p.transfer;
+ args.scalar.arg_bt = type;
+ args.scalar.data = p;
+ args.scalar.i = kind;
+ args.scalar.s1 = size;
+ args.scalar.s2 = n_elem;
+ enqueue_transfer (dtp->u.p.current_unit->au, &args,
+ AIO_TRANSFER_SCALAR);
+ return;
+ }
+ }
void
+transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
+ gfc_charlen_type charlen)
+{
+ if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
+ return;
+
+ if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+ {
+ if (dtp->u.p.async)
+ {
Likewise:
Please move this second nested if into the first one.
+ transfer_args args;
+ size_t sz = sizeof (gfc_array_char)
+ + sizeof (descriptor_dimension)
+ * GFC_DESCRIPTOR_RANK (desc);
+ args.array.desc = xmalloc (sz);
+ NOTE ("desc = %p", (void *) args.array.desc);
+ memcpy (args.array.desc, desc, sz);
+ args.array.kind = kind;
+ args.array.charlen = charlen;
+ enqueue_transfer (dtp->u.p.current_unit->au, &args,
+ AIO_TRANSFER_ARRAY);
+ return;
+ }
+ }
+ /* Come here if there was no asynchronous I/O to be scheduled. */
+ transfer_array_inner (dtp, desc, kind, charlen);
+}
@@ -2770,6 +2839,42 @@ data_transfer_init (st_parameter_dt *dtp, int read
else if (dtp->u.p.current_unit->internal_unit_kind > 0)
dtp->u.p.unit_is_internal = 1;
+ if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0)
+ {
+ int f;
+ f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len,
+ async_opt, "Bad ASYNCHRONOUS in data transfer "
+ "statement");
+ if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES)
+ {
+ generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT,
+ "ASYNCHRONOUS transfer without "
+ "ASYHCRONOUS='YES' in OPEN");
s/ASYHCRONOUS/ASYNCHRONOUS/
+ return;
+ }
+ dtp->u.p.async = f == ASYNC_YES;
+ }
[]
+void
+data_transfer_init_worker (st_parameter_dt *dtp, int read_flag)
Missing function comment.
+{
+ GFC_INTEGER_4 cf = dtp->common.flags;
+
+ NOTE ("starting worker...");
+
+ if (read_flag && dtp->u.p.current_unit->flags.form != FORM_UNFORMATTED
+ && ((cf & IOPARM_DT_LIST_FORMAT) != 0)
Excess parentheses
+ && dtp->u.p.current_unit->child_dtio == 0)
Surplus horizontal whitespace before ==
+ dtp->u.p.current_unit->last_char = EOF - 1;
[]
+void
+st_read_done (st_parameter_dt *dtp)
+{
+ if (dtp->u.p.current_unit)
+ {
+ if (dtp->u.p.current_unit->au)
+ {
+ if (dtp->common.flags & IOPARM_DT_HAS_ID)
+ *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au,
AIO_READ_DONE);
Surplus trailing whitespace.
+ else
+ {
+ enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE);
+ /* An asynchronous unit without ASYNCHRONOUS="YES" - make this
+ synchronous by performing a wait operation. */
+ if (!dtp->u.p.async)
+ async_wait (&dtp->common, dtp->u.p.current_unit->au);
Don't we have to honour handled errors from async_wait?
Same for st_write_done ().
+ }
+ }
+ else
+ st_read_done_worker (dtp);
--- libgfortran/io/unit.c (Revision 259739)
+++ libgfortran/io/unit.c (Arbeitskopie)
@@ -922,7 +930,7 @@ newunit_alloc (void)
memset (newunits + old_size, 0, old_size);
newunits[old_size] = true;
newunit_lwi = old_size + 1;
- __gthread_mutex_unlock (&unit_lock);
+ UNLOCK (&unit_lock);
This should be indented by 2 spaces, not 4.
return -old_size + NEWUNIT_START;
}
--- libgfortran/libgfortran.h (Revision 259739)
+++ libgfortran/libgfortran.h (Arbeitskopie)
@@ -743,6 +743,9 @@ internal_proto(translate_error);
extern void generate_error (st_parameter_common *, int, const char *);
iexport_proto(generate_error);
+extern bool generate_error_common (st_parameter_common *, int, const char *);
+iexport_proto(generate_error_common);
why is that exported and not just internal_proto() ?
+
extern void generate_warning (st_parameter_common *, const char *);
internal_proto(generate_warning);
@@ -1748,5 +1751,7 @@ void cshift1_16_c16 (gfc_array_c16 * const restric
internal_proto(cshift1_16_c16);
#endif
+/* Define this if we support asynchronous I/O on this platform. This
+ currently requires weak symbols. */
#endif /* LIBGFOR_H */
obsolete comment, please remove.
Thanks for the async support!
cheers,