Add new APIs to the stream class that work with poll groups. Signed-off-by: Andy Zhou <az...@ovn.org> --- lib/stream-fd.c | 32 ++++++++++++++++++++ lib/stream-provider.h | 17 +++++++++++ lib/stream-ssl.c | 40 ++++++++++++++++++++++++- lib/stream-tcp.c | 3 ++ lib/stream-unix.c | 3 ++ lib/stream.c | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++- lib/stream.h | 5 ++++ 7 files changed, 181 insertions(+), 2 deletions(-)
diff --git a/lib/stream-fd.c b/lib/stream-fd.c index 31bfc6e..496a517 100644 --- a/lib/stream-fd.c +++ b/lib/stream-fd.c @@ -28,6 +28,7 @@ #include "socket-util.h" #include "util.h" #include "stream-provider.h" +#include "poll-group.h" #include "stream.h" #include "openvswitch/vlog.h" @@ -160,6 +161,34 @@ fd_wait(struct stream *stream, enum stream_wait_type wait) } } +static int +fd_join(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + struct poll_group *group = stream->poll_group; + void *caller_event = stream->caller_event; + + return poll_group_join(group, s->fd, caller_event); +} + +static int +fd_update(struct stream *stream, bool write) +{ + struct stream_fd *s = stream_fd_cast(stream); + struct poll_group *group = stream->poll_group; + + return poll_group_update(group, s->fd, write, stream->caller_event); +} + +static int +fd_leave(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + struct poll_group *group = stream->poll_group; + + return poll_group_leave(group, s->fd); +} + static const struct stream_class stream_fd_class = { "fd", /* name */ false, /* needs_probes */ @@ -171,6 +200,9 @@ static const struct stream_class stream_fd_class = { NULL, /* run */ NULL, /* run_wait */ fd_wait, /* wait */ + fd_join, /* join */ + fd_update, /* update */ + fd_leave, /* leave */ }; /* Passive file descriptor stream. */ diff --git a/lib/stream-provider.h b/lib/stream-provider.h index 2226a80..74dad08 100644 --- a/lib/stream-provider.h +++ b/lib/stream-provider.h @@ -20,6 +20,7 @@ #include <sys/types.h> #include "stream.h" +struct poll_group; /* Active stream connection. */ /* Active stream connection. @@ -30,6 +31,11 @@ struct stream { int state; int error; char *name; + + /* poll_group related states. 
*/ + struct poll_group *poll_group; + void *caller_event; + bool joined; }; void stream_init(struct stream *, const struct stream_class *, @@ -123,6 +129,17 @@ struct stream_class { /* Arranges for the poll loop to wake up when 'stream' is ready to take an * action of the given 'type'. */ void (*wait)(struct stream *stream, enum stream_wait_type type); + + /* Arranges for 'stream' to join the poll group when it is connected. */ + int (*join)(struct stream *stream); + + /* Lets 'stream' inform the poll group of its interest in being woken up + * by tx_ready. */ + int (*update)(struct stream *stream, bool write); + + /* Disconnects 'stream' from the poll group. A disconnected stream falls + * back to using the poll loop directly. */ + int (*leave)(struct stream *stream); }; /* Passive listener for incoming stream connections. diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c index 2699633..88dea03 100644 --- a/lib/stream-ssl.c +++ b/lib/stream-ssl.c @@ -38,6 +38,7 @@ #include "ofpbuf.h" #include "openflow/openflow.h" #include "packets.h" +#include "poll-group.h" #include "poll-loop.h" #include "shash.h" #include "socket-util.h" @@ -670,7 +671,6 @@ ssl_send(struct stream *stream, const void *buffer, size_t n) case EAGAIN: return n; default: - sslv->txbuf = NULL; return -error; } } @@ -684,6 +684,12 @@ ssl_run(struct stream *stream) if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) { ssl_clear_txbuf(sslv); } + + if (sslv->tx_want != SSL_NOTHING) { + if (want_to_poll_events(sslv->tx_want) == SSL_WRITING) { + stream_update(stream, true); + } + } } static void @@ -747,6 +753,35 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait) } } +static int +ssl_join(struct stream *stream) +{ + struct ssl_stream *sslv = ssl_stream_cast(stream); + struct poll_group *group = stream->poll_group; + void *caller_event = stream->caller_event; + + return poll_group_join(group, sslv->fd, caller_event); +} + +static int +ssl_update(struct stream *stream, bool write) +{ + struct ssl_stream *sslv = 
ssl_stream_cast(stream); + struct poll_group *group = stream->poll_group; + void *caller_event = stream->caller_event; + + return poll_group_update(group, sslv->fd, write, caller_event); +} + +static int +ssl_leave(struct stream *stream) +{ + struct ssl_stream *sslv = ssl_stream_cast(stream); + struct poll_group *group = stream->poll_group; + + return poll_group_leave(group, sslv->fd); +} + const struct stream_class ssl_stream_class = { "ssl", /* name */ true, /* needs_probes */ @@ -758,6 +793,9 @@ const struct stream_class ssl_stream_class = { ssl_run, /* run */ ssl_run_wait, /* run_wait */ ssl_wait, /* wait */ + ssl_join, /* join */ + ssl_update, /* update */ + ssl_leave, /* leave */ }; /* Passive SSL. */ diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c index fc5a606..472cfa2 100644 --- a/lib/stream-tcp.c +++ b/lib/stream-tcp.c @@ -73,6 +73,9 @@ const struct stream_class tcp_stream_class = { NULL, /* run */ NULL, /* run_wait */ NULL, /* wait */ + NULL, /* join */ + NULL, /* update */ + NULL, /* leave */ }; #ifdef _WIN32 diff --git a/lib/stream-unix.c b/lib/stream-unix.c index cadd180..b3ce2ce 100644 --- a/lib/stream-unix.c +++ b/lib/stream-unix.c @@ -72,6 +72,9 @@ const struct stream_class unix_stream_class = { NULL, /* run */ NULL, /* run_wait */ NULL, /* wait */ + NULL, /* join */ + NULL, /* update */ + NULL, /* leave */ }; /* Passive UNIX socket. */ diff --git a/lib/stream.c b/lib/stream.c index 217191c..905f600 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015 Nicira, Inc. + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,6 +20,7 @@ #include <inttypes.h> #include <netinet/in.h> #include <poll.h> +#include <poll-group.h> #include <stdlib.h> #include <string.h> #include "coverage.h" @@ -187,6 +188,23 @@ stream_lookup_class(const char *name, const struct stream_class **classp) return EAFNOSUPPORT; } +/* Helper function that is called when a stream's state transitions from + * SCS_CONNECTING to SCS_CONNECTED. */ +static void +stream_join_poll_group(struct stream *stream) +{ + if (stream->poll_group && stream->caller_event && !stream->joined) { + int ret = stream->class->join(stream); + if (ret) { + VLOG_ERR("Stream [%s] failed to join poll_group [%s]\n", + stream_get_name(stream), + poll_group_get_name(stream->poll_group)); + } else { + stream->joined = true; + } + } +} + /* Returns 0 if 'name' is a stream name in the form "TYPE:ARGS" and TYPE is * a supported stream type, otherwise EAFNOSUPPORT. */ int @@ -278,6 +296,7 @@ stream_close(struct stream *stream) { if (stream != NULL) { char *name = stream->name; + stream_leave(stream); (stream->class->close)(stream); free(name); } @@ -317,13 +336,16 @@ stream_connect(struct stream *stream) last_state = stream->state; switch (stream->state) { case SCS_CONNECTING: + stream_leave(stream); scs_connecting(stream); break; case SCS_CONNECTED: + stream_join_poll_group(stream); return 0; case SCS_DISCONNECTED: + stream_leave(stream); return stream->error; default: @@ -430,6 +452,62 @@ stream_send_wait(struct stream *stream) stream_wait(stream, STREAM_SEND); } +int +stream_join(struct stream *stream, struct poll_group *group, void *caller_event) +{ + if (stream->poll_group || stream->caller_event || stream->joined) { + return 1; + } + + if (!stream->class->join || !stream->class->update + ||!stream->class->leave) { + return 1; + } + + stream->poll_group = group; + stream->caller_event= caller_event; + + if (stream->state == SCS_CONNECTED) { + stream_join_poll_group(stream); + } + + return 0; +} + +int +stream_update(struct stream *stream, 
bool write) +{ + int ret; + + if (!stream->poll_group) { + return 1; + } + + ret = stream->class->update(stream, write); + + return ret; +} + +int +stream_leave(struct stream *stream) +{ + int ret = 0; + + if (stream->poll_group && stream->joined) { + ret = stream->class->leave(stream); + stream->joined = false; + } + + return ret; +} + +/* Reports whether 'stream' is a member of a poll group. */ +bool +stream_joined(struct stream *stream) +{ + return stream->joined; +} + /* Given 'name', a pstream name in the form "TYPE:ARGS", stores the class * named "TYPE" into '*classp' and returns 0. Returns EAFNOSUPPORT and stores * a null pointer into '*classp' if 'name' is in the wrong form or if no such @@ -628,6 +706,9 @@ stream_init(struct stream *stream, const struct stream_class *class, : SCS_DISCONNECTED); stream->error = connect_status; stream->name = xstrdup(name); + stream->poll_group = NULL; + stream->caller_event = NULL; + stream->joined = false; ovs_assert(stream->state != SCS_CONNECTING || class->connect); } diff --git a/lib/stream.h b/lib/stream.h index f8e1891..c7648c7 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -27,6 +27,7 @@ struct pstream; struct stream; +struct poll_group; struct vlog_module; void stream_usage(const char *name, bool active, bool passive, bool bootstrap); @@ -40,6 +41,10 @@ const char *stream_get_name(const struct stream *); int stream_connect(struct stream *); int stream_recv(struct stream *, void *buffer, size_t n); int stream_send(struct stream *, const void *buffer, size_t n); +int stream_join(struct stream *, struct poll_group *group, void *caller_event); +int stream_update(struct stream *, bool write); +int stream_leave(struct stream *); +bool stream_joined(struct stream *); void stream_run(struct stream *); void stream_run_wait(struct stream *); -- 1.9.1 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev