Add new APIs in stream class that works with poll group.

Signed-off-by: Andy Zhou <az...@ovn.org>
---
 lib/stream-fd.c       | 32 ++++++++++++++++++++
 lib/stream-provider.h | 17 +++++++++++
 lib/stream-ssl.c      | 40 ++++++++++++++++++++++++-
 lib/stream-tcp.c      |  3 ++
 lib/stream-unix.c     |  3 ++
 lib/stream.c          | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/stream.h          |  5 ++++
 7 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 31bfc6e..496a517 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -28,6 +28,7 @@
 #include "socket-util.h"
 #include "util.h"
 #include "stream-provider.h"
+#include "poll-group.h"
 #include "stream.h"
 #include "openvswitch/vlog.h"
 
@@ -160,6 +161,34 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
     }
 }
 
+static int
+fd_join(struct stream *stream)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    struct poll_group *group = stream->poll_group;
+    void *caller_event = stream->caller_event;
+
+    return poll_group_join(group, s->fd, caller_event);
+}
+
+static int
+fd_update(struct stream *stream, bool write)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    struct poll_group *group = stream->poll_group;
+
+    return poll_group_update(group, s->fd, write, stream->caller_event);
+}
+
+static int
+fd_leave(struct stream *stream)
+{
+    struct stream_fd *s = stream_fd_cast(stream);
+    struct poll_group *group = stream->poll_group;
+
+    return poll_group_leave(group, s->fd);
+}
+
 static const struct stream_class stream_fd_class = {
     "fd",                       /* name */
     false,                      /* needs_probes */
@@ -171,6 +200,9 @@ static const struct stream_class stream_fd_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     fd_wait,                    /* wait */
+    fd_join,                    /* join */
+    fd_update,                  /* update */
+    fd_leave,                   /* leave */
 };
 
 /* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 2226a80..74dad08 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -20,6 +20,7 @@
 #include <sys/types.h>
 #include "stream.h"
 
+struct poll_group;
 /* Active stream connection. */
 
 /* Active stream connection.
@@ -30,6 +31,11 @@ struct stream {
     int state;
     int error;
     char *name;
+
+    /* poll_group related states.  */
+    struct poll_group *poll_group;
+    void *caller_event;
+    bool joined;
 };
 
 void stream_init(struct stream *, const struct stream_class *,
@@ -123,6 +129,17 @@ struct stream_class {
     /* Arranges for the poll loop to wake up when 'stream' is ready to take an
      * action of the given 'type'. */
     void (*wait)(struct stream *stream, enum stream_wait_type type);
+
+    /* Arranges for 'stream' to join poll group when it is connected. */
+    int (*join)(struct stream *stream);
+
+    /* Let 'stream' inform pull group about its interest to be waken up
+     * by tx_ready.  */
+    int (*update)(struct stream *stream, bool write);
+
+    /* Disconnects stream from poll group. A disconnected stream will fall
+     * back to use poll loop directly.   */
+    int (*leave)(struct stream *stream);
 };
 
 /* Passive listener for incoming stream connections.
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 2699633..88dea03 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -38,6 +38,7 @@
 #include "ofpbuf.h"
 #include "openflow/openflow.h"
 #include "packets.h"
+#include "poll-group.h"
 #include "poll-loop.h"
 #include "shash.h"
 #include "socket-util.h"
@@ -670,7 +671,6 @@ ssl_send(struct stream *stream, const void *buffer, size_t 
n)
         case EAGAIN:
             return n;
         default:
-            sslv->txbuf = NULL;
             return -error;
         }
     }
@@ -684,6 +684,12 @@ ssl_run(struct stream *stream)
     if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
         ssl_clear_txbuf(sslv);
     }
+
+    if (sslv->tx_want != SSL_NOTHING) {
+        if (want_to_poll_events(sslv->tx_want) == SSL_WRITING) {
+            stream_update(stream, true);
+        }
+    }
 }
 
 static void
@@ -747,6 +753,35 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
     }
 }
 
+static int
+ssl_join(struct stream *stream)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    struct poll_group *group = stream->poll_group;
+    void *caller_event = stream->caller_event;
+
+    return poll_group_join(group, sslv->fd, caller_event);
+}
+
+static int
+ssl_update(struct stream *stream, bool write)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    struct poll_group *group = stream->poll_group;
+    void *caller_event = stream->caller_event;
+
+    return poll_group_update(group, sslv->fd, write, caller_event);
+}
+
+static int
+ssl_leave(struct stream *stream)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    struct poll_group *group = stream->poll_group;
+
+    return poll_group_leave(group, sslv->fd);
+}
+
 const struct stream_class ssl_stream_class = {
     "ssl",                      /* name */
     true,                       /* needs_probes */
@@ -758,6 +793,9 @@ const struct stream_class ssl_stream_class = {
     ssl_run,                    /* run */
     ssl_run_wait,               /* run_wait */
     ssl_wait,                   /* wait */
+    ssl_join,                   /* join */
+    ssl_update,                 /* update */
+    ssl_leave,                  /* leave */
 };
 
 /* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index fc5a606..472cfa2 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -73,6 +73,9 @@ const struct stream_class tcp_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,                       /* join */
+    NULL,                       /* update */
+    NULL,                       /* leave */
 };
 
 #ifdef _WIN32
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index cadd180..b3ce2ce 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -72,6 +72,9 @@ const struct stream_class unix_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,                       /* join */
+    NULL,                       /* update */
+    NULL,                       /* leave */
 };
 
 /* Passive UNIX socket. */
