Hi,
I was going to commit this, but cedric/bluebugs/etc. suggested I send it here
first for a look-over.
Attached is a patch which implements full threading support for ecore_con.
When the ECORE_CON_USE_THREADS flag is passed to server_connect/add, it enables
threaded serving mode. In this mode, every blocking I/O operation (receiving
and sending on the sockets) spawns a new thread, which writes the data back to
the main loop over a pipe once the operation is complete. When enabled, this
makes all ecore_con operations and events asynchronous with respect to the
main loop.
In my preliminary tests, this worked flawlessly. I'll write back with some
benchmarks after I get some sleep, since I've spent the past 16 hours
(it's now 10am) coding to try to get all of my last-minute 1.0 merges in :)
--
Mike Blumenkrantz
Zentific: Our boolean values are huge.
Index: Ecore_Con.h
===================================================================
--- Ecore_Con.h (revision 51113)
+++ Ecore_Con.h (working copy)
@@ -161,7 +161,9 @@
/** Use TLS */
ECORE_CON_USE_TLS = (1 << 6),
/** Attempt to use the previously loaded certificate */
- ECORE_CON_LOAD_CERT = (1 << 7)
+ ECORE_CON_LOAD_CERT = (1 << 7),
+ /** Use threads for send/recv operations */
+ ECORE_CON_USE_THREADS = (1 << 8)
} Ecore_Con_Type;
#define ECORE_CON_USE_SSL ECORE_CON_USE_SSL2
#define ECORE_CON_REMOTE_SYSTEM ECORE_CON_REMOTE_TCP
Index: ecore_con_private.h
===================================================================
--- ecore_con_private.h (revision 51113)
+++ ecore_con_private.h (working copy)
@@ -105,6 +105,9 @@
ECORE_MAGIC;
int fd;
Ecore_Con_Type type;
+ Eina_Bool threaded:1;
+ Ecore_Thread *recv_thread;
+ Ecore_Thread *send_thread;
char *name;
int port;
char *path;
Index: ecore_con.c
===================================================================
--- ecore_con.c (revision 51113)
+++ ecore_con.c (working copy)
@@ -43,6 +43,8 @@
#include "Ecore_Con.h"
#include "ecore_con_private.h"
+typedef struct _Ecore_Con_Thread_Data Ecore_Con_Thread_Data;
+
static void _ecore_con_cb_tcp_connect(void *data, Ecore_Con_Info *info);
static void _ecore_con_cb_udp_connect(void *data, Ecore_Con_Info *info);
static void _ecore_con_cb_tcp_listen(void *data, Ecore_Con_Info *info);
@@ -58,11 +60,25 @@
Ecore_Fd_Handler *fd_handler);
static Eina_Bool _ecore_con_svr_udp_handler(void *data,
Ecore_Fd_Handler *fd_handler);
+
+static void kill_server(Ecore_Con_Server *svr);
+static void _ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl);
+static void _ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl);
+static void _ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl);
+static void _ecore_con_svr_cancel(Ecore_Con_Client *cl);
+
+static void _ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void _ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void _ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr);
+
+
static Eina_Bool _ecore_con_svr_cl_handler(void *data,
Ecore_Fd_Handler *fd_handler);
-static void _ecore_con_server_flush(Ecore_Con_Server *svr);
-static void _ecore_con_client_flush(Ecore_Con_Client *cl);
+static void _ecore_con_server_flush_end(Ecore_Con_Server *svr);
+static void _ecore_con_client_flush_end(Ecore_Con_Client *cl);
+static void _ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr);
+static void _ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl);
static void _ecore_con_event_client_add_free(void *data, void *ev);
static void _ecore_con_event_client_del_free(void *data, void *ev);
@@ -84,6 +100,12 @@
static int _ecore_con_init_count = 0;
int _ecore_con_log_dom = -1;
+struct _Ecore_Con_Thread_Data
+{
+ void *data;
+ int type;
+};
+
/**
* @addtogroup Ecore_Con_Lib_Group Ecore Connection Library Functions
*
@@ -224,6 +246,8 @@
svr->clients = NULL;
svr->ppid = getpid();
ecore_con_ssl_server_prepare(svr);
+ if ((type & ECORE_CON_SSL) & ECORE_CON_USE_THREADS == ECORE_CON_USE_THREADS)
+ svr->threaded = EINA_TRUE;
type = compl_type & ECORE_CON_TYPE;
@@ -333,6 +357,8 @@
svr->clients = NULL;
svr->client_limit = -1;
ecore_con_ssl_server_prepare(svr);
+ if ((type & ECORE_CON_SSL) & ECORE_CON_USE_THREADS == ECORE_CON_USE_THREADS)
+ svr->threaded = EINA_TRUE;
type = compl_type & ECORE_CON_TYPE;
@@ -624,7 +650,8 @@
}
/**
- * Flushes all pending data to the given server. Will return when done.
+ * Flushes all pending data to the given server. Will return when done unless
+ * the server is in threaded mode, in which case it will return immediately.
*
* @param svr The given server.
*/
@@ -637,7 +664,14 @@
return;
}
- _ecore_con_server_flush(svr);
+ if (svr->threaded)
+ ecore_long_run(
+ (Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+ NULL,
+ (Ecore_Cb)_ecore_con_server_flush_end,
+ (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ else
+ _ecore_con_server_flush(NULL, svr);
}
/**
@@ -704,11 +738,17 @@
ecore_main_fd_handler_active_set(
cl->fd_handler, ECORE_FD_READ | ECORE_FD_WRITE);
- if(cl->server && ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP))
- sendto(cl->server->fd, data, size, 0, (struct sockaddr *)cl->client_addr,
- cl->client_addr_len);
- else if (cl->buf)
+ if(cl->server && (!cl->server->threaded) &&
+ ((cl->server->type & ECORE_CON_TYPE) == ECORE_CON_REMOTE_UDP))
{
+ if (!cl->server->threaded)
+ return (int)sendto(cl->server->fd, data, size, 0,
+ (struct sockaddr *)cl->client_addr,
+ cl->client_addr_len);
+ }
+
+ if (cl->buf)
+ {
unsigned char *newbuf;
newbuf = realloc(cl->buf, cl->buf_size + size);
@@ -870,7 +910,12 @@
return;
}
- _ecore_con_client_flush(cl);
+ if (cl->server->threaded)
+ ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+ (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+ (Ecore_Cb)_ecore_con_client_flush_end,
+ (Ecore_Cb)kill_server, cl->server, EINA_FALSE);
+ else _ecore_con_client_flush(NULL, cl);
}
/**
@@ -985,7 +1030,7 @@
t_start = ecore_time_get();
while ((svr->write_buf) && (!svr->dead))
{
- _ecore_con_server_flush(svr);
+ _ecore_con_server_flush(NULL, svr);
t = ecore_time_get();
if ((t - t_start) > 0.5)
{
@@ -1002,8 +1047,7 @@
EINA_LIST_FREE(svr->clients, cl)
_ecore_con_client_free(cl);
if ((svr->created) && (svr->path) && (svr->ppid == getpid()))
- unlink(
- svr->path);
+ unlink(svr->path);
if (svr->fd >= 0)
close(svr->fd);
@@ -1034,7 +1078,7 @@
t_start = ecore_time_get();
while ((cl->buf) && (!cl->dead))
{
- _ecore_con_client_flush(cl);
+ _ecore_con_client_flush(NULL, cl);
t = ecore_time_get();
if ((t - t_start) > 0.5)
{
@@ -1544,77 +1588,76 @@
return ECORE_CALLBACK_RENEW;
}
-static Eina_Bool
-_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+static void
+_ecore_con_cl_notify(Ecore_Thread *thread, Ecore_Con_Event_Server_Data *data, Ecore_Con_Server *svr)
{
- Ecore_Con_Server *svr;
+ ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, data,
+ _ecore_con_event_server_data_free,
+ NULL);
+}
- svr = data;
- if (svr->dead)
- return ECORE_CALLBACK_RENEW;
+static void
+_ecore_con_cl_tcp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
+ unsigned char *inbuf = NULL;
+ int inbuf_num = 0;
+ int tries;
- if (svr->delete_me)
- return ECORE_CALLBACK_RENEW;
+ for (tries = 0; tries < 16; tries++)
+ {
+ int num;
+ int lost_server = 1;
+ unsigned char buf[READBUFSIZ];
- if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
- {
- unsigned char *inbuf = NULL;
- int inbuf_num = 0;
- int tries;
+ if (!(svr->type & ECORE_CON_SSL))
+ {
+ if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0)
+ if ((num < 0) && (errno == EAGAIN))
+ lost_server = 0;
- if (svr->connecting &&
- (svr_try_connect(svr) !=
- ECORE_CON_CONNECTED))
- return ECORE_CALLBACK_RENEW;
+ }
+ else if (!(num =
+ ecore_con_ssl_server_read(svr, buf,
+ READBUFSIZ)))
+ lost_server = 0;
- for (tries = 0; tries < 16; tries++)
- {
- int num;
- int lost_server = 1;
- unsigned char buf[READBUFSIZ];
+ if (num < 1)
+ {
+ if (inbuf && !svr->delete_me)
+ {
+ Ecore_Con_Event_Server_Data *e;
- if (!(svr->type & ECORE_CON_SSL))
- {
- if ((num = read(svr->fd, buf, READBUFSIZ)) <= 0)
- if ((num < 0) && (errno == EAGAIN))
- lost_server = 0;
+ e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
+ if (e)
+ {
+ svr->event_count++;
+ e->server = svr;
+ e->data = inbuf;
+ e->size = inbuf_num;
- }
- else if (!(num =
- ecore_con_ssl_server_read(svr, buf,
- READBUFSIZ)))
- lost_server = 0;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
+ _ecore_con_event_server_data_free,
+ NULL);
+ else
+ ecore_thread_notify(thread, e);
+ }
+ }
- if (num < 1)
- {
- if (inbuf && !svr->delete_me)
- {
- Ecore_Con_Event_Server_Data *e;
+ if (lost_server)
+ {
+ if (!thread) kill_server(svr);
+ else ecore_thread_cancel(thread);
+ }
- e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
- if (e)
- {
- svr->event_count++;
- e->server = svr;
- e->data = inbuf;
- e->size = inbuf_num;
- ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
- _ecore_con_event_server_data_free,
- NULL);
- }
- }
+ break;
+ }
- if (lost_server)
- kill_server(svr);
+ inbuf = realloc(inbuf, inbuf_num + num);
+ memcpy(inbuf + inbuf_num, buf, num);
+ inbuf_num += num;
+ }
- break;
- }
-
- inbuf = realloc(inbuf, inbuf_num + num);
- memcpy(inbuf + inbuf_num, buf, num);
- inbuf_num += num;
- }
-
/* #if USE_OPENSSL */
/* if (svr->fd_handler) */
/* { */
@@ -1624,21 +1667,11 @@
/* ecore_main_fd_handler_active_set(svr->fd_handler, ECORE_FD_WRITE); */
/* } */
/* #endif */
- }
- else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE))
- {
- if (svr->connecting &&
- !svr_try_connect (svr))
- return ECORE_CALLBACK_RENEW;
- _ecore_con_server_flush(svr);
- }
-
- return ECORE_CALLBACK_RENEW;
}
static Eina_Bool
-_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
{
Ecore_Con_Server *svr;
@@ -1651,54 +1684,96 @@
if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
{
- unsigned char buf[65536];
- int num = 0;
+ if (svr->connecting &&
+ (svr_try_connect(svr) !=
+ ECORE_CON_CONNECTED))
+ return ECORE_CALLBACK_RENEW;
- errno = 0;
- num = read(svr->fd, buf, 65536);
- if (num > 0)
+ if (svr->threaded)
{
- if (!svr->delete_me)
- {
- Ecore_Con_Event_Server_Data *e;
- unsigned char *inbuf;
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_tcp_cb,
+ (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+ NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread;
+ }
+ else
+try_nothread:
+ _ecore_con_cl_tcp_cb(NULL, svr);
+ }
+ else if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_WRITE))
+ {
+ if (svr->connecting &&
+ !svr_try_connect (svr))
+ return ECORE_CALLBACK_RENEW;
- inbuf = malloc(num);
- if(inbuf == NULL)
- return 1;
+ if (svr->threaded)
+ {
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+ NULL,
+ (Ecore_Cb)_ecore_con_server_flush_end,
+ (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread_flush;
+ }
+ else
+try_nothread_flush:
+ _ecore_con_server_flush(NULL, svr);
+ }
- memcpy(inbuf, buf, num);
+ return ECORE_CALLBACK_RENEW;
+}
- e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
- if (e)
- {
- svr->event_count++;
- e->server = svr;
- e->data = inbuf;
- e->size = num;
- ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
- _ecore_con_event_server_data_free,
- NULL);
- }
+static void
+_ecore_con_cl_udp_cb(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
+ unsigned char buf[65536];
+ int num = 0;
+
+ errno = 0;
+ num = read(svr->fd, buf, 65536);
+ if (num > 0)
+ {
+ if (!svr->delete_me)
+ {
+ Ecore_Con_Event_Server_Data *e;
+ unsigned char *inbuf;
+
+ inbuf = malloc(num);
+ if(inbuf == NULL)
+ return;
+
+ memcpy(inbuf, buf, num);
+
+ e = calloc(1, sizeof(Ecore_Con_Event_Server_Data));
+ if (e)
+ {
+ svr->event_count++;
+ e->server = svr;
+ e->data = inbuf;
+ e->size = num;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_SERVER_DATA, e,
+ _ecore_con_event_server_data_free,
+ NULL);
+ else
+ ecore_thread_notify(thread, e);
}
}
- else if ((errno == EIO) || (errno == EBADF) ||
- (errno == EPIPE) || (errno == EINVAL) ||
- (errno == ENOSPC) || (errno == ECONNREFUSED))
- kill_server(svr);
}
- else if (ecore_main_fd_handler_active_get(fd_handler,
- ECORE_FD_WRITE))
- _ecore_con_server_flush(svr);
-
- return ECORE_CALLBACK_RENEW;
+ else if ((errno == EIO) || (errno == EBADF) ||
+ (errno == EPIPE) || (errno == EINVAL) ||
+ (errno == ENOSPC) || (errno == ECONNREFUSED))
+ {
+ if (!thread) kill_server(svr);
+ else ecore_thread_cancel(thread);
+ }
}
static Eina_Bool
-_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_cl_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
{
Ecore_Con_Server *svr;
- Ecore_Con_Client *cl = NULL;
svr = data;
if (svr->dead)
@@ -1709,236 +1784,482 @@
if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
{
- unsigned char buf[READBUFSIZ];
- unsigned char client_addr[256];
- unsigned int client_addr_len = sizeof(client_addr);
- int num;
+ if (svr->threaded)
+ {
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_cl_udp_cb,
+ (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+ NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread;
+ }
+ else
+ {
+try_nothread:
+ _ecore_con_cl_udp_cb(NULL, svr);
+ }
+ }
+ else if (ecore_main_fd_handler_active_get(fd_handler,
+ ECORE_FD_WRITE))
+ {
+ if (svr->threaded)
+ {
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_server_flush,
+ NULL,
+ (Ecore_Cb)_ecore_con_server_flush_end,
+ (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread_flush;
+ }
+ else
+try_nothread_flush:
+ _ecore_con_server_flush(NULL, svr);
+ }
- errno = 0;
+ return ECORE_CALLBACK_RENEW;
+}
+
+static void
+_ecore_con_svr_notify(Ecore_Thread *thread, Ecore_Con_Thread_Data *data, Ecore_Con_Client *cl)
+{
+ if (data->type == ECORE_CON_EVENT_CLIENT_ADD)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
+ data->data,
+ _ecore_con_event_client_add_free,
+ NULL);
+
+ else if (data->type == ECORE_CON_EVENT_CLIENT_DEL)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL,
+ data->data,
+ _ecore_con_event_client_del_free,
+ NULL);
+ else
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA,
+ data->data,
+ _ecore_con_event_client_data_free,
+ NULL);
+
+ free(data);
+}
+
+static void
+_ecore_con_svr_udp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl)
+{
+ unsigned char buf[READBUFSIZ];
+ unsigned char client_addr[256];
+ unsigned int client_addr_len = sizeof(client_addr);
+ int num;
+ Ecore_Con_Server *svr = cl->server;
+
+ errno = 0;
#ifdef _WIN32
- num = fcntl(svr->fd, F_SETFL, O_NONBLOCK);
- if (num >= 0)
- num =
- recvfrom(svr->fd, buf, sizeof(buf), 0,
- (struct sockaddr *)&client_addr,
- &client_addr_len);
+ num = fcntl(svr->fd, F_SETFL, O_NONBLOCK);
+ if (num >= 0)
+ num = recvfrom(svr->fd, buf, sizeof(buf), 0,
+ (struct sockaddr *)&client_addr,
+ &client_addr_len);
#else
- num =
- recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT,
- (struct sockaddr *)&client_addr,
- &client_addr_len);
+ num = recvfrom(svr->fd, buf, sizeof(buf), MSG_DONTWAIT,
+ (struct sockaddr *)&client_addr,
+ &client_addr_len);
#endif
- if (num > 0)
+ if (num > 0)
+ {
+ if (!svr->delete_me)
{
- if (!svr->delete_me)
+ Ecore_Con_Event_Client_Data *e;
+ unsigned char *inbuf;
+
+ /* Create a new client for use in the client data event */
+ cl = calloc(1, sizeof(Ecore_Con_Client));
+ if(cl == NULL)
+ return;
+
+ cl->buf = NULL;
+ cl->fd = 0;
+ cl->fd_handler = NULL;
+ cl->server = svr;
+ cl->client_addr = calloc(1, client_addr_len);
+ cl->client_addr_len = client_addr_len;
+ if(cl->client_addr == NULL)
{
- Ecore_Con_Event_Client_Data *e;
- unsigned char *inbuf;
+ free(cl);
+ return;
+ }
- /* Create a new client for use in the client data event */
- cl = calloc(1, sizeof(Ecore_Con_Client));
- if(cl == NULL)
- return ECORE_CALLBACK_RENEW;
+ memcpy(cl->client_addr, &client_addr, client_addr_len);
+ ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT);
+ svr->clients = eina_list_append(svr->clients, cl);
- cl->buf = NULL;
- cl->fd = 0;
- cl->fd_handler = NULL;
- cl->server = svr;
- cl->client_addr = calloc(1, client_addr_len);
- cl->client_addr_len = client_addr_len;
- if(cl->client_addr == NULL)
- {
- free(cl);
- return ECORE_CALLBACK_RENEW;
- }
+ cl->ip = _ecore_con_pretty_ip(cl->client_addr,
+ cl->client_addr_len);
- memcpy(cl->client_addr, &client_addr, client_addr_len);
- ECORE_MAGIC_SET(cl, ECORE_MAGIC_CON_CLIENT);
- svr->clients = eina_list_append(svr->clients, cl);
+ inbuf = malloc(num);
+ if(inbuf == NULL)
+ {
+ free(cl->client_addr);
+ free(cl);
+ }
- cl->ip = _ecore_con_pretty_ip(cl->client_addr,
- cl->client_addr_len);
+ memcpy(inbuf, buf, num);
- inbuf = malloc(num);
- if(inbuf == NULL)
+ e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+ if (e)
+ {
+ svr->event_count++;
+ e->client = cl;
+ e->data = inbuf;
+ e->size = num;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
+ _ecore_con_event_client_data_free,
+ NULL);
+ else
{
- free(cl->client_addr);
- free(cl);
- return ECORE_CALLBACK_RENEW;
- }
+ Ecore_Con_Thread_Data *td;
- memcpy(inbuf, buf, num);
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+ {
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
+ }
- e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
- if (e)
- {
- svr->event_count++;
- e->client = cl;
- e->data = inbuf;
- e->size = num;
- ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
- _ecore_con_event_client_data_free,
- NULL);
+ td->type = ECORE_CON_EVENT_CLIENT_DATA;
+ td->data = e;
+ ecore_thread_notify(thread, td);
}
+ }
- if(!cl->delete_me)
+ if(!cl->delete_me)
+ {
+ Ecore_Con_Event_Client_Add *add;
+
+ add = calloc(1, sizeof(Ecore_Con_Event_Client_Add));
+ if(!add)
+ return;
+
+/*cl->event_count++;*/
+ add->client = cl;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
+ add,
+ _ecore_con_event_client_add_free,
+ NULL);
+ else
{
- Ecore_Con_Event_Client_Add *add;
+ Ecore_Con_Thread_Data *td;
- add = calloc(1, sizeof(Ecore_Con_Event_Client_Add));
- if(add)
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
{
-/*cl->event_count++;*/
- add->client = cl;
- ecore_event_add(ECORE_CON_EVENT_CLIENT_ADD,
- add,
- _ecore_con_event_client_add_free,
- NULL);
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
}
+
+ td->type = ECORE_CON_EVENT_CLIENT_ADD;
+ td->data = e;
+ ecore_thread_notify(thread, td);
}
+
}
}
- else if ((errno == EIO) || (errno == EBADF) ||
- (errno == EPIPE) || (errno == EINVAL) ||
- (errno == ENOSPC) || (errno == ECONNREFUSED))
+ }
+ else if ((errno == EIO) || (errno == EBADF) ||
+ (errno == EPIPE) || (errno == EINVAL) ||
+ (errno == ENOSPC) || (errno == ECONNREFUSED))
+ {
+ if (!svr->delete_me)
{
- if (!svr->delete_me)
+ /* we lost our client! */
+ Ecore_Con_Event_Client_Del *e;
+
+ e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
+ if (e)
{
- /* we lost our client! */
- Ecore_Con_Event_Client_Del *e;
+ svr->event_count++;
+ /* be explicit here */
+ e->client = NULL;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
+ _ecore_con_event_client_del_free,
+ NULL);
+ else
+ {
+ Ecore_Con_Thread_Data *td;
- e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
- if (e)
- {
- svr->event_count++;
- /* be explicit here */
- e->client = NULL;
- ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
- _ecore_con_event_client_del_free,
- NULL);
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+ {
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
+ }
+
+ td->type = ECORE_CON_EVENT_CLIENT_DEL;
+ td->data = e;
+ ecore_thread_notify(thread, td);
}
}
+ }
- svr->dead = 1;
- if (svr->fd_handler)
- ecore_main_fd_handler_del(svr->fd_handler);
+ svr->dead = 1;
+ if (thread)
+ {
+ ecore_thread_cancel(thread);
+ return;
+ }
+ if (svr->fd_handler)
+ ecore_main_fd_handler_del(svr->fd_handler);
- svr->fd_handler = NULL;
- }
+ svr->fd_handler = NULL;
}
- else if (ecore_main_fd_handler_active_get(fd_handler,
- ECORE_FD_WRITE))
- _ecore_con_client_flush(cl);
-
- return ECORE_CALLBACK_RENEW;
}
static Eina_Bool
-_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+_ecore_con_svr_udp_handler(void *data, Ecore_Fd_Handler *fd_handler)
{
- Ecore_Con_Client *cl;
+ Ecore_Con_Server *svr;
+ Ecore_Con_Client *cl = NULL;
- cl = data;
- if (cl->dead)
+ svr = data;
+ if (svr->dead)
return ECORE_CALLBACK_RENEW;
- if (cl->delete_me)
+ if (svr->delete_me)
return ECORE_CALLBACK_RENEW;
+
if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
{
- unsigned char *inbuf = NULL;
- int inbuf_num = 0;
- int tries;
+ if (svr->threaded)
+ {
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_udp_cb,
+ (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify,
+ NULL, (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread;
+ }
+ else
+ {
+try_nothread:
+ _ecore_con_svr_udp_cb(NULL, cl);
+ }
+ }
+ else if (ecore_main_fd_handler_active_get(fd_handler,
+ ECORE_FD_WRITE))
+ {
+ if (svr->threaded)
+ {
+ svr->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+ (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+ (Ecore_Cb)_ecore_con_client_flush_end,
+ (Ecore_Cb)kill_server, svr, EINA_FALSE);
+ if (!svr->recv_thread)
+ goto try_nothread_flush;
+ }
+ else
+try_nothread_flush:
+ _ecore_con_client_flush(NULL, cl);
+ }
+ return ECORE_CALLBACK_RENEW;
+}
- for (tries = 0; tries < 16; tries++)
+static void
+_ecore_con_svr_cancel(Ecore_Con_Client *cl)
+{
+ cl->dead = 1;
+ if (cl->fd_handler)
+ ecore_main_fd_handler_del(cl->fd_handler);
+
+ cl->fd_handler = NULL;
+}
+
+static void
+_ecore_con_svr_tcp_cb(Ecore_Thread *thread, Ecore_Con_Client *cl)
+{
+ unsigned char *inbuf = NULL;
+ int inbuf_num = 0;
+ int tries;
+ Ecore_Con_Server *svr = cl->server;
+
+ for (tries = 0; tries < 16; tries++)
+ {
+ int num;
+ int lost_client = 1;
+ unsigned char buf[READBUFSIZ];
+
+ errno = 0;
+
+ if (!(cl->server->type & ECORE_CON_SSL))
{
- int num;
- int lost_client = 1;
- unsigned char buf[READBUFSIZ];
+ if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0)
+ if ((num < 0) && (errno == EAGAIN))
+ lost_client = 0;
- errno = 0;
+ }
+ else if (!(num =
+ ecore_con_ssl_client_read(cl, buf,
+ READBUFSIZ)))
+ lost_client = 0;
- if (!(cl->server->type & ECORE_CON_SSL))
+ if (num < 1)
+ {
+ if (inbuf && !cl->delete_me)
{
- if ((num = read(cl->fd, buf, READBUFSIZ)) <= 0)
- if ((num < 0) && (errno == EAGAIN))
- lost_client = 0;
+ Ecore_Con_Event_Client_Data *e;
+ e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+ if (e)
+ {
+ cl->event_count++;
+ e->client = cl;
+ e->data = inbuf;
+ e->size = inbuf_num;
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
+ _ecore_con_event_client_data_free,
+ NULL);
+ else
+ {
+ Ecore_Con_Thread_Data *td;
+
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+ {
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
+ }
+
+ td->type = ECORE_CON_EVENT_CLIENT_DATA;
+ td->data = e;
+ ecore_thread_notify(thread, td);
+ }
+ }
}
- else if (!(num =
- ecore_con_ssl_client_read(cl, buf,
- READBUFSIZ)))
- lost_client = 0;
- if (num < 1)
+ if (lost_client)
{
- if (inbuf && !cl->delete_me)
+ if (!cl->delete_me)
{
- Ecore_Con_Event_Client_Data *e;
+ /* we lost our client! */
+ Ecore_Con_Event_Client_Del *e;
- e = calloc(1, sizeof(Ecore_Con_Event_Client_Data));
+ e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
if (e)
{
cl->event_count++;
e->client = cl;
- e->data = inbuf;
- e->size = inbuf_num;
- ecore_event_add(ECORE_CON_EVENT_CLIENT_DATA, e,
- _ecore_con_event_client_data_free,
- NULL);
- }
- }
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL,
+ e,
+ _ecore_con_event_client_del_free,
+ NULL);
+ else
+ {
+ Ecore_Con_Thread_Data *td;
- if (lost_client)
- {
- if (!cl->delete_me)
- {
- /* we lost our client! */
- Ecore_Con_Event_Client_Del *e;
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+ {
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
+ }
- e = calloc(1, sizeof(Ecore_Con_Event_Client_Del));
- if (e)
- {
- cl->event_count++;
- e->client = cl;
- ecore_event_add(
- ECORE_CON_EVENT_CLIENT_DEL,
- e,
- _ecore_con_event_client_del_free,
- NULL);
+ td->type = ECORE_CON_EVENT_CLIENT_DEL;
+ td->data = e;
+ ecore_thread_notify(thread, td);
}
}
+ }
+ if (thread)
+ ecore_thread_cancel(thread);
+ else
+ {
cl->dead = 1;
if (cl->fd_handler)
ecore_main_fd_handler_del(cl->fd_handler);
cl->fd_handler = NULL;
}
+ }
- break;
- }
- else
- {
- inbuf = realloc(inbuf, inbuf_num + num);
- memcpy(inbuf + inbuf_num, buf, num);
- inbuf_num += num;
- }
+ break;
}
+ else
+ {
+ inbuf = realloc(inbuf, inbuf_num + num);
+ memcpy(inbuf + inbuf_num, buf, num);
+ inbuf_num += num;
+ }
}
+}
+
+static Eina_Bool
+_ecore_con_svr_cl_handler(void *data, Ecore_Fd_Handler *fd_handler)
+{
+ Ecore_Con_Client *cl;
+
+ cl = data;
+ if (cl->dead)
+ return ECORE_CALLBACK_RENEW;
+
+ if (cl->delete_me)
+ return ECORE_CALLBACK_RENEW;
+
+ if (ecore_main_fd_handler_active_get(fd_handler, ECORE_FD_READ))
+ {
+ if (cl->server->threaded)
+ {
+ cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_svr_tcp_cb,
+ (Ecore_Thread_Notify_Cb)_ecore_con_svr_notify,
+ NULL,
+ (Ecore_Cb)_ecore_con_svr_cancel,
+ cl, EINA_FALSE);
+ if (!cl->server->recv_thread)
+ goto try_nothread;
+ }
+ else
+ {
+try_nothread:
+ _ecore_con_svr_udp_cb(NULL, cl);
+ }
+ }
else if (ecore_main_fd_handler_active_get(fd_handler,
ECORE_FD_WRITE))
- _ecore_con_client_flush(cl);
+ {
+ if (cl->server->threaded)
+ {
+ cl->server->recv_thread = ecore_long_run((Ecore_Thread_Heavy_Cb)_ecore_con_client_flush,
+ (Ecore_Thread_Notify_Cb)_ecore_con_cl_notify,
+ (Ecore_Cb)_ecore_con_client_flush_end,
+ (Ecore_Cb)_ecore_con_svr_cancel, cl,
+ EINA_FALSE);
+ if (!cl->server->recv_thread)
+ goto try_nothread_flush;
+ }
+ else
+try_nothread_flush:
+ _ecore_con_client_flush(NULL, cl);
+ }
return ECORE_CALLBACK_RENEW;
}
static void
-_ecore_con_server_flush(Ecore_Con_Server *svr)
+_ecore_con_server_flush_end(Ecore_Con_Server *svr)
{
+ if (svr->write_buf_offset < svr->write_buf_size)
+ return;
+
+ svr->write_buf_size = 0;
+ svr->write_buf_offset = 0;
+ free(svr->write_buf);
+ svr->write_buf = NULL;
+ if (svr->fd_handler)
+ ecore_main_fd_handler_active_set(svr->fd_handler,
+ ECORE_FD_READ);
+}
+
+static void
+_ecore_con_server_flush(Ecore_Thread *thread, Ecore_Con_Server *svr)
+{
int count, num;
if (!svr->write_buf)
@@ -1963,11 +2284,18 @@
if (count < 1)
{
/* we lost our server! */
- kill_server(svr);
+ if (!thread)
+ return kill_server(svr);
+
+ ecore_thread_cancel(thread);
return;
}
svr->write_buf_offset += count;
+
+ if (thread)
+ return;
+
if (svr->write_buf_offset >= svr->write_buf_size)
{
svr->write_buf_size = 0;
@@ -1981,8 +2309,22 @@
}
static void
-_ecore_con_client_flush(Ecore_Con_Client *cl)
+_ecore_con_client_flush_end(Ecore_Con_Client *cl)
{
+ if (cl->buf_offset < cl->buf_size)
+ return;
+
+ cl->buf_size = 0;
+ cl->buf_offset = 0;
+ free(cl->buf);
+ cl->buf = NULL;
+ if (cl->fd_handler)
+ ecore_main_fd_handler_active_set(cl->fd_handler, ECORE_FD_READ);
+}
+
+static void
+_ecore_con_client_flush(Ecore_Thread *thread, Ecore_Con_Client *cl)
+{
int count, num;
if (!cl->buf)
@@ -2009,11 +2351,32 @@
{
cl->event_count++;
e->client = cl;
- ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
- _ecore_con_event_client_del_free, NULL);
+ if (!thread)
+ ecore_event_add(ECORE_CON_EVENT_CLIENT_DEL, e,
+ _ecore_con_event_client_del_free, NULL);
+ else
+ {
+ Ecore_Con_Thread_Data *td;
+
+ if (!(td = malloc(sizeof(Ecore_Con_Thread_Data))))
+ {
+ WRN("Allocation failure in ecore_con threaded server data!");
+ return;
+ }
+
+ td->type = ECORE_CON_EVENT_CLIENT_DEL;
+ td->data = e;
+ ecore_thread_notify(thread, td);
+ }
}
cl->dead = 1;
+ if (thread)
+ {
+ ecore_thread_cancel(thread);
+ return;
+ }
+
if (cl->fd_handler)
ecore_main_fd_handler_del(cl->fd_handler);
@@ -2024,6 +2387,8 @@
}
cl->buf_offset += count;
+ if (thread)
+ return;
if (cl->buf_offset >= cl->buf_size)
{
cl->buf_size = 0;
------------------------------------------------------------------------------
This SF.net email is sponsored by
Make an app they can't live without
Enter the BlackBerry Developer Challenge
http://p.sf.net/sfu/RIM-dev2dev
_______________________________________________
enlightenment-devel mailing list
enlightenment-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/enlightenment-devel