diff --git a/lib/stream.c b/lib/stream.c
index 217191c..905f600 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 #include <inttypes.h>
 #include <netinet/in.h>
 #include <poll.h>
+#include <poll-group.h>
 #include <stdlib.h>
 #include <string.h>
 #include "coverage.h"
@@ -187,6 +188,23 @@ stream_lookup_class(const char *name, const struct 
stream_class **classp)
     return EAFNOSUPPORT;
 }
 
+/* Helper function that is called when a stream's state transit from
+ * SCS_CONNECTING to SCS_CONNECTED state */
+static void
+stream_join_poll_group(struct stream *stream)
+{
+    if (stream->poll_group && stream->caller_event && !stream->joined) {
+        int ret = stream->class->join(stream);
+        if (ret) {
+            VLOG_ERR("Stream [%s] failed to join poll_group [%s]\n",
+                     stream_get_name(stream),
+                     poll_group_get_name(stream->poll_group));
+        } else {
+            stream->joined = true;
+        }
+    }
+}
+
 /* Returns 0 if 'name' is a stream name in the form "TYPE:ARGS" and TYPE is
  * a supported stream type, otherwise EAFNOSUPPORT.  */
 int
@@ -278,6 +296,7 @@ stream_close(struct stream *stream)
 {
     if (stream != NULL) {
         char *name = stream->name;
+        stream_leave(stream);
         (stream->class->close)(stream);
         free(name);
     }
@@ -317,13 +336,16 @@ stream_connect(struct stream *stream)
         last_state = stream->state;
         switch (stream->state) {
         case SCS_CONNECTING:
+            stream_leave(stream);
             scs_connecting(stream);
             break;
 
         case SCS_CONNECTED:
+            stream_join_poll_group(stream);
             return 0;
 
         case SCS_DISCONNECTED:
+            stream_leave(stream);
             return stream->error;
 
         default:
@@ -430,6 +452,62 @@ stream_send_wait(struct stream *stream)
     stream_wait(stream, STREAM_SEND);
 }
 
+int
+stream_join(struct stream *stream, struct poll_group *group, void 
*caller_event)
+{
+    if (stream->poll_group || stream->caller_event || stream->joined) {
+        return 1;
+    }
+
+    if (!stream->class->join || !stream->class->update
+        ||!stream->class->leave) {
+        return 1;
+    }
+
+    stream->poll_group = group;
+    stream->caller_event= caller_event;
+
+    if (stream->state == SCS_CONNECTED) {
+        stream_join_poll_group(stream);
+    }
+
+    return 0;
+}
+
+int
+stream_update(struct stream *stream, bool write)
+{
+    int ret;
+
+    if (!stream->poll_group) {
+        return 1;
+    }
+
+    ret = stream->class->update(stream, write);
+
+    return ret;
+}
+
+int
+stream_leave(struct stream *stream)
+{
+    int ret = 0;
+
+    if (stream->poll_group && stream->joined) {
+       ret = stream->class->leave(stream);
+       stream->joined = false;
+    }
+
+    return ret;
+}
+
+/* Report if a stream is a member of a polll group. */
+bool
+stream_joined(struct stream *stream)
+{
+    return stream->joined;
+}
+
 /* Given 'name', a pstream name in the form "TYPE:ARGS", stores the class
  * named "TYPE" into '*classp' and returns 0.  Returns EAFNOSUPPORT and stores
  * a null pointer into '*classp' if 'name' is in the wrong form or if no such
@@ -628,6 +706,9 @@ stream_init(struct stream *stream, const struct 
stream_class *class,
                     : SCS_DISCONNECTED);
     stream->error = connect_status;
     stream->name = xstrdup(name);
+    stream->poll_group = NULL;
+    stream->caller_event = NULL;
+    stream->joined = false;
     ovs_assert(stream->state != SCS_CONNECTING || class->connect);
 }
 
diff --git a/lib/stream.h b/lib/stream.h
index f8e1891..c7648c7 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -27,6 +27,7 @@
 
 struct pstream;
 struct stream;
+struct poll_group;
 struct vlog_module;
 
 void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
@@ -40,6 +41,10 @@ const char *stream_get_name(const struct stream *);
 int stream_connect(struct stream *);
 int stream_recv(struct stream *, void *buffer, size_t n);
 int stream_send(struct stream *, const void *buffer, size_t n);
+int stream_join(struct stream *, struct poll_group *group, void *caller_event);
+int stream_update(struct stream *, bool write);
+int stream_leave(struct stream *);
+bool stream_joined(struct stream *);
 
 void stream_run(struct stream *);
 void stream_run_wait(struct stream *);
-- 
1.9.1

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to