Hi Willy,

Great ! I thinked this patch for the trash. You can merge this work. The main goal is providing an environment to process spoe with lua or python.

Thierry

Le 11 mai 2019 11:43:14 Willy Tarreau <w...@1wt.eu> a écrit :

Hi Thierry,

I just stumbled upon the patch series below you sent a while ago. I see
that you didn't receive any feedback on it, but see no reason not to
merge it, as it must still be valid given that it's outside of the
core. Do you have any objection against it getting merged ? Or maybe
even a newer version ? This could be a nice bootstrap for people who
want to try to create new agents.

Thanks,
Willy

On Sun, Feb 25, 2018 at 10:00:01PM +0100, Thierry Fournier wrote:
Hi,

Some guy says that SPOE is a great method for some things. Actually it
is not really accessible because it requires C development.

I write a server which can bind SPOP messages on Python and/Lua
functions. The function process the necessary and return variables
using SPOP ack.

The patches are in attachment. Below, an example of python script.

Thierry


   import spoa
   import ipaddress

   def check_client_ip(args):

        pprint(args)
        # This display:
        # [{'name': '', 'value': True},
        #  {'name': '', 'value': 1234L},
        #  {'name': '', 'value': IPv4Address(u'127.0.0.1')},
        #  {'name': '', 'value': IPv6Address(u'::55')},
        #  {'name': '', 'value': '127.0.0.1:10001'}]

        spoa.set_var_null("null", spoa.scope_txn)
        spoa.set_var_boolean("boolean", spoa.scope_txn, True)
        spoa.set_var_int32("int32", spoa.scope_txn, 1234)
        spoa.set_var_uint32("uint32", spoa.scope_txn, 1234)
        spoa.set_var_int64("int64", spoa.scope_txn, 1234)
        spoa.set_var_uint64("uint64", spoa.scope_txn, 1234)
        spoa.set_var_ipv4("ipv4", spoa.scope_txn, 
ipaddress.IPv4Address(u"127.0.0.1"))
        spoa.set_var_ipv6("ipv6", spoa.scope_txn, 
ipaddress.IPv6Address(u"1::f"))
        spoa.set_var_str("str", spoa.scope_txn, "1::f")
        spoa.set_var_bin("bin", spoa.scope_txn, "1:\x01:\x02f\x00\x00")
        # HAProxy display:
        # [debug converter] type: any <>
        # [debug converter] type: bool <1>
        # [debug converter] type: sint <1234>
        # [debug converter] type: sint <1234>
        # [debug converter] type: sint <1234>
        # [debug converter] type: sint <1234>
        # [debug converter] type: ipv4 <127.0.0.1>
        # [debug converter] type: ipv6 <1::f>
        # [debug converter] type: str <1::f>
        # [debug converter] type: bin <1:.:.f>

        return



>From 0794044c73b7361560ebeb205d733f978bcd78af Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 11:40:03 +0100
Subject: [PATCH 01/14] MINOR: spoa-server: Clone the v1.7 spoa-example project

This is a working base.
---
 contrib/spoa_server/Makefile |   24 +
 contrib/spoa_server/README   |   88 ++++
 contrib/spoa_server/spoa.c   | 1152 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1264 insertions(+)
 create mode 100644 contrib/spoa_server/Makefile
 create mode 100644 contrib/spoa_server/README
 create mode 100644 contrib/spoa_server/spoa.c

diff --git a/contrib/spoa_server/Makefile b/contrib/spoa_server/Makefile
new file mode 100644
index 0000000..e6b7c53
--- /dev/null
+++ b/contrib/spoa_server/Makefile
@@ -0,0 +1,24 @@
+DESTDIR =
+PREFIX  = /usr/local
+BINDIR  = $(PREFIX)/bin
+
+CC = gcc
+LD = $(CC)
+
+CFLAGS  = -g -O2 -Wall -Werror -pthread
+LDFLAGS = -lpthread
+
+OBJS = spoa.o
+
+
+spoa: $(OBJS)
+       $(LD) $(LDFLAGS) -o $@ $^
+
+install: spoa
+       install spoa $(DESTDIR)$(BINDIR)
+
+clean:
+       rm -f spoa $(OBJS)
+
+%.o:   %.c
+       $(CC) $(CFLAGS) -c -o $@ $<
diff --git a/contrib/spoa_server/README b/contrib/spoa_server/README
new file mode 100644
index 0000000..7e376ee
--- /dev/null
+++ b/contrib/spoa_server/README
@@ -0,0 +1,88 @@
+A Random IP reputation service acting as a Stream Processing Offload Agent
+--------------------------------------------------------------------------
+
+This is a very simple service that implement a "random" ip reputation
+service. It will return random scores for all checked IP addresses. It only
+shows you how to implement a ip reputation service or such kind of services
+using the SPOE.
+
+
+  Start the service
+---------------------
+
+After you have compiled it, to start the service, you just need to use "spoa"
+binary:
+
+    $> ./spoa  -h
+    Usage: ./spoa [-h] [-d] [-p <port>] [-n <num-workers>]
+        -h                  Print this message
+        -d                  Enable the debug mode
+        -p <port>           Specify the port to listen on (default: 12345)
+        -n <num-workers>    Specify the number of workers (default: 5)
+
+Note: A worker is a thread.
+
+
+  Configure a SPOE to use the service
+---------------------------------------
+
+All information about SPOE configuration can be found in "doc/SPOE.txt". Here is
+the configuration template to use for your SPOE:
+
+    [ip-reputation]
+
+    spoe-agent iprep-agent
+        messages check-client-ip
+
+        option var-prefix iprep
+
+        timeout hello      100ms
+        timeout idle       30s
+        timeout processing 15ms
+
+        use-backend iprep-backend
+
+    spoe-message check-client-ip
+        args src
+        event on-client-session
+
+
+The engine is in the scope "ip-reputation". So to enable it, you must set the
+following line in a frontend/listener section:
+
+    frontend my-front
+        ...
+        filter spoe engine ip-reputation config /path/spoe-ip-reputation.conf
+       ....
+
+where "/path/spoe-ip-reputation.conf" is the path to your SPOE configuration
+file. The engine name is important here, it must be the same than the one used
+in the SPOE configuration file.
+
+IMPORTANT NOTE:
+    Because we want to send a message on the "on-client-session" event, this
+    SPOE must be attached to a proxy with the frontend capability. If it is
+    declared in a backend section, it will have no effet.
+
+
+Because, in SPOE configuration file, we declare to use the backend
+"iprep-backend" to communicate with the service, you must define it in HAProxy
+configuration. For example:
+
+    backend iprep-backend
+        mode tcp
+       timeout server 1m
+       server iprep-srv 127.0.0.1:12345 check maxconn 5
+
+
+In reply to the "check-client-ip" message, this service will set the variable
+"ip_score" for the session, an integer between 0 and 100. If unchanged, the
+variable prefix is "iprep". So the full variable name will be
+"sess.iprep.ip_score".
+
+You can use it in ACLs to experiment the SPOE feature. For example:
+
+    tcp-request content reject if { var(sess.iprep.ip_score) -m int lt 20 }
+
+With this rule, all IP address with a score lower than 20 will be rejected
+(Remember, this score is random).
diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
new file mode 100644
index 0000000..ce59c04
--- /dev/null
+++ b/contrib/spoa_server/spoa.c
@@ -0,0 +1,1152 @@
+/*
+ * A Random IP reputation service acting as a Stream Processing Offload Agent
+ *
+ * This is a very simple service that implement a "random" ip reputation
+ * service. It will return random scores for all checked IP addresses. It only
+ * shows you how to implement a ip reputation service or such kind of services
+ * using the SPOE.
+ *
+ * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfau...@haproxy.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include <signal.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+
+#define DEFAULT_PORT      12345
+#define NUM_WORKERS       5
+#define MAX_FRAME_SIZE    16384
+#define SPOP_VERSION      "1.0"
+#define SPOA_CAPABILITIES ""
+
+#define SLEN(str) (sizeof(str)-1)
+
+#define LOG(fmt, args...)                                           \
+       do {                                                        \
+               struct timeval now;                                 \
+               int wid = *((int*)pthread_getspecific(worker_id));  \
+                                                                   \
+               gettimeofday(&now, NULL);                       \
+               fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n",           \
+                       now.tv_sec, now.tv_usec, wid, ##args);      \
+       } while (0)
+
+#define DEBUG(x...)                             \
+       do {                                    \
+               if (debug)                      \
+                       LOG(x);                 \
+       } while (0)
+
+/* Frame Types sent by HAProxy and by agents */
+enum spoe_frame_type {
+       /* Frames sent by HAProxy */
+       SPOE_FRM_T_HAPROXY_HELLO = 1,
+       SPOE_FRM_T_HAPROXY_DISCON,
+       SPOE_FRM_T_HAPROXY_NOTIFY,
+
+       /* Frames sent by the agents */
+       SPOE_FRM_T_AGENT_HELLO = 101,
+       SPOE_FRM_T_AGENT_DISCON,
+       SPOE_FRM_T_AGENT_ACK
+};
+
+/* All supported data types */
+enum spoe_data_type {
+       SPOE_DATA_T_NULL = 0,
+       SPOE_DATA_T_BOOL,
+       SPOE_DATA_T_INT32,
+       SPOE_DATA_T_UINT32,
+       SPOE_DATA_T_INT64,
+       SPOE_DATA_T_UINT64,
+       SPOE_DATA_T_IPV4,
+       SPOE_DATA_T_IPV6,
+       SPOE_DATA_T_STR,
+       SPOE_DATA_T_BIN,
+       SPOE_DATA_TYPES
+};
+
+/* Errors triggerd by SPOE applet */
+enum spoe_frame_error {
+       SPOE_FRM_ERR_NONE = 0,
+       SPOE_FRM_ERR_IO,
+       SPOE_FRM_ERR_TOUT,
+       SPOE_FRM_ERR_TOO_BIG,
+       SPOE_FRM_ERR_INVALID,
+       SPOE_FRM_ERR_NO_VSN,
+       SPOE_FRM_ERR_NO_FRAME_SIZE,
+       SPOE_FRM_ERR_NO_CAP,
+       SPOE_FRM_ERR_BAD_VSN,
+       SPOE_FRM_ERR_BAD_FRAME_SIZE,
+       SPOE_FRM_ERR_UNKNOWN = 99,
+       SPOE_FRM_ERRS,
+};
+
+/* All supported SPOE actions */
+enum spoe_action_type {
+       SPOE_ACT_T_SET_VAR = 1,
+       SPOE_ACT_T_UNSET_VAR,
+       SPOE_ACT_TYPES,
+};
+
+/* Scopes used for variables set by agents. It is a way to be agnotic to vars
+ * scope. */
+enum spoe_vars_scope {
+       SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
+       SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
+       SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
+       SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
+       SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
+};
+
+
+/* Masks to get data type or flags value */
+#define SPOE_DATA_T_MASK  0x0F
+#define SPOE_DATA_FL_MASK 0xF0
+
+/* Flags to set Boolean values */
+#define SPOE_DATA_FL_FALSE 0x00
+#define SPOE_DATA_FL_TRUE  0x10
+static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
+       [SPOE_FRM_ERR_NONE]           = "normal",
+       [SPOE_FRM_ERR_IO]             = "I/O error",
+       [SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
+       [SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
+       [SPOE_FRM_ERR_INVALID]        = "invalid frame received",
+       [SPOE_FRM_ERR_NO_VSN]         = "version value not found",
+       [SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
+       [SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
+       [SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
+       [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
+       [SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
+};
+
+struct worker {
+       unsigned int id;
+       char         buf[MAX_FRAME_SIZE];
+       unsigned int len;
+       unsigned int size;
+       int          status_code;
+       unsigned int stream_id;
+       unsigned int frame_id;
+       bool         healthcheck;
+       int          ip_score; /* -1 if unset, else between 0 and 100 */
+};
+
+struct chunk {
+       char *str;      /* beginning of the string itself. Might not be 
0-terminated */
+       int len;        /* current size of the string from first to last char */
+};
+
+union spoe_value {
+       bool            boolean; /* use for boolean */
+       int32_t         sint32;  /* used for signed 32bits integers */
+       uint32_t        uint32;  /* used for signed 32bits integers */
+       int32_t         sint64;  /* used for signed 64bits integers */
+       uint32_t        uint64;  /* used for signed 64bits integers */
+       struct in_addr  ipv4;    /* used for ipv4 addresses */
+       struct in6_addr ipv6;    /* used for ipv6 addresses */
+       struct chunk    buffer;  /* used for char strings or buffers */
+};
+
+/* Used to store sample constant */
+struct spoe_data {
+       enum spoe_data_type type;  /* SPOE_DATA_T_* */
+       union spoe_value    u;     /* spoe data value */
+};
+
+static bool debug = false;
+static pthread_key_t worker_id;
+
+static void
+check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
+{
+       char str[INET_ADDRSTRLEN];
+
+       if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
+               return;
+
+       w->ip_score = random() % 100;
+
+       DEBUG("  IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
+}
+
+static void
+check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
+{
+       char str[INET6_ADDRSTRLEN];
+
+       if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
+               return;
+
+       w->ip_score = random() % 100;
+
+       DEBUG("  IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
+}
+
+static int
+do_read(int sock, void *buf, int read_len)
+{
+       fd_set readfds;
+       int    n = 0, total = 0, bytesleft = read_len;
+
+       FD_ZERO(&readfds);
+       FD_SET(sock, &readfds);
+
+       while (total < read_len) {
+               if (select(FD_SETSIZE, &readfds, NULL, NULL, NULL) == -1)
+                       return -1;
+               if (!FD_ISSET(sock, &readfds))
+                       return -1;
+
+               n = read(sock, buf + total, bytesleft);
+               if (n <= 0)
+                       break;
+
+               total += n;
+               bytesleft -= n;
+       }
+
+       return (n == -1) ? -1 : total;
+}
+
+static int
+do_write(int sock, void *buf, int write_len)
+{
+       fd_set writefds;
+       int    n = 0, total = 0, bytesleft = write_len;
+
+       FD_ZERO(&writefds);
+       FD_SET(sock, &writefds);
+
+       while (total < write_len) {
+               if (select(FD_SETSIZE, NULL, &writefds, NULL, NULL) == -1)
+                       return -1;
+               if (!FD_ISSET(sock, &writefds))
+                       return -1;
+
+               n = write(sock, buf + total, bytesleft);
+               if (n <= 0)
+                       break;
+
+               total += n;
+               bytesleft -= n;
+       }
+
+       return (n == -1) ? -1 : total;
+}
+
+/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
+ * otherwise the number of read bytes.*/
+static int
+read_frame(int sock, struct worker *w)
+{
+       uint32_t     netint;
+       unsigned int framesz;
+
+       /* Read the frame size, on 4 bytes */
+       if (do_read(sock, &netint, sizeof(netint)) != 4) {
+               w->status_code = SPOE_FRM_ERR_IO;
+               return -1;
+       }
+
+       /* Check it against the max size */
+       framesz = ntohl(netint);
+       if (framesz > w->size) {
+               w->status_code = SPOE_FRM_ERR_TOO_BIG;
+               return -1;
+       }
+
+       /* Read the frame */
+       if (do_read(sock, w->buf, framesz) != framesz) {
+               w->status_code = SPOE_FRM_ERR_IO;
+               return -1;
+       }
+
+       w->len = framesz;
+       return framesz;
+}
+
+/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
+ * number of written bytes. */
+static int
+write_frame(int sock, struct worker *w)
+{
+       uint32_t netint;
+
+       /* Write the frame size, on 4 bytes */
+       netint = htonl(w->len);
+       if (do_write(sock, &netint, sizeof(netint)) != 4) {
+               w->status_code = SPOE_FRM_ERR_IO;
+               return -1;
+       }
+
+       /* Write the frame */
+       if (do_write(sock, w->buf, w->len) != w->len) {
+               w->status_code = SPOE_FRM_ERR_IO;
+               return -1;
+       }
+       return w->len;
+}
+
+/* Encode a variable-length integer. This function never fails and returns the
+ * number of written bytes. */
+static int
+encode_spoe_varint(uint64_t i, char *buf)
+{
+       int idx;
+
+       if (i < 240) {
+               buf[0] = (unsigned char)i;
+               return 1;
+       }
+
+       buf[0] = (unsigned char)i | 240;
+       i = (i - 240) >> 4;
+       for (idx = 1; i >= 128; ++idx) {
+               buf[idx] = (unsigned char)i | 128;
+               i = (i - 128) >> 7;
+       }
+       buf[idx++] = (unsigned char)i;
+       return idx;
+}
+
+/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
+ * happens when the buffer's end in reached. On success, the number of read
+ * bytes is returned. */
+static int
+decode_spoe_varint(char *buf, char *end, uint64_t *i)
+{
+       unsigned char *msg = (unsigned char *)buf;
+       int            idx = 0;
+
+       if (msg > (unsigned char *)end)
+               return -1;
+
+       if (msg[0] < 240) {
+               *i = msg[0];
+               return 1;
+       }
+       *i = msg[0];
+       do {
+               ++idx;
+               if (msg+idx > (unsigned char *)end)
+                       return -1;
+               *i += (uint64_t)msg[idx] <<  (4 + 7 * (idx-1));
+       } while (msg[idx] >= 128);
+       return (idx + 1);
+}
+
+/* Encode a string. The string will be prefix by its length, encoded as a
+ * variable-length integer. This function never fails and returns the number of
+ * written bytes. */
+static int
+encode_spoe_string(const char *str, size_t len, char *dst)
+{
+       int idx = 0;
+
+       if (!len) {
+               dst[0] = 0;
+               return 1;
+       }
+
+       idx += encode_spoe_varint(len, dst);
+       memcpy(dst+idx, str, len);
+       return (idx + len);
+}
+
+/* Decode a string. Its length is decoded first as a variable-length integer. If
+ * it succeeds, and if the string length is valid, the begin of the string is
+ * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
+ * read is returned. If an error occurred, -1 is returned and <*str> remains
+ * NULL. */
+static int
+decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
+{
+       int r, idx = 0;
+
+       *str = NULL;
+       *len = 0;
+
+       if ((r = decode_spoe_varint(buf, end, len)) == -1)
+               goto error;
+       idx += r;
+       if (buf + idx + *len > end)
+               goto error;
+
+       *str = buf+idx;
+       return (idx + *len);
+
+error:
+       return -1;
+}
+
+/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
+ * of bytes read is returned. A types data is composed of a type (1 byte) and
+ * corresponding data:
+ *  - boolean: non additional data (0 bytes)
+ *  - integers: a variable-length integer (see decode_spoe_varint)
+ *  - ipv4: 4 bytes
+ *  - ipv6: 16 bytes
+ *  - binary and string: a buffer prefixed by its size, a variable-length
+ *    integer (see decode_spoe_string) */
+static int
+skip_spoe_data(char *frame, char *end)
+{
+       uint64_t sz = 0;
+       int      r, idx = 0;
+
+       if (frame > end)
+               return -1;
+
+       switch (frame[idx++] & SPOE_DATA_T_MASK) {
+               case SPOE_DATA_T_BOOL:
+                       idx++;
+                       break;
+               case SPOE_DATA_T_INT32:
+               case SPOE_DATA_T_INT64:
+               case SPOE_DATA_T_UINT32:
+               case SPOE_DATA_T_UINT64:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       idx += r;
+                       break;
+               case SPOE_DATA_T_IPV4:
+                       idx += 4;
+                       break;
+               case SPOE_DATA_T_IPV6:
+                       idx += 16;
+                       break;
+               case SPOE_DATA_T_STR:
+               case SPOE_DATA_T_BIN:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       idx += r + sz;
+                       break;
+       }
+
+       if (frame+idx > end)
+               return -1;
+       return idx;
+}
+
+/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
+ * number of read bytes is returned. See skip_spoe_data for details. */
+static int
+decode_spoe_data(char *frame, char *end, struct spoe_data *data)
+{
+       uint64_t sz  = 0;
+       int      type, r, idx = 0;
+
+       if (frame > end)
+               return -1;
+
+       type = frame[idx++];
+       data->type = (type & SPOE_DATA_T_MASK);
+       switch (data->type) {
+               case SPOE_DATA_T_BOOL:
+                       data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == 
SPOE_DATA_FL_TRUE);
+                       break;
+               case SPOE_DATA_T_INT32:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       data->u.sint32 = sz;
+                       idx += r;
+                       break;
+               case SPOE_DATA_T_INT64:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       data->u.uint32 = sz;
+                       idx += r;
+                       break;
+               case SPOE_DATA_T_UINT32:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       data->u.sint64 = sz;
+                       idx += r;
+                       break;
+               case SPOE_DATA_T_UINT64:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       data->u.uint64 = sz;
+                       idx += r;
+                       break;
+               case SPOE_DATA_T_IPV4:
+                       if (frame+idx+4 > end)
+                               return -1;
+                       memcpy(&data->u.ipv4, frame+idx, 4);
+                       idx += 4;
+                       break;
+               case SPOE_DATA_T_IPV6:
+                       if (frame+idx+16 > end)
+                               return -1;
+                       memcpy(&data->u.ipv6, frame+idx, 16);
+                       idx += 16;
+                       break;
+               case SPOE_DATA_T_STR:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       idx += r;
+                       if (frame+idx+sz > end)
+                               return -1;
+                       data->u.buffer.str = frame+idx;
+                       data->u.buffer.len = sz;
+                       idx += sz;
+                       break;
+               case SPOE_DATA_T_BIN:
+                       if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+                               return -1;
+                       idx += r;
+                       if (frame+idx+sz > end)
+                               return -1;
+                       data->u.buffer.str = frame+idx;
+                       data->u.buffer.len = sz;
+                       idx += sz;
+                       break;
+               default:
+                       break;
+       }
+
+       if (frame+idx > end)
+               return -1;
+       return idx;
+}
+
+
+/* Check the protocol version. It returns -1 if an error occurred, the number of
+ * read bytes otherwise. */
+static int
+check_proto_version(struct worker *w, int idx)
+{
+       char    *str;
+       uint64_t sz;
+
+       /* Get the list of all supported versions by HAProxy */
+       if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               return -1;
+       }
+       idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
+       if (str == NULL) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               return -1;
+       }
+
+       /* TODO: Find the right verion in supported ones */
+
+       return idx;
+}
+
+/* Check max frame size value. It returns -1 if an error occurred, the number of
+ * read bytes otherwise. */
+static int
+check_max_frame_size(struct worker *w, int idx)
+{
+       uint64_t sz;
+       int      type, i;
+
+       /* Get the max-frame-size value of HAProxy */
+       type =  w->buf[idx++];
+       if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32  &&
+           (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64  &&
+           (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
+           (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               return -1;
+       }
+       if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               return -1;
+       }
+       idx += i;
+
+       /* Keep the lower value */
+       if (sz < w->size)
+               w->size = sz;
+
+       return idx;
+}
+
+/* Check healthcheck value. It returns -1 if an error occurred, the number of
+ * read bytes otherwise. */
+static int
+check_healthcheck(struct worker *w, int idx)
+{
+       int type;
+
+       /* Get the "healthcheck" value of HAProxy */
+       type = w->buf[idx++];
+       if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               return -1;
+       }
+       w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
+       return idx;
+}
+
+
+/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
+ * occurred, 0 if the frame must be skipped, otherwise the number of read
+ * bytes. */
+static int
+handle_hahello(struct worker *w)
+{
+       char        *end = w->buf+w->len;
+       int          i, idx = 0;
+
+       /* Check frame type */
+       if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
+               goto skip;
+
+       /* Skip flags */
+       idx += 4;
+
+       /* stream-id and frame-id must be cleared */
+       if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               goto error;
+       }
+       idx += 2;
+
+       /* Loop on K/V items */
+       while (idx < w->len) {
+               char     *str;
+               uint64_t  sz;
+
+               /* Decode the item name */
+               idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
+               if (str == NULL) {
+                       w->status_code = SPOE_FRM_ERR_INVALID;
+                       goto error;
+               }
+
+               /* Check "supported-versions" K/V item */
+               if (!memcmp(str, "supported-versions", sz)) {
+                       if ((i = check_proto_version(w, idx)) == -1)
+                               goto error;
+                       idx = i;
+               }
+               /* Check "max-frame-size" K/V item "*/
+               else if (!memcmp(str, "max-frame-size", sz)) {
+                       if ((i = check_max_frame_size(w, idx)) == -1)
+                               goto error;
+                       idx = i;
+               }
+               /* Check "healthcheck" K/V item "*/
+               else if (!memcmp(str, "healthcheck", sz)) {
+                       if ((i = check_healthcheck(w, idx)) == -1)
+                               goto error;
+                       idx = i;
+               }
+               /* Skip "capabilities" K/V item for now */
+               else {
+                       /* Silently ignore unknown item */
+                       if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
+                               w->status_code = SPOE_FRM_ERR_INVALID;
+                               goto error;
+                       }
+                       idx += i;
+               }
+       }
+
+       return idx;
+skip:
+       return 0;
+error:
+       return -1;
+}
+
+/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
+ * occurred, 0 if the frame must be skipped, otherwise the number of read
+ * bytes. */
+static int
+handle_hadiscon(struct worker *w)
+{
+       char        *end = w->buf+w->len;
+       int          i, idx = 0;
+
+       /* Check frame type */
+       if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
+               goto skip;
+
+       /* Skip flags */
+       idx += 4;
+
+       /* stream-id and frame-id must be cleared */
+       if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               goto error;
+       }
+       idx += 2;
+
+       /* Loop on K/V items */
+       while (idx < w->len) {
+               char     *str;
+               uint64_t  sz;
+
+               /* Decode item key */
+               idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
+               if (str == NULL) {
+                       w->status_code = SPOE_FRM_ERR_INVALID;
+                       goto error;
+               }
+               /* Silently ignore unknown item */
+               if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
+                       w->status_code = SPOE_FRM_ERR_INVALID;
+                       goto error;
+               }
+               idx += i;
+       }
+
+       w->status_code = SPOE_FRM_ERR_NONE;
+       return idx;
+skip:
+       return 0;
+error:
+       return -1;
+}
+
+/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
+ * occurred, 0 if the frame must be skipped, otherwise the number of read
+ * bytes. */
+static int
+handle_hanotify(struct worker *w)
+{
+       char    *end = w->buf+w->len;
+       uint64_t stream_id, frame_id;
+       int      nbargs, i, idx = 0;
+
+       /* Check frame type */
+       if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
+               goto skip;
+
+       /* Skip flags */
+       idx += 4;
+
+       /* Read the stream-id */
+       if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               goto error;
+       }
+       idx += i;
+
+       /* Read the frame-id */
+       if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
+               w->status_code = SPOE_FRM_ERR_INVALID;
+               goto error;
+       }
+       idx += i;
+
+       w->stream_id = (unsigned int)stream_id;
+       w->frame_id  = (unsigned int)frame_id;
+
+       DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
+             w->stream_id, w->frame_id);
+
+       /* Loop on messages */
+       while (idx < w->len) {
+               char    *str;
+               uint64_t sz;
+
+               /* Decode the message name */
+               idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
+               if (str == NULL) {
+                       w->status_code = SPOE_FRM_ERR_INVALID;
+                       goto error;
+               }
+               DEBUG("  Message '%.*s' received", (int)sz, str);
+
+               nbargs = w->buf[idx++];
+               if (!memcmp(str, "check-client-ip", sz)) {
+                       struct spoe_data data;
+
+                       memset(&data, 0, sizeof(data));
+
+                       if (nbargs != 1) {
+                               w->status_code = SPOE_FRM_ERR_INVALID;
+                               goto error;
+                       }
+                       if ((i = decode_spoe_string(w->buf+idx, end, &str, 
&sz)) == -1) {
+                               w->status_code = SPOE_FRM_ERR_INVALID;
+                               goto error;
+                       }
+                       idx += i;
+                       if ((i = decode_spoe_data(w->buf+idx, end, &data)) == 
-1) {
+                               w->status_code = SPOE_FRM_ERR_INVALID;
+                               goto error;
+                       }
+                       idx += i;
+                       if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
+                               check_ipv4_reputation(w, &data.u.ipv4);
+                       else if ((data.type & SPOE_DATA_T_MASK) == 
SPOE_DATA_T_IPV6)
+                               check_ipv6_reputation(w, &data.u.ipv6);
+                       else {
+                               w->status_code = SPOE_FRM_ERR_INVALID;
+                               goto error;
+                       }
+               }
+               else {
+                       while (nbargs-- > 0) {
+                               /* Silently ignore argument: its name and its 
value */
+                               if ((i = decode_spoe_string(w->buf+idx, end, &str, 
&sz)) == -1) {
+                                       w->status_code = SPOE_FRM_ERR_INVALID;
+                                       goto error;
+                               }
+                               idx += i;
+                               if ((i = skip_spoe_data(w->buf+idx, end)) == 
-1) {
+                                       w->status_code = SPOE_FRM_ERR_INVALID;
+                                       goto error;
+                               }
+                               idx += i;
+                       }
+               }
+       }
+
+       return idx;
+skip:
+       return 0;
+error:
+       return -1;
+}
+
+/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
+ * occurred, the number of written bytes otherwise. */
+static int
+prepare_agenthello(struct worker *w)
+{
+       int idx = 0;
+
+       /* Frame Type */
+       w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
+
+       /* No flags for now */
+       memset(w->buf+idx, 0, 4); /* No flags */
+       idx += 4;
+
+       /* No stream-id and frame-id for HELLO frames */
+       w->buf[idx++] = 0;
+       w->buf[idx++] = 0;
+
+       /* "version" K/V item */
+       idx += encode_spoe_string("version", 7, w->buf+idx);
+       w->buf[idx++] = SPOE_DATA_T_STR;
+       idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
+
+       /* "max-frame-size" K/V item */
+       idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
+       w->buf[idx++] = SPOE_DATA_T_UINT32;
+       idx += encode_spoe_varint(w->size, w->buf+idx);
+
+       /* "capabilities" K/V item */
+       idx += encode_spoe_string("capabilities", 12, w->buf+idx);
+       w->buf[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
+
+       w->len = idx;
+       return idx;
+}
+
+/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
+ * the number of written bytes otherwise. */
+static int
+prepare_agentack(struct worker *w)
+{
+       int idx = 0;
+
+       /* Frame type */
+       w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
+
+       /* No flags for now */
+       memset(w->buf+idx, 0, 4); /* No flags */
+       idx += 4;
+
+       /* Set stream-id and frame-id for ACK frames */
+       idx += encode_spoe_varint(w->stream_id, w->buf+idx);
+       idx += encode_spoe_varint(w->frame_id, w->buf+idx);
+
+       /* Data */
+       if (w->ip_score == -1)
+               goto out;
+
+       w->buf[idx++] = SPOE_ACT_T_SET_VAR;                   /* Action type */
+       w->buf[idx++] = 3;                                    /* Number of args 
*/
+       w->buf[idx++] = SPOE_SCOPE_SESS;                      /* Arg 1: the 
scope */
+ idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
+       w->buf[idx++] = SPOE_DATA_T_UINT32;
+ idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
+out:
+       w->len = idx;
+       return idx;
+}
+
+/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
+ * occurred, the number of written bytes otherwise. */
+static int
+prepare_agentdicon(struct worker *w)
+{
+       const char *reason;
+       int         rlen, idx = 0;
+
+       if (w->status_code >= SPOE_FRM_ERRS)
+               w->status_code = SPOE_FRM_ERR_UNKNOWN;
+       reason = spoe_frm_err_reasons[w->status_code];
+       rlen   = strlen(reason);
+
+       /* Frame type */
+       w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
+
+       /* No flags for now */
+       memset(w->buf+idx, 0, 4);
+       idx += 4;
+
+       /* No stream-id and frame-id for DISCONNECT frames */
+       w->buf[idx++] = 0;
+       w->buf[idx++] = 0;
+
+       /* There are 2 mandatory items: "status-code" and "message" */
+
+       /* "status-code" K/V item */
+       idx += encode_spoe_string("status-code", 11, w->buf+idx);
+       w->buf[idx++] = SPOE_DATA_T_UINT32;
+       idx += encode_spoe_varint(w->status_code, w->buf+idx);
+
+       /* "message" K/V item */
+       idx += encode_spoe_string("message", 7, w->buf+idx);
+       w->buf[idx++] = SPOE_DATA_T_STR;
+       idx += encode_spoe_string(reason, rlen, w->buf+idx);
+
+       w->len = idx;
+       return idx;
+}
+
+static int
+hello_handshake(int sock, struct worker *w)
+{
+       if (read_frame(sock, w) < 0) {
+               LOG("Failed to read Haproxy HELLO frame");
+               goto error;
+       }
+       if (handle_hahello(w) < 0) {
+               LOG("Failed to handle Haproxy HELLO frame");
+               goto error;
+       }
+       if (prepare_agenthello(w) < 0) {
+               LOG("Failed to prepare Agent HELLO frame");
+               goto error;
+       }
+       if (write_frame(sock, w) < 0) {
+               LOG("Failed to write Agent frame");
+               goto error;
+       }
+ DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
+             SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
+       return 0;
+error:
+       return -1;
+}
+
+static int
+notify_ack_roundtip(int sock, struct worker *w)
+{
+       if (read_frame(sock, w) < 0) {
+               LOG("Failed to read Haproxy NOTIFY frame");
+               goto error_or_quit;
+       }
+       if (handle_hadiscon(w) != 0) {
+               if (w->status_code != SPOE_FRM_ERR_NONE)
+                       LOG("Failed to handle Haproxy DISCONNECT frame");
+               DEBUG("Disconnect frame received: reason=%s",
+                     spoe_frm_err_reasons[w->status_code]);
+               goto error_or_quit;
+       }
+       if (handle_hanotify(w) < 0) {
+               LOG("Failed to handle Haproxy NOTIFY frame");
+               goto error_or_quit;
+       }
+       if (prepare_agentack(w) < 0) {
+               LOG("Failed to prepare Agent ACK frame");
+               goto error_or_quit;
+       }
+       if (write_frame(sock, w) < 0) {
+               LOG("Failed to write Agent ACK frame");
+               goto error_or_quit;
+       }
+       DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
+             w->stream_id, w->frame_id);
+       return 0;
+error_or_quit:
+       return -1;
+}
+
+static void *
+worker(void *data)
+{
+       struct worker w;
+       struct sockaddr_in client;
+       int *info = (int *)data;
+       int csock, lsock = info[0];
+
+       signal(SIGPIPE, SIG_IGN);
+       pthread_setspecific(worker_id, &info[1]);
+
+       while (1) {
+               socklen_t sz = sizeof(client);
+
+               if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 
0) {
+                       LOG("Failed to accept client connection: %m");
+                       goto out;
+               }
+               memset(&w, 0, sizeof(w));
+               w.id       = info[1];
+               w.size     = MAX_FRAME_SIZE;
+
+               DEBUG("New connection from HAProxy accepted");
+
+               if (hello_handshake(csock, &w) < 0)
+                       goto disconnect;
+               if (w.healthcheck == true)
+                       goto close;
+               while (1) {
+                       w.ip_score = -1;
+                       if (notify_ack_roundtip(csock, &w) < 0)
+                               break;
+               }
+
+       disconnect:
+               if (w.status_code == SPOE_FRM_ERR_IO) {
+                       LOG("Close the client socket because of I/O errors");
+                       goto close;
+               }
+               if (prepare_agentdicon(&w) < 0) {
+                       LOG("Failed to prepare Agent DISCONNECT frame");
+                       goto close;
+               }
+               if (write_frame(csock, &w) < 0) {
+                       LOG("Failed to write Agent DISCONNECT frame");
+                       goto close;
+               }
+               DEBUG("Disconnect frame sent: reason=%s",
+                     spoe_frm_err_reasons[w.status_code]);
+
+       close:
+               close(csock);
+       }
+
+out:
+       free(info);
+       pthread_exit(NULL);
+}
+
+static void
+usage(char *prog)
+{
+ fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
+       fprintf(stderr, "    -h                  Print this message\n");
+       fprintf(stderr, "    -d                  Enable the debug mode\n");
+ fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n"); + fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
+}
+
+int
+main(int argc, char **argv)
+{
+       pthread_t *ts = NULL;
+       struct sockaddr_in server;
+       int i, sock, opt, nbworkers, port;
+
+       nbworkers = NUM_WORKERS;
+       port      = DEFAULT_PORT;
+       while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
+               switch (opt) {
+                       case 'h':
+                               usage(argv[0]);
+                               return EXIT_SUCCESS;
+                       case 'd':
+                               debug = true;
+                               break;
+                       case 'n':
+                               nbworkers = atoi(optarg);
+                               break;
+                       case 'p':
+                               port = atoi(optarg);
+                               break;
+                       default:
+                               usage(argv[0]);
+                               return EXIT_FAILURE;
+               }
+       }
+
+       if (nbworkers <= 0) {
+               fprintf(stderr, "%s: Invalid number of workers '%d'\n",
+                       argv[0], nbworkers);
+               goto error;
+       }
+       if (port <= 0) {
+               fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
+               goto error;
+       }
+
+       if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               fprintf(stderr, "Failed creating socket: %m\n");
+               goto error;
+       }
+
+       setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
+       setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
+
+       memset(&server, 0, sizeof(server));
+       server.sin_family      = AF_INET;
+       server.sin_addr.s_addr = INADDR_ANY;
+       server.sin_port        = htons(port);
+
+       if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
+               fprintf(stderr, "Failed to bind the socket: %m\n");
+               goto error;
+       }
+
+       if (listen(sock , 10) < 0) {
+               fprintf(stderr, "Failed to listen on the socket: %m\n");
+               goto error;
+       }
+       fprintf(stderr, "SPOA is listening on port %d\n", port);
+
+       ts = calloc(nbworkers, sizeof(*ts));
+       pthread_key_create(&worker_id, NULL);
+       for (i = 0; i < nbworkers; i++) {
+               int *info = calloc(2, sizeof(*info));
+
+               info[0] = sock;
+               info[1] = i+1;
+               if (pthread_create(&ts[i], NULL,  worker, info) < 0) {
+                       fprintf(stderr, "Failed to create thread %d: %m\n", 
i+1);
+                       goto error;
+               }
+               fprintf(stderr, "SPOA worker %02d started\n", i+1);
+       }
+
+       for (i = 0; i < nbworkers; i++) {
+               pthread_join(ts[i], NULL);
+               fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
+       }
+       pthread_key_delete(worker_id);
+       free(ts);
+       close(sock);
+       return EXIT_SUCCESS;
+error:
+       free(ts);
+       return EXIT_FAILURE;
+}
--
2.9.5


>From da19fe72096cb3948449c9656b619d8a9dcfdf32 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 11:42:57 +0100
Subject: [PATCH 02/14] MINOR: spoa-server: move some definition from
 spoa_server.c to spoa_server.h

This will allow to add some other files to the project
---
 contrib/spoa_server/spoa.c | 65 ++-----------------------------------
 contrib/spoa_server/spoa.h | 81 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+), 62 deletions(-)
 create mode 100644 contrib/spoa_server/spoa.h

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index ce59c04..52d1c91 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -7,6 +7,7 @@
  * using the SPOE.
  *
  * Copyright 2016 HAProxy Technologies, Christopher Faulet 
<cfau...@haproxy.com>
+ * Copyright 2018 OZON / Thierry Fournier <thierry.fourn...@ozon.io>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -28,11 +29,10 @@
 #include <netinet/tcp.h>
 #include <arpa/inet.h>

+#include "spoa.h"
+
 #define DEFAULT_PORT      12345
 #define NUM_WORKERS       5
-#define MAX_FRAME_SIZE    16384
-#define SPOP_VERSION      "1.0"
-#define SPOA_CAPABILITIES ""

 #define SLEN(str) (sizeof(str)-1)

@@ -65,21 +65,6 @@ enum spoe_frame_type {
        SPOE_FRM_T_AGENT_ACK
 };

-/* All supported data types */
-enum spoe_data_type {
-       SPOE_DATA_T_NULL = 0,
-       SPOE_DATA_T_BOOL,
-       SPOE_DATA_T_INT32,
-       SPOE_DATA_T_UINT32,
-       SPOE_DATA_T_INT64,
-       SPOE_DATA_T_UINT64,
-       SPOE_DATA_T_IPV4,
-       SPOE_DATA_T_IPV6,
-       SPOE_DATA_T_STR,
-       SPOE_DATA_T_BIN,
-       SPOE_DATA_TYPES
-};
-
 /* Errors triggerd by SPOE applet */
 enum spoe_frame_error {
        SPOE_FRM_ERR_NONE = 0,
@@ -103,16 +88,6 @@ enum spoe_action_type {
        SPOE_ACT_TYPES,
 };

-/* Scopes used for variables set by agents. It is a way to be agnotic to vars
- * scope. */
-enum spoe_vars_scope {
-       SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
-       SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
-       SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
-       SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
-       SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
-};
-

 /* Masks to get data type or flags value */
 #define SPOE_DATA_T_MASK  0x0F
@@ -135,40 +110,6 @@ static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
        [SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
 };

-struct worker {
-       unsigned int id;
-       char         buf[MAX_FRAME_SIZE];
-       unsigned int len;
-       unsigned int size;
-       int          status_code;
-       unsigned int stream_id;
-       unsigned int frame_id;
-       bool         healthcheck;
-       int          ip_score; /* -1 if unset, else between 0 and 100 */
-};
-
-struct chunk {
-       char *str;      /* beginning of the string itself. Might not be 
0-terminated */
-       int len;        /* current size of the string from first to last char */
-};
-
-union spoe_value {
-       bool            boolean; /* use for boolean */
-       int32_t         sint32;  /* used for signed 32bits integers */
-       uint32_t        uint32;  /* used for signed 32bits integers */
-       int32_t         sint64;  /* used for signed 64bits integers */
-       uint32_t        uint64;  /* used for signed 64bits integers */
-       struct in_addr  ipv4;    /* used for ipv4 addresses */
-       struct in6_addr ipv6;    /* used for ipv6 addresses */
-       struct chunk    buffer;  /* used for char strings or buffers */
-};
-
-/* Used to store sample constant */
-struct spoe_data {
-       enum spoe_data_type type;  /* SPOE_DATA_T_* */
-       union spoe_value    u;     /* spoe data value */
-};
-
 static bool debug = false;
 static pthread_key_t worker_id;

diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
new file mode 100644
index 0000000..81f5816
--- /dev/null
+++ b/contrib/spoa_server/spoa.h
@@ -0,0 +1,81 @@
+/* Main SPOA server includes
+ *
+ * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfau...@haproxy.com>
+ * Copyright 2018 OZON / Thierry Fournier <thierry.fourn...@ozon.io>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ */
+#ifndef __SPOA_H__
+#define __SPOA_H__
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <netinet/in.h>
+
+#define MAX_FRAME_SIZE    16384
+#define SPOP_VERSION      "1.0"
+#define SPOA_CAPABILITIES ""
+
+/* All supported data types */
+enum spoe_data_type {
+       SPOE_DATA_T_NULL = 0,
+       SPOE_DATA_T_BOOL,
+       SPOE_DATA_T_INT32,
+       SPOE_DATA_T_UINT32,
+       SPOE_DATA_T_INT64,
+       SPOE_DATA_T_UINT64,
+       SPOE_DATA_T_IPV4,
+       SPOE_DATA_T_IPV6,
+       SPOE_DATA_T_STR,
+       SPOE_DATA_T_BIN,
+       SPOE_DATA_TYPES
+};
+
+/* Scopes used for variables set by agents. It is a way to be agnotic to vars
+ * scope. */
+enum spoe_vars_scope {
+       SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
+       SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
+       SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
+       SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
+       SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
+};
+
+struct worker {
+       unsigned int id;
+       char         buf[MAX_FRAME_SIZE];
+       unsigned int len;
+       unsigned int size;
+       int          status_code;
+       unsigned int stream_id;
+       unsigned int frame_id;
+       bool         healthcheck;
+       int          ip_score; /* -1 if unset, else between 0 and 100 */
+};
+
+struct chunk {
+       char *str;      /* beginning of the string itself. Might not be 
0-terminated */
+       int len;        /* current size of the string from first to last char */
+};
+
+union spoe_value {
+       bool            boolean; /* use for boolean */
+       int32_t         sint32;  /* used for signed 32bits integers */
+       uint32_t        uint32;  /* used for signed 32bits integers */
+       int32_t         sint64;  /* used for signed 64bits integers */
+       uint32_t        uint64;  /* used for signed 64bits integers */
+       struct in_addr  ipv4;    /* used for ipv4 addresses */
+       struct in6_addr ipv6;    /* used for ipv6 addresses */
+       struct chunk    buffer;  /* used for char strings or buffers */
+};
+
+/* Used to store sample constant */
+struct spoe_data {
+       enum spoe_data_type type;  /* SPOE_DATA_T_* */
+       union spoe_value    u;     /* spoe data value */
+};
+
+#endif /* __SPOA_H__ */
--
2.9.5


>From 3dcb0d3318b94cff72ab5592f46bfe83208df557 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Sun, 25 Feb 2018 10:54:56 +0100
Subject: [PATCH 03/14] MINOR: spoa-server: Externalise debug functions

Make external LOG and DEBUG function. Other process can use this ones
and later these functions will be replaced by another log system
---
 contrib/spoa_server/spoa.c | 20 ++------------------
 contrib/spoa_server/spoa.h | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 52d1c91..94be6c0 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -36,22 +36,6 @@

 #define SLEN(str) (sizeof(str)-1)

-#define LOG(fmt, args...)                                           \
-       do {                                                        \
-               struct timeval now;                                 \
-               int wid = *((int*)pthread_getspecific(worker_id));  \
-                                                                   \
-               gettimeofday(&now, NULL);                       \
-               fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n",           \
-                       now.tv_sec, now.tv_usec, wid, ##args);      \
-       } while (0)
-
-#define DEBUG(x...)                             \
-       do {                                    \
-               if (debug)                      \
-                       LOG(x);                 \
-       } while (0)
-
 /* Frame Types sent by HAProxy and by agents */
 enum spoe_frame_type {
        /* Frames sent by HAProxy */
@@ -110,8 +94,8 @@ static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
        [SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
 };

-static bool debug = false;
-static pthread_key_t worker_id;
+bool debug = false;
+pthread_key_t worker_id;

 static void
 check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index 81f5816..92c24ac 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -11,9 +11,11 @@
 #ifndef __SPOA_H__
 #define __SPOA_H__

+#include <pthread.h>
 #include <stdbool.h>
 #include <stdint.h>
 #include <netinet/in.h>
+#include <sys/time.h>

 #define MAX_FRAME_SIZE    16384
 #define SPOP_VERSION      "1.0"
@@ -78,4 +80,23 @@ struct spoe_data {
        union spoe_value    u;     /* spoe data value */
 };

+extern bool debug;
+extern pthread_key_t worker_id;
+
+#define LOG(fmt, args...) \
+       do { \
+               struct timeval now; \
+               int wid = *((int*)pthread_getspecific(worker_id)); \
+               \
+               gettimeofday(&now, NULL); \
+               fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \
+                       now.tv_sec, now.tv_usec, wid, ##args); \
+       } while (0)
+
+#define DEBUG(x...) \
+       do { \
+               if (debug) \
+                       LOG(x); \
+       } while (0)
+
 #endif /* __SPOA_H__ */
--
2.9.5


>From 1ff8d283898d707924159c80a071f869fbd32156 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 11:59:15 +0100
Subject: [PATCH 04/14] MINOR: spoe-server: rename "worker" functions

"worker" name is a little bit generic and it is used in many
places, so it is hard to find the expected symbol.
---
 contrib/spoa_server/spoa.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 94be6c0..b08c1d6 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -918,7 +918,7 @@ error_or_quit:
 }

 static void *
-worker(void *data)
+spoa_worker(void *data)
 {
        struct worker w;
        struct sockaddr_in client;
@@ -1056,7 +1056,7 @@ main(int argc, char **argv)

                info[0] = sock;
                info[1] = i+1;
-               if (pthread_create(&ts[i], NULL,  worker, info) < 0) {
+               if (pthread_create(&ts[i], NULL,  spoa_worker, info) < 0) {
                        fprintf(stderr, "Failed to create thread %d: %m\n", 
i+1);
                        goto error;
                }
--
2.9.5


>From edc99e860f8b65adc257768edb62603a1baa9364 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 13:50:26 +0100
Subject: [PATCH 05/14] MINOR: spoa-server: Replace the thread init system by
 processes

I will replace thread by processes. Note that, I keep the pthread_key
system for identifiying process in the same way that threads. Note
also that I keep commented out the original thread code because I hope
to reactivate it.
---
 contrib/spoa_server/spoa.c | 56 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 53 insertions(+), 3 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index b08c1d6..c762541 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -24,6 +24,7 @@
 #include <pthread.h>
 #include <sys/time.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
@@ -973,7 +974,21 @@ spoa_worker(void *data)

 out:
        free(info);
+#if 0
        pthread_exit(NULL);
+#endif
+       return NULL;
+}
+
+int process_create(pid_t *pid, void *(*ps)(void *), void *data)
+{
+       *pid = fork();
+       if (*pid == -1)
+               return -1;
+       if (*pid > 0)
+               return 0;
+       ps(data);
+       return 0;
 }

 static void
@@ -989,9 +1004,13 @@ usage(char *prog)
 int
 main(int argc, char **argv)
 {
+#if 0
        pthread_t *ts = NULL;
+#endif
+       pid_t *pids;
        struct sockaddr_in server;
        int i, sock, opt, nbworkers, port;
+       int status;

        nbworkers = NUM_WORKERS;
        port      = DEFAULT_PORT;
@@ -1049,13 +1068,20 @@ main(int argc, char **argv)
        }
        fprintf(stderr, "SPOA is listening on port %d\n", port);

-       ts = calloc(nbworkers, sizeof(*ts));
        pthread_key_create(&worker_id, NULL);
+
+       /* Initialise the server in thread mode. This code is commented
+        * out and not deleted, because later I expect to work with
+        * process ansd threads. This first version just support processes.
+        */
+#if 0
+       ts = calloc(nbworkers, sizeof(*ts));
        for (i = 0; i < nbworkers; i++) {
                int *info = calloc(2, sizeof(*info));

                info[0] = sock;
                info[1] = i+1;
+
                if (pthread_create(&ts[i], NULL,  spoa_worker, info) < 0) {
                        fprintf(stderr, "Failed to create thread %d: %m\n", 
i+1);
                        goto error;
@@ -1067,11 +1093,35 @@ main(int argc, char **argv)
                pthread_join(ts[i], NULL);
                fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
        }
-       pthread_key_delete(worker_id);
        free(ts);
+#endif
+
+       /* Start processes */
+       pids = calloc(nbworkers, sizeof(*pids));
+       if (!pids) {
+               fprintf(stderr, "Out of memory error\n");
+               goto error;
+       }
+       for (i = 0; i < nbworkers; i++) {
+               int *info = calloc(2, sizeof(*info));
+
+               info[0] = sock;
+               info[1] = i+1;
+
+               if (process_create(&pids[i], spoa_worker, info) == -1) {
+                       fprintf(stderr, "SPOA worker %02d started\n", i+1);
+                       goto error;
+               }
+               fprintf(stderr, "SPOA worker %02d started\n", i+1);
+       }
+       for (i = 0; i < nbworkers; i++) {
+               waitpid(pids[0], &status, 0);
+               fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
+       }
+
        close(sock);
+       pthread_key_delete(worker_id);
        return EXIT_SUCCESS;
 error:
-       free(ts);
        return EXIT_FAILURE;
 }
--
2.9.5


>From d266b3c564700785800818c88242b60458ef0138 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 19:11:47 +0100
Subject: [PATCH 06/14] MINOR: spoa-server: With debug mode, start only one
 process

Because debug with processes is simpler if only one process is started.
---
 contrib/spoa_server/spoa.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index c762541..3766c47 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -982,6 +982,10 @@ out:

 int process_create(pid_t *pid, void *(*ps)(void *), void *data)
 {
+       if (debug) {
+               ps(data);
+               exit(EXIT_SUCCESS);
+       }
        *pid = fork();
        if (*pid == -1)
                return -1;
--
2.9.5


>From f796ceed0ad05df02597a8aaf9fbfa451c203da1 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 14:58:40 +0100
Subject: [PATCH 07/14] MINOR: spoa-server: Allow registering external
 processes

Add struct for declaring an reistrering external processing resource.
---
 contrib/spoa_server/spoa.c | 12 ++++++++++++
 contrib/spoa_server/spoa.h | 13 +++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 3766c47..6645c8c 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -97,6 +97,13 @@ static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {

 bool debug = false;
 pthread_key_t worker_id;
+static struct ps *ps_list = NULL;
+
+void ps_register(struct ps *ps)
+{
+       ps->next = ps_list;
+       ps_list = ps;
+}

 static void
 check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
@@ -925,10 +932,15 @@ spoa_worker(void *data)
        struct sockaddr_in client;
        int *info = (int *)data;
        int csock, lsock = info[0];
+       struct ps *ps;

        signal(SIGPIPE, SIG_IGN);
        pthread_setspecific(worker_id, &info[1]);

+       /* Init registered processors */
+       for (ps = ps_list; ps != NULL; ps = ps->next)
+               ps->init_worker(&w);
+
        while (1) {
                socklen_t sz = sizeof(client);

diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index 92c24ac..c8f9861 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -80,9 +80,22 @@ struct spoe_data {
        union spoe_value    u;     /* spoe data value */
 };

+struct spoe_kv {
+       struct chunk name;
+       struct spoe_data value;
+};
+
+struct ps {
+       struct ps *next;
+       char *ext;
+       int (*init_worker)(struct worker *w);
+};
+
 extern bool debug;
 extern pthread_key_t worker_id;

+void ps_register(struct ps *ps);
+
 #define LOG(fmt, args...) \
        do { \
                struct timeval now; \
--
2.9.5


>From 41edf052ae13e16a79963a7c06316e4e0dfed666 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 14:27:05 +0100
Subject: [PATCH 08/14] MINOR: spoa-server: Allow registering message
 processors

This function register processor executed by any language for processing
an SPOP message.
---
 contrib/spoa_server/spoa.c | 30 ++++++++++++++++++++++++++++++
 contrib/spoa_server/spoa.h |  9 +++++++++
 2 files changed, 39 insertions(+)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 6645c8c..73b7133 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -98,6 +98,7 @@ static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
 bool debug = false;
 pthread_key_t worker_id;
 static struct ps *ps_list = NULL;
+static struct ps_message *ps_messages = NULL;

 void ps_register(struct ps *ps)
 {
@@ -105,6 +106,35 @@ void ps_register(struct ps *ps)
        ps_list = ps;
 }

+void ps_register_message(struct ps *ps, const char *name, void *ref)
+{
+       struct ps_message *msg;
+
+       /* Look for already registered name */
+       for (msg = ps_messages; msg; msg = msg->next) {
+               if (strcmp(name, msg->name) == 0) {
+                       LOG("Message \"%s\" already registered\n", name);
+                       exit(EXIT_FAILURE);
+               }
+       }
+
+       msg = calloc(1, sizeof(*msg));
+       if (msg == NULL) {
+               LOG("Out of memory error\n");
+               exit(EXIT_FAILURE);
+       }
+
+       msg->next = ps_messages;
+       ps_messages = msg;
+       msg->name = strdup(name);
+       if (msg->name == NULL) {
+               LOG("Out of memory error\n");
+               exit(EXIT_FAILURE);
+       }
+       msg->ref = ref;
+       msg->ps = ps;
+}
+
 static void
 check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
 {
diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index c8f9861..22d6788 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -89,12 +89,21 @@ struct ps {
        struct ps *next;
        char *ext;
        int (*init_worker)(struct worker *w);
+ int (*exec_message)(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
+};
+
+struct ps_message {
+       struct ps_message *next;
+       const char *name;
+       struct ps *ps;
+       void *ref;
 };

 extern bool debug;
 extern pthread_key_t worker_id;

 void ps_register(struct ps *ps);
+void ps_register_message(struct ps *ps, const char *name, void *ref);

 #define LOG(fmt, args...) \
        do { \
--
2.9.5


>From ef804062cb95f7a5c5192d8082bdc2c28fc91f30 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 15:12:55 +0100
Subject: [PATCH 09/14] MINOR: spoa-server: Load files

Declare files to be executed at the begining and execute it. The binding
between the engine and the file is done throught the extension.
---
 contrib/spoa_server/spoa.c | 42 ++++++++++++++++++++++++++++++++++++++++--
 contrib/spoa_server/spoa.h |  1 +
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 73b7133..e484315 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -99,6 +99,23 @@ bool debug = false;
 pthread_key_t worker_id;
 static struct ps *ps_list = NULL;
 static struct ps_message *ps_messages = NULL;
+static int nfiles = 0;
+static char **files = NULL;
+
+static inline void add_file(const char *file)
+{
+       nfiles++;
+       files = realloc(files, sizeof(*files) * nfiles);
+       if (files == NULL) {
+               fprintf(stderr, "Out of memory error\n");
+               exit(EXIT_FAILURE);
+       }
+       files[nfiles - 1] = strdup(file);
+       if (files[nfiles - 1] == NULL) {
+               fprintf(stderr, "Out of memory error\n");
+               exit(EXIT_FAILURE);
+       }
+}

 void ps_register(struct ps *ps)
 {
@@ -963,6 +980,8 @@ spoa_worker(void *data)
        int *info = (int *)data;
        int csock, lsock = info[0];
        struct ps *ps;
+       int i;
+       int len;

        signal(SIGPIPE, SIG_IGN);
        pthread_setspecific(worker_id, &info[1]);
@@ -971,6 +990,20 @@ spoa_worker(void *data)
        for (ps = ps_list; ps != NULL; ps = ps->next)
                ps->init_worker(&w);

+       /* Load files */
+       for (i = 0; i < nfiles; i++) {
+               len = strlen(files[i]);
+               for (ps = ps_list; ps != NULL; ps = ps->next)
+                       if (strcmp(files[i] + len - strlen(ps->ext), ps->ext) 
== 0)
+                               break;
+               if (ps == NULL) {
+                       LOG("Can't load file \"%s\"\n", files[i]);
+                       goto out;
+               }
+               if (!ps->load_file(&w, files[i]))
+                       goto out;
+       }
+
        while (1) {
                socklen_t sz = sizeof(client);

@@ -1040,11 +1073,13 @@ int process_create(pid_t *pid, void *(*ps)(void *), void *data)
 static void
 usage(char *prog)
 {
- fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog); + fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>] -f <file>\n", prog);
        fprintf(stderr, "    -h                  Print this message\n");
        fprintf(stderr, "    -d                  Enable the debug mode\n");
fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n"); fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n"); + fprintf(stderr, " -f <file> Specify the file whoch contains the processing code.\n"); + fprintf(stderr, " This argument can specified more than once.\n");
 }

 int
@@ -1060,7 +1095,7 @@ main(int argc, char **argv)

        nbworkers = NUM_WORKERS;
        port      = DEFAULT_PORT;
-       while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
+       while ((opt = getopt(argc, argv, "hdn:p:f:")) != -1) {
                switch (opt) {
                        case 'h':
                                usage(argv[0]);
@@ -1074,6 +1109,9 @@ main(int argc, char **argv)
                        case 'p':
                                port = atoi(optarg);
                                break;
+                       case 'f':
+                               add_file(optarg);
+                               break;
                        default:
                                usage(argv[0]);
                                return EXIT_FAILURE;
diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index 22d6788..ca85181 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -90,6 +90,7 @@ struct ps {
        char *ext;
        int (*init_worker)(struct worker *w);
int (*exec_message)(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
+       int (*load_file)(struct worker *w, const char *file);
 };

 struct ps_message {
--
2.9.5


>From ce759ec47bed26ceeb2a69e78f1a5742d8ae29cf Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 18:24:10 +0100
Subject: [PATCH 10/14] MINOR: spoa-server: Prepare responses

This patch adds SPOP responses managament. It provides SPOP
encoding primitives. It also move the example function
ip_reputation to this new behavior.
---
 contrib/spoa_server/spoa.c | 204 ++++++++++++++++++++++++++++++++++++---------
 contrib/spoa_server/spoa.h |  38 ++++++++-
 2 files changed, 202 insertions(+), 40 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index e484315..53fc759 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -156,26 +156,30 @@ static void
 check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
 {
        char str[INET_ADDRSTRLEN];
+       unsigned int score;

        if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
                return;

-       w->ip_score = random() % 100;
+       score = random() % 100;
+       set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);

-       DEBUG("  IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
+       DEBUG("  IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score);
 }

 static void
 check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
 {
        char str[INET6_ADDRSTRLEN];
+       unsigned int score;

        if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
                return;

-       w->ip_score = random() % 100;
+       score = random() % 100;
+       set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);

-       DEBUG("  IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
+       DEBUG("  IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score);
 }

 static int
@@ -700,6 +704,159 @@ error:
        return -1;
 }

+/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
+ * the number of written bytes otherwise. */
+static void prepare_agentack(struct worker *w)
+{
+       w->ack_len = 0;
+
+       /* Frame type */
+       w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
+
+       /* No flags for now */
+       memset(w->ack + w->ack_len, 0, 4); /* No flags */
+       w->ack_len += 4;
+
+       /* Set stream-id and frame-id for ACK frames */
+       w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
+       w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
+}
+
+static inline
+int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
+{
+       w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
+       w->ack[w->ack_len++] = 3;                  /* Number of args */
+       w->ack[w->ack_len++] = scope;              /* Arg 1: the scope */
+ w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
+       return 1;
+}
+
+int set_var_null(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
+       return 1;
+}
+
+int set_var_bool(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope, bool value)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
+       return 1;
+}
+
+static inline
+int set_var_int(struct worker *w,
+                const char *name, int name_len,
+                unsigned char scope, int type, uint64_t value)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
+ w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
+       return 1;
+}
+
+int set_var_uint32(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope, uint32_t value)
+{
+       return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
+}
+
+int set_var_int32(struct worker *w,
+                  const char *name, int name_len,
+                  unsigned char scope, int32_t value)
+{
+       return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_uint64(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope, uint64_t value)
+{
+       return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_int64(struct worker *w,
+                  const char *name, int name_len,
+                  unsigned char scope, int64_t value)
+{
+       return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_ipv4(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope,
+                 struct in_addr *ipv4)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
+       memcpy(w->ack+w->ack_len, ipv4, 4);
+       w->ack_len += 4;
+       return 1;
+}
+
+int set_var_ipv6(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope,
+                 struct in6_addr *ipv6)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
+       memcpy(w->ack+w->ack_len, ipv6, 16);
+       w->ack_len += 16;
+       return 1;
+}
+
+static inline
+int set_var_buf(struct worker *w,
+                const char *name, int name_len,
+                unsigned char scope, int type,
+                const char *str, int str_len)
+{
+       if (!set_var_name(w, name, name_len, scope))
+               return 0;
+       w->ack[w->ack_len++] = type;
+       w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
+       return 1;
+}
+
+int set_var_string(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope,
+                   const char *str, int strlen)
+{
+       return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, 
strlen);
+}
+
+int set_var_bin(struct worker *w,
+                const char *name, int name_len,
+                unsigned char scope,
+                const char *str, int strlen)
+{
+       return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, 
strlen);
+}
+
+/* This function is a little bit ugly,
+ * TODO: improve the response without copying the bufer
+ */
+static int commit_agentack(struct worker *w)
+{
+       memcpy(w->buf, w->ack, w->ack_len);
+       w->len = w->ack_len;
+       return 1;
+}
+
 /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
  * occurred, 0 if the frame must be skipped, otherwise the number of read
  * bytes. */
@@ -737,6 +894,9 @@ handle_hanotify(struct worker *w)
        DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
              w->stream_id, w->frame_id);

+       /* Prepara ack, if the processing fails tha ack will be cancelled */
+       prepare_agentack(w);
+
        /* Loop on messages */
        while (idx < w->len) {
                char    *str;
@@ -840,39 +1000,6 @@ prepare_agenthello(struct worker *w)
        return idx;
 }

-/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
- * the number of written bytes otherwise. */
-static int
-prepare_agentack(struct worker *w)
-{
-       int idx = 0;
-
-       /* Frame type */
-       w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
-
-       /* No flags for now */
-       memset(w->buf+idx, 0, 4); /* No flags */
-       idx += 4;
-
-       /* Set stream-id and frame-id for ACK frames */
-       idx += encode_spoe_varint(w->stream_id, w->buf+idx);
-       idx += encode_spoe_varint(w->frame_id, w->buf+idx);
-
-       /* Data */
-       if (w->ip_score == -1)
-               goto out;
-
-       w->buf[idx++] = SPOE_ACT_T_SET_VAR;                   /* Action type */
-       w->buf[idx++] = 3;                                    /* Number of args 
*/
-       w->buf[idx++] = SPOE_SCOPE_SESS;                      /* Arg 1: the 
scope */
- idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
-       w->buf[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
-out:
-       w->len = idx;
-       return idx;
-}
-
 /* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
  * occurred, the number of written bytes otherwise. */
 static int
@@ -957,7 +1084,7 @@ notify_ack_roundtip(int sock, struct worker *w)
                LOG("Failed to handle Haproxy NOTIFY frame");
                goto error_or_quit;
        }
-       if (prepare_agentack(w) < 0) {
+       if (commit_agentack(w) < 0) {
                LOG("Failed to prepare Agent ACK frame");
                goto error_or_quit;
        }
@@ -1022,7 +1149,6 @@ spoa_worker(void *data)
                if (w.healthcheck == true)
                        goto close;
                while (1) {
-                       w.ip_score = -1;
                        if (notify_ack_roundtip(csock, &w) < 0)
                                break;
                }
diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index ca85181..ee19f37 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -55,7 +55,8 @@ struct worker {
        unsigned int stream_id;
        unsigned int frame_id;
        bool         healthcheck;
-       int          ip_score; /* -1 if unset, else between 0 and 100 */
+       char         ack[MAX_FRAME_SIZE];
+       unsigned int ack_len;
 };

 struct chunk {
@@ -106,6 +107,41 @@ extern pthread_key_t worker_id;
 void ps_register(struct ps *ps);
 void ps_register_message(struct ps *ps, const char *name, void *ref);

+int set_var_null(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope);
+int set_var_bool(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope, bool value);
+int set_var_uint32(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope, uint32_t value);
+int set_var_int32(struct worker *w,
+                  const char *name, int name_len,
+                  unsigned char scope, int32_t value);
+int set_var_uint64(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope, uint64_t value);
+int set_var_int64(struct worker *w,
+                  const char *name, int name_len,
+                  unsigned char scope, int64_t value);
+int set_var_ipv4(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope,
+                 struct in_addr *ipv4);
+int set_var_ipv6(struct worker *w,
+                 const char *name, int name_len,
+                 unsigned char scope,
+                 struct in6_addr *ipv6);
+int set_var_string(struct worker *w,
+                   const char *name, int name_len,
+                   unsigned char scope,
+                   const char *str, int strlen);
+int set_var_bin(struct worker *w,
+                const char *name, int name_len,
+                unsigned char scope,
+                const char *str, int strlen);
+
 #define LOG(fmt, args...) \
        do { \
                struct timeval now; \
--
2.9.5


>From 4ec26e42a040da5e4357ff12d1c80f02a1287046 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 14:42:46 +0100
Subject: [PATCH 11/14] MINOR: spoa-server: Execute registered callbacks

Call the right function with the right engine for each received message.
---
 contrib/spoa_server/spoa.c | 94 ++++++++++++++++------------------------------
 1 file changed, 33 insertions(+), 61 deletions(-)

diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index 53fc759..6328f80 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -15,6 +15,7 @@
  * 2 of the License, or (at your option) any later version.
  *
  */
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -152,36 +153,6 @@ void ps_register_message(struct ps *ps, const char *name, void *ref)
        msg->ps = ps;
 }

-static void
-check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
-{
-       char str[INET_ADDRSTRLEN];
-       unsigned int score;
-
-       if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
-               return;
-
-       score = random() % 100;
-       set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
-
-       DEBUG("  IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score);
-}
-
-static void
-check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
-{
-       char str[INET6_ADDRSTRLEN];
-       unsigned int score;
-
-       if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
-               return;
-
-       score = random() % 100;
-       set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
-
-       DEBUG("  IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score);
-}
-
 static int
 do_read(int sock, void *buf, int read_len)
 {
@@ -866,6 +837,10 @@ handle_hanotify(struct worker *w)
        char    *end = w->buf+w->len;
        uint64_t stream_id, frame_id;
        int      nbargs, i, idx = 0;
+       int index;
+       struct spoe_kv args[256];
+       uint64_t length;
+       struct ps_message *msg;

        /* Check frame type */
        if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
@@ -910,50 +885,46 @@ handle_hanotify(struct worker *w)
                }
                DEBUG("  Message '%.*s' received", (int)sz, str);

-               nbargs = w->buf[idx++];
-               if (!memcmp(str, "check-client-ip", sz)) {
-                       struct spoe_data data;
-
-                       memset(&data, 0, sizeof(data));
+               /* Decode all SPOE data */
+               nbargs = (unsigned char)w->buf[idx++];
+               for (index = 0; index < nbargs; index++) {

-                       if (nbargs != 1) {
+                       /* Read the key name */
+                       if ((i = decode_spoe_string(w->buf+idx, end,
+                                                   &args[index].name.str,
+                                                   &length)) == -1) {
                                w->status_code = SPOE_FRM_ERR_INVALID;
                                goto error;
                        }
-                       if ((i = decode_spoe_string(w->buf+idx, end, &str, 
&sz)) == -1) {
-                               w->status_code = SPOE_FRM_ERR_INVALID;
+                       if (length > INT_MAX) {
+                               w->status_code = SPOE_FRM_ERR_TOO_BIG;
                                goto error;
                        }
+                       args[index].name.len = length;
                        idx += i;
-                       if ((i = decode_spoe_data(w->buf+idx, end, &data)) == 
-1) {
+
+                       /* Read the value */
+                       memset(&args[index].value, 0, 
sizeof(args[index].value));
+                       if ((i = decode_spoe_data(w->buf+idx, end, 
&args[index].value)) == -1) {
                                w->status_code = SPOE_FRM_ERR_INVALID;
                                goto error;
                        }
                        idx += i;
-                       if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
-                               check_ipv4_reputation(w, &data.u.ipv4);
-                       else if ((data.type & SPOE_DATA_T_MASK) == 
SPOE_DATA_T_IPV6)
-                               check_ipv6_reputation(w, &data.u.ipv6);
-                       else {
-                               w->status_code = SPOE_FRM_ERR_INVALID;
-                               goto error;
-                       }
                }
-               else {
-                       while (nbargs-- > 0) {
-                               /* Silently ignore argument: its name and its 
value */
-                               if ((i = decode_spoe_string(w->buf+idx, end, &str, 
&sz)) == -1) {
-                                       w->status_code = SPOE_FRM_ERR_INVALID;
-                                       goto error;
-                               }
-                               idx += i;
-                               if ((i = skip_spoe_data(w->buf+idx, end)) == 
-1) {
-                                       w->status_code = SPOE_FRM_ERR_INVALID;
-                                       goto error;
-                               }
-                               idx += i;
-                       }
+
+               /* Lookup for existsing bindings. If no existing message
+                * where found, does nothing.
+                */
+               for (msg = ps_messages; msg; msg = msg->next)
+                       if (sz == strlen(msg->name) && strncmp(str, msg->name, 
sz) == 0)
+                               break;
+               if (msg == NULL || msg->ps->exec_message == NULL) {
+                       DEBUG("  Message '%.*s' have no bindings registered", 
(int)sz, str);
+                       continue;
                }
+
+               /* Process the message */
+               msg->ps->exec_message(w, msg->ref, nbargs, args);
        }

        return idx;
@@ -1332,6 +1303,7 @@ main(int argc, char **argv)
        close(sock);
        pthread_key_delete(worker_id);
        return EXIT_SUCCESS;
+
 error:
        return EXIT_FAILURE;
 }
--
2.9.5


>From 42f25eb5ca25d8b4352f13b6b0f1187d7702a857 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Fri, 23 Feb 2018 15:20:55 +0100
Subject: [PATCH 12/14] MINOR: spoa-server: Add Lua processing

Use the defined binding for registering Lua engine.
---
 contrib/spoa_server/Makefile |  12 +-
 contrib/spoa_server/ps_lua.c | 510 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 521 insertions(+), 1 deletion(-)
 create mode 100644 contrib/spoa_server/ps_lua.c

diff --git a/contrib/spoa_server/Makefile b/contrib/spoa_server/Makefile
index e6b7c53..31e3de5 100644
--- a/contrib/spoa_server/Makefile
+++ b/contrib/spoa_server/Makefile
@@ -10,9 +10,19 @@ LDFLAGS = -lpthread

 OBJS = spoa.o

+ifneq ($(USE_LUA),)
+OBJS += ps_lua.o
+ifneq ($(LUA_INC),)
+CFLAGS += -I$(LUA_INC)
+endif
+ifneq ($(LUA_LIB),)
+LDLIBS += -L$(LUA_LIB)
+endif
+LDLIBS += -ldl -Wl,--export-dynamic -llua -lm -Wl,--no-export-dynamic
+endif

 spoa: $(OBJS)
-       $(LD) $(LDFLAGS) -o $@ $^
+       $(LD) $(LDFLAGS) -o $@ $^ $(LDLIBS)

 install: spoa
        install spoa $(DESTDIR)$(BINDIR)
diff --git a/contrib/spoa_server/ps_lua.c b/contrib/spoa_server/ps_lua.c
new file mode 100644
index 0000000..fb0838c
--- /dev/null
+++ b/contrib/spoa_server/ps_lua.c
@@ -0,0 +1,510 @@
+/* spoa-server: processing Lua
+ *
+ * Copyright 2018 OZON / Thierry Fournier <thierry.fourn...@ozon.io>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <arpa/inet.h>
+
+#include <errno.h>
+#include <string.h>
+
+#include <lauxlib.h>
+#include <lua.h>
+#include <lualib.h>
+
+#include "spoa.h"
+
+static lua_State *L = NULL;
+static struct worker *worker;
+
+static int ps_lua_start_worker(struct worker *w);
+static int ps_lua_load_file(struct worker *w, const char *file);
+static int ps_lua_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
+
+static struct ps ps_lua_bindings_1 = {
+       .init_worker = ps_lua_start_worker,
+       .load_file = ps_lua_load_file,
+       .exec_message = ps_lua_exec_message,
+       .ext = ".lua",
+};
+
+static struct ps ps_lua_bindings_2 = {
+       .init_worker = ps_lua_start_worker,
+       .load_file = ps_lua_load_file,
+       .exec_message = ps_lua_exec_message,
+       .ext = ".luac",
+};
+
+/* Imported from Lua-5.3.4 */
+static int typeerror (lua_State *L, int arg, const char *tname)
+{
+       const char *msg;
+       const char *typearg;  /* name for the type of the actual argument */
+       if (luaL_getmetafield(L, arg, "__name") == LUA_TSTRING)
+               typearg = lua_tostring(L, -1);  /* use the given type name */
+       else if (lua_type(L, arg) == LUA_TLIGHTUSERDATA)
+               typearg = "light userdata";  /* special name for messages */
+       else
+               typearg = luaL_typename(L, arg);  /* standard name */
+       msg = lua_pushfstring(L, "%s expected, got %s", tname, typearg);
+       return luaL_argerror(L, arg, msg);
+}
+
+/* Imported from Lua-5.3.4 */
+static void tag_error (lua_State *L, int arg, int tag) {
+       typeerror(L, arg, lua_typename(L, tag));
+}
+
+#ifndef luaL_checkboolean
+static int luaL_checkboolean(lua_State *L, int index)
+{
+       if (!lua_isboolean(L, index)) {
+               tag_error(L, index, LUA_TBOOLEAN);
+       }
+       return lua_toboolean(L, index);
+}
+#endif
+
+static int ps_lua_register_message(lua_State *L)
+{
+       const char *name;
+       long ref;
+
+       /* First argument is a message name */
+       name = luaL_checkstring(L, 1);
+
+       /* Second argument is a function */
+       if (!lua_isfunction(L, 2)) {
+ const char *msg = lua_pushfstring(L, "function expected, got %s", luaL_typename(L, 2));
+               luaL_argerror(L, 2, msg);
+       }
+       lua_pushvalue(L, 2);
+       ref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+       /* Register the message processor */
+       ps_register_message(&ps_lua_bindings_1, name, (void *)ref);
+
+       return 1;
+}
+
+static int ps_lua_set_var_null(lua_State *L)
+{      
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+
+       if (!set_var_null(worker, name, name_len, scope)) {
+               luaL_error(L, "No space left available");
+       }
+       return 0;
+}
+
+static int ps_lua_set_var_boolean(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       int64_t value;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkboolean(L, 3);
+
+       if (!set_var_bool(worker, name, name_len, scope, value))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_uint32(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       int64_t value;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkinteger(L, 3);
+
+       if (value < 0 || value > UINT_MAX)
+               luaL_error(L, "Integer '%lld' out of range for 'uint32' type", 
value);
+
+       if (!set_var_uint32(worker, name, name_len, scope, value))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_int32(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       int64_t value;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkinteger(L, 3);
+
+       if (value < INT_MIN || value > INT_MAX)
+               luaL_error(L, "Integer '%lld' out of range for 'int32' type", 
value);
+
+       if (!set_var_int32(worker, name, name_len, scope, value))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_uint64(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       int64_t value;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkinteger(L, 3);
+
+       if (value < 0)
+               luaL_error(L, "Integer '%lld' out of range for 'uint64' type", 
value);
+
+       if (!set_var_uint64(worker, name, name_len, scope, value))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_int64(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       int64_t value;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkinteger(L, 3);
+
+       if (!set_var_int64(worker, name, name_len, scope, value))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_ipv4(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       const char *value;
+       struct in_addr ipv4;
+       int ret;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkstring(L, 3);
+
+       ret = inet_pton(AF_INET, value, &ipv4);
+       if (ret == 0)
+               luaL_error(L, "IPv4 '%s': invalid format", value);
+       if (ret == -1)
+               luaL_error(L, "IPv4 '%s': %s", value, strerror(errno));
+
+       if (!set_var_ipv4(worker, name, name_len, scope, &ipv4))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_ipv6(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       const char *value;
+       struct in6_addr ipv6;
+       int ret;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checkstring(L, 3);
+
+       ret = inet_pton(AF_INET6, value, &ipv6);
+       if (ret == 0)
+               luaL_error(L, "IPv6 '%s': invalid format", value);
+       if (ret == -1)
+               luaL_error(L, "IPv6 '%s': %s", value, strerror(errno));
+
+       if (!set_var_ipv6(worker, name, name_len, scope, &ipv6))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_str(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       const char *value;
+       size_t value_len;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checklstring(L, 3, &value_len);
+
+       if (!set_var_string(worker, name, name_len, scope, value, value_len))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_set_var_bin(lua_State *L)
+{
+       const char *name;
+       size_t name_len;
+       unsigned char scope;
+       const char *value;
+       size_t value_len;
+
+       name = luaL_checklstring(L, 1, &name_len);
+       scope = (unsigned char)luaL_checkinteger(L, 2);
+       value = luaL_checklstring(L, 3, &value_len);
+
+       if (!set_var_bin(worker, name, name_len, scope, value, value_len))
+               luaL_error(L, "No space left available");
+       return 0;
+}
+
+static int ps_lua_start_worker(struct worker *w)
+{
+       if (L != NULL)
+               return 1;
+
+       worker = w;
+
+       L = luaL_newstate();
+       luaL_openlibs(L);
+
+       lua_newtable(L);
+
+       lua_pushstring(L, "register_message");
+       lua_pushcclosure(L, ps_lua_register_message, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_null");
+       lua_pushcclosure(L, ps_lua_set_var_null, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_boolean");
+       lua_pushcclosure(L, ps_lua_set_var_boolean, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_uint32");
+       lua_pushcclosure(L, ps_lua_set_var_uint32, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_int32");
+       lua_pushcclosure(L, ps_lua_set_var_int32, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_uint64");
+       lua_pushcclosure(L, ps_lua_set_var_uint64, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_int64");
+       lua_pushcclosure(L, ps_lua_set_var_int64, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_ipv4");
+       lua_pushcclosure(L, ps_lua_set_var_ipv4, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_ipv6");
+       lua_pushcclosure(L, ps_lua_set_var_ipv6, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_str");
+       lua_pushcclosure(L, ps_lua_set_var_str, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "set_var_bin");
+       lua_pushcclosure(L, ps_lua_set_var_bin, 0);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "scope");
+       lua_newtable(L);
+
+       lua_pushstring(L, "proc");
+       lua_pushinteger(L, SPOE_SCOPE_PROC);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "sess");
+       lua_pushinteger(L, SPOE_SCOPE_SESS);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "txn");
+       lua_pushinteger(L, SPOE_SCOPE_TXN);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "req");
+       lua_pushinteger(L, SPOE_SCOPE_REQ);
+       lua_rawset(L, -3);
+
+       lua_pushstring(L, "res");
+       lua_pushinteger(L, SPOE_SCOPE_RES);
+       lua_rawset(L, -3);
+
+       lua_rawset(L, -3); /* scope */
+
+       lua_setglobal(L, "spoa");
+       return 1;
+}
+
+static int ps_lua_load_file(struct worker *w, const char *file)
+{
+       int error;
+
+       /* Load the file and check syntax */
+       error = luaL_loadfile(L, file);
+       if (error) {
+               fprintf(stderr, "lua syntax error: %s\n", lua_tostring(L, -1));
+               return 0;
+       }
+
+       /* If no syntax error where detected, execute the code. */
+       error = lua_pcall(L, 0, LUA_MULTRET, 0);
+   switch (error) {
+       case LUA_OK:
+               break;
+       case LUA_ERRRUN:
+               fprintf(stderr, "lua runtime error: %s\n", lua_tostring(L, -1));
+               lua_pop(L, 1);
+               return 0;
+       case LUA_ERRMEM:
+               fprintf(stderr, "lua out of memory error\n");
+               return 0;
+       case LUA_ERRERR:
+               fprintf(stderr, "lua message handler error: %s\n", 
lua_tostring(L, 0));
+               lua_pop(L, 1);
+               return 0;
+       case LUA_ERRGCMM:
+               fprintf(stderr, "lua garbage collector error: %s\n", 
lua_tostring(L, 0));
+               lua_pop(L, 1);
+               return 0;
+       default:
+               fprintf(stderr, "lua unknonwn error: %s\n", lua_tostring(L, 0));
+               lua_pop(L, 1);
+               return 0;
+       }
+       return 1;
+}
+
+static int ps_lua_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args)
+{
+       long lua_ref = (long)ref;
+       int ret;
+       char *msg_fmt = NULL;
+       const char *msg;
+       int i;
+       char ipbuf[64];
+
+       /* Restore function in the stack */
+       lua_rawgeti(L, LUA_REGISTRYINDEX, lua_ref);
+
+       /* convert args in lua mode */
+       lua_newtable(L);
+       for (i = 0; i < nargs; i++) {
+               lua_newtable(L);
+               lua_pushstring(L, "name");
+               lua_pushlstring(L, args[i].name.str, args[i].name.len);
+               lua_rawset(L, -3); /* Push name */
+               lua_pushstring(L, "value");
+               switch (args[i].value.type) {
+               case SPOE_DATA_T_NULL:
+                       lua_pushnil(L);
+                       break;
+               case SPOE_DATA_T_BOOL:
+                       lua_pushboolean(L, args[i].value.u.boolean);
+                       break;
+               case SPOE_DATA_T_INT32:
+                       lua_pushinteger(L, args[i].value.u.sint32);
+                       break;
+               case SPOE_DATA_T_UINT32:
+                       lua_pushinteger(L, args[i].value.u.uint32);
+                       break;
+               case SPOE_DATA_T_INT64:
+                       lua_pushinteger(L, args[i].value.u.sint64);
+                       break;
+               case SPOE_DATA_T_UINT64:
+                       if (args[i].value.u.uint64 > LLONG_MAX)
+                               lua_pushnil(L);
+                       else
+                               lua_pushinteger(L, args[i].value.u.uint64);
+                       break;
+               case SPOE_DATA_T_IPV4:
+                       if (inet_ntop(AF_INET, &args[i].value.u.ipv4, ipbuf, 
64) == NULL)
+                               lua_pushnil(L);
+                       else
+                               lua_pushstring(L, ipbuf);
+                       break;
+               case SPOE_DATA_T_IPV6:
+                       if (inet_ntop(AF_INET6, &args[i].value.u.ipv4, ipbuf, 
64) == NULL)
+                               lua_pushnil(L);
+                       else
+                               lua_pushstring(L, ipbuf);
+                       break;
+               case SPOE_DATA_T_STR:
+               case SPOE_DATA_T_BIN:
+                       lua_pushlstring(L, args[i].value.u.buffer.str, 
args[i].value.u.buffer.len);
+                       break;
+               default:
+                       lua_pushnil(L);
+                       break;
+               }
+               lua_rawset(L, -3); /* Push name */
+               lua_rawseti(L, -2, i + 1); /* Pusg table in globale table */
+       }
+
+       /* execute lua function */
+       while (1) {
+               ret = lua_resume(L, L, 1);
+               switch (ret) {
+               case LUA_OK:
+                       return 1;
+               case LUA_YIELD:
+                       DEBUG("Lua yield");
+                       continue;
+               case LUA_ERRMEM:
+                       LOG("Lua: Out of memory error");
+                       return 0;
+               case LUA_ERRRUN:
+                       msg_fmt = "Lua runtime error";
+               case LUA_ERRGCMM:
+                       msg_fmt = msg_fmt ? msg_fmt : "Lua garbage collector 
error";
+               case LUA_ERRERR:
+                       msg_fmt = msg_fmt ? msg_fmt : "Lua message handler 
error";
+               default:
+                       msg_fmt = msg_fmt ? msg_fmt : "Lua unknonwn error";
+                       msg = lua_tostring(L, -1);
+                       if (msg == NULL)
+                               msg = "Unknown error";
+                       LOG("%s: %s", msg_fmt, msg);
+                       lua_settop(L, 0);
+                       return 0;
+               }
+       }
+
+       return 1;
+}
+
+__attribute__((constructor))
+static void __ps_lua_init(void)
+{
+       ps_register(&ps_lua_bindings_1);
+       ps_register(&ps_lua_bindings_2);
+}
+
--
2.9.5


>From be4e8b87c36e7f18644cb409c1de8b82e5df9aeb Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Sun, 25 Feb 2018 20:59:57 +0100
Subject: [PATCH 13/14] MINOR: spoa-server: Add python

This commit adds the Python support for the server.
---
 contrib/spoa_server/Makefile    |   6 +
 contrib/spoa_server/ps_python.c | 645 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 651 insertions(+)
 create mode 100644 contrib/spoa_server/ps_python.c

diff --git a/contrib/spoa_server/Makefile b/contrib/spoa_server/Makefile
index 31e3de5..f075282 100644
--- a/contrib/spoa_server/Makefile
+++ b/contrib/spoa_server/Makefile
@@ -21,6 +21,12 @@ endif
 LDLIBS += -ldl -Wl,--export-dynamic -llua -lm -Wl,--no-export-dynamic
 endif

+ifneq ($(USE_PYTHON),)
+OBJS += ps_python.o
+CFLAGS += -I/usr/include/python2.7
+LDLIBS += -lpython2.7
+endif
+
 spoa: $(OBJS)
        $(LD) $(LDFLAGS) -o $@ $^ $(LDLIBS)

diff --git a/contrib/spoa_server/ps_python.c b/contrib/spoa_server/ps_python.c
new file mode 100644
index 0000000..22bfcb8
--- /dev/null
+++ b/contrib/spoa_server/ps_python.c
@@ -0,0 +1,645 @@
+/* spoa-server: processing Python
+ *
+ * Copyright 2018 OZON / Thierry Fournier <thierry.fourn...@ozon.io>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <Python.h>
+
+#include <arpa/inet.h>
+
+#include <errno.h>
+#include <string.h>
+
+#include "spoa.h"
+
+/* Embedding python documentation:
+ *
+ * https://docs.python.org/2/extending/embedding.html
+ * https://docs.python.org/2/extending/extending.html#extending-python-with-c-or-c + * https://docs.python.org/2/extending/extending.html#calling-python-functions-from-c
+ */
+
+static PyObject *module_ipaddress;
+static PyObject *ipv4_address;
+static PyObject *ipv6_address;
+static PyObject *spoa_error;
+static PyObject *empty_array;
+static struct worker *worker;
+
+static int ps_python_start_worker(struct worker *w);
+static int ps_python_load_file(struct worker *w, const char *file);
+static int ps_python_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
+
+static struct ps ps_python_bindings = {
+       .init_worker = ps_python_start_worker,
+       .load_file = ps_python_load_file,
+       .exec_message = ps_python_exec_message,
+       .ext = ".py",
+};
+
+static PyObject *ps_python_register_message(PyObject *self, PyObject *args)
+{
+       const char *name;
+       PyObject *ref;
+
+       if (!PyArg_ParseTuple(args, "sO!", &name, &PyFunction_Type, &ref))
+               return NULL;
+       Py_XINCREF(ref); /* because the function is intenally refrenced */
+
+       ps_register_message(&ps_python_bindings, name, (void *)ref);
+
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_null(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+
+       if (!PyArg_ParseTuple(args, "s#i", &name, &name_len, &scope))
+               return NULL;
+       if (!set_var_null(worker, name, name_len, scope)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_boolean(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       int value;
+
+       if (!PyArg_ParseTuple(args, "s#ii", &name, &name_len, &scope, &value))
+               return NULL;
+       if (!set_var_bool(worker, name, name_len, scope, value)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_int32(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       int32_t value;
+
+       if (!PyArg_ParseTuple(args, "s#ii", &name, &name_len, &scope, &value))
+               return NULL;
+       if (!set_var_int32(worker, name, name_len, scope, value)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_uint32(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       uint32_t value;
+
+       if (!PyArg_ParseTuple(args, "s#iI", &name, &name_len, &scope, &value))
+               return NULL;
+       if (!set_var_uint32(worker, name, name_len, scope, value)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_int64(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       int64_t value;
+
+       if (!PyArg_ParseTuple(args, "s#il", &name, &name_len, &scope, &value))
+               return NULL;
+       if (!set_var_int64(worker, name, name_len, scope, value)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_uint64(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       uint64_t value;
+
+       if (!PyArg_ParseTuple(args, "s#ik", &name, &name_len, &scope, &value))
+               return NULL;
+       if (!set_var_uint64(worker, name, name_len, scope, value)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_ipv4(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       PyObject *ipv4;
+       PyObject *value;
+       struct in_addr ip;
+
+       if (!PyArg_ParseTuple(args, "s#iO", &name, &name_len, &scope, &ipv4))
+               return NULL;
+       if (!PyObject_IsInstance(ipv4, ipv4_address)) {
+ PyErr_Format(spoa_error, "must be 'IPv4Address', not '%s'", ipv4->ob_type->tp_name);
+               return NULL;
+       }
+       /* Execute packed ... I think .. */
+       value = PyObject_GetAttrString(ipv4, "packed");
+       if (value == NULL)
+               return NULL;
+       if (PyString_GET_SIZE(value) != sizeof(ip)) {
+               PyErr_Format(spoa_error, "UPv6 manipulation internal error");
+               return NULL;
+       }
+       memcpy(&ip, PyString_AS_STRING(value), PyString_GET_SIZE(value));
+       if (!set_var_ipv4(worker, name, name_len, scope, &ip)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_ipv6(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       PyObject *ipv6;
+       PyObject *value;
+       struct in6_addr ip;
+
+       if (!PyArg_ParseTuple(args, "s#iO", &name, &name_len, &scope, &ipv6))
+               return NULL;
+       if (!PyObject_IsInstance(ipv6, ipv6_address)) {
+ PyErr_Format(spoa_error, "must be 'IPv6Address', not '%s'", ipv6->ob_type->tp_name);
+               return NULL;
+       }
+       /* Execute packed ... I think .. */
+       value = PyObject_GetAttrString(ipv6, "packed");
+       if (value == NULL)
+               return NULL;
+       if (PyString_GET_SIZE(value) != sizeof(ip)) {
+               PyErr_Format(spoa_error, "UPv6 manipulation internal error");
+               return NULL;
+       }
+       memcpy(&ip, PyString_AS_STRING(value), PyString_GET_SIZE(value));
+       if (!set_var_ipv6(worker, name, name_len, scope, &ip)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_str(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       const char *value;
+       int value_len;
+
+ if (!PyArg_ParseTuple(args, "s#is#", &name, &name_len, &scope, &value, &value_len))
+               return NULL;
+       if (!set_var_string(worker, name, name_len, scope, value, value_len)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+static PyObject *ps_python_set_var_bin(PyObject *self, PyObject *args)
+{
+       const char *name;
+       int name_len;
+       int scope;
+       const char *value;
+       int value_len;
+
+ if (!PyArg_ParseTuple(args, "s#is#", &name, &name_len, &scope, &value, &value_len))
+               return NULL;
+       if (!set_var_bin(worker, name, name_len, scope, value, value_len)) {
+               PyErr_SetString(spoa_error, "No space left available");
+               return NULL;
+       }
+       return Py_None;
+}
+
+
+static PyMethodDef spoa_methods[] = {
+       {"register_message", ps_python_register_message, METH_VARARGS,
+        "Register binding for SPOA message."},
+       {"set_var_null", ps_python_set_var_null, METH_VARARGS,
+        "Set SPOA NULL variable"},
+       {"set_var_boolean", ps_python_set_var_boolean, METH_VARARGS,
+        "Set SPOA boolean variable"},
+       {"set_var_int32", ps_python_set_var_int32, METH_VARARGS,
+        "Set SPOA int32 variable"},
+       {"set_var_uint32", ps_python_set_var_uint32, METH_VARARGS,
+        "Set SPOA uint32 variable"},
+       {"set_var_int64", ps_python_set_var_int64, METH_VARARGS,
+        "Set SPOA int64 variable"},
+       {"set_var_uint64", ps_python_set_var_uint64, METH_VARARGS,
+        "Set SPOA uint64 variable"},
+       {"set_var_ipv4", ps_python_set_var_ipv4, METH_VARARGS,
+        "Set SPOA ipv4 variable"},
+       {"set_var_ipv6", ps_python_set_var_ipv6, METH_VARARGS,
+        "Set SPOA ipv6 variable"},
+       {"set_var_str", ps_python_set_var_str, METH_VARARGS,
+        "Set SPOA str variable"},
+       {"set_var_bin", ps_python_set_var_bin, METH_VARARGS,
+        "Set SPOA bin variable"},
+       { /* end */ }
+};
+
+static int ps_python_start_worker(struct worker *w)
+{
+       PyObject *m;
+       PyObject *module_name;
+       PyObject *value;
+       int ret;
+
+       Py_SetProgramName("spoa-server");
+       Py_Initialize();
+
+       module_name = PyString_FromString("ipaddress");
+       if (module_name == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       module_ipaddress = PyImport_Import(module_name);
+       Py_DECREF(module_name);
+       if (module_ipaddress == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ipv4_address = PyObject_GetAttrString(module_ipaddress, "IPv4Address");
+       if (ipv4_address == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ipv6_address = PyObject_GetAttrString(module_ipaddress, "IPv6Address");
+       if (ipv4_address == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       m = Py_InitModule("spoa", spoa_methods);
+       if (m == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       spoa_error = PyErr_NewException("spoa.error", NULL, NULL);
+       Py_INCREF(spoa_error);
+       PyModule_AddObject(m, "error", spoa_error);
+
+
+       value = PyLong_FromLong(SPOE_SCOPE_PROC);
+       if (value == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyModule_AddObject(m, "scope_proc", value);
+       if (ret == -1) {
+               PyErr_Print();
+               return 0;
+       }
+
+       value = PyLong_FromLong(SPOE_SCOPE_SESS);
+       if (value == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyModule_AddObject(m, "scope_sess", value);
+       if (ret == -1) {
+               PyErr_Print();
+               return 0;
+       }
+
+       value = PyLong_FromLong(SPOE_SCOPE_TXN);
+       if (value == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyModule_AddObject(m, "scope_txn", value);
+       if (ret == -1) {
+               PyErr_Print();
+               return 0;
+       }
+
+       value = PyLong_FromLong(SPOE_SCOPE_REQ);
+       if (value == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyModule_AddObject(m, "scope_req", value);
+       if (ret == -1) {
+               PyErr_Print();
+               return 0;
+       }
+
+       value = PyLong_FromLong(SPOE_SCOPE_RES);
+       if (value == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyModule_AddObject(m, "scope_res", value);
+       if (ret == -1) {
+               PyErr_Print();
+               return 0;
+       }
+
+       empty_array = PyDict_New();
+       if (empty_array == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       worker = w;
+       return 1;
+}
+
+static int ps_python_load_file(struct worker *w, const char *file)
+{
+       FILE *fp;
+       int ret;
+
+       fp = fopen(file, "r");
+       if (fp == NULL) {
+               LOG("python: Cannot read file \"%s\": %s", file, 
strerror(errno));
+               return 0;
+       }
+
+       ret = PyRun_SimpleFile(fp, file);
+       fclose(fp);
+       if (ret != 0) {
+               PyErr_Print();
+               return 0;
+       }
+
+       return 1;
+}
+
+static int ps_python_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args)
+{
+       int i;
+       PyObject *python_ref = ref;
+       PyObject *fkw;
+       PyObject *kw_args;
+       PyObject *result;
+       PyObject *ent;
+       PyObject *key;
+       PyObject *value;
+       PyObject *func;
+       int ret;
+       char ipbuf[64];
+       const char *p;
+       PyObject *ip_dict;
+       PyObject *ip_name;
+       PyObject *ip_value;
+
+       /* Dict containing arguments */
+
+       kw_args = PyList_New(0);
+       if (kw_args == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       for (i = 0; i < nargs; i++) {
+
+               /* New dict containing one argument */
+
+               ent = PyDict_New();
+               if (ent == NULL) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               /* Create the name entry */
+
+               key = PyString_FromString("name");
+               if (key == NULL) {
+                       Py_DECREF(kw_args);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               value = PyString_FromStringAndSize(args[i].name.str, 
args[i].name.len);
+               if (value == NULL) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       Py_DECREF(key);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               ret = PyDict_SetItem(ent, key, value);
+               Py_DECREF(key);
+               Py_DECREF(value);
+               if (ret == -1) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               /* Create th value entry */
+
+               key = PyString_FromString("value");
+               if (key == NULL) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               switch (args[i].value.type) {
+               case SPOE_DATA_T_NULL:
+                       value = Py_None;
+                       break;
+               case SPOE_DATA_T_BOOL:
+                       value = PyBool_FromLong(args[i].value.u.boolean);
+                       break;
+               case SPOE_DATA_T_INT32:
+                       value = PyLong_FromLong(args[i].value.u.sint32);
+                       break;
+               case SPOE_DATA_T_UINT32:
+                       value = PyLong_FromLong(args[i].value.u.uint32);
+                       break;
+               case SPOE_DATA_T_INT64:
+                       value = PyLong_FromLong(args[i].value.u.sint64);
+                       break;
+               case SPOE_DATA_T_UINT64:
+                       value = PyLong_FromUnsignedLong(args[i].value.u.uint64);
+                       break;
+               case SPOE_DATA_T_IPV4:
+               case SPOE_DATA_T_IPV6:
+                       if (args[i].value.type == SPOE_DATA_T_IPV4)
+                               p = inet_ntop(AF_INET, &args[i].value.u.ipv4, 
ipbuf, 64);
+                       else
+                               p = inet_ntop(AF_INET6, &args[i].value.u.ipv6, 
ipbuf, 64);
+                       if (!p)
+                               strcpy(ipbuf, "0.0.0.0");
+
+                       func = PyObject_GetAttrString(module_ipaddress, 
"ip_address");
+                       if (func == NULL) {
+                               Py_DECREF(kw_args);
+                               Py_DECREF(ent);
+                               PyErr_Print();
+                               return 0;
+                       }
+                       ip_dict = PyDict_New();
+                       if (ip_dict == NULL) {
+                               Py_DECREF(kw_args);
+                               Py_DECREF(ent);
+                               Py_DECREF(func);
+                               PyErr_Print();
+                               return 0;
+                       }
+                       ip_name = PyString_FromString("address");
+                       if (ip_name == NULL) {
+                               Py_DECREF(kw_args);
+                               Py_DECREF(ent);
+                               Py_DECREF(func);
+                               Py_DECREF(ip_dict);
+                               PyErr_Print();
+                               return 0;
+                       }
+                       ip_value = PyUnicode_FromString(ipbuf);
+                       if (ip_value == NULL) {
+                               Py_DECREF(kw_args);
+                               Py_DECREF(ent);
+                               Py_DECREF(func);
+                               Py_DECREF(ip_dict);
+                               Py_DECREF(ip_name);
+                               PyErr_Print();
+                               return 0;
+                       }
+                       ret = PyDict_SetItem(ip_dict, ip_name, ip_value);
+                       Py_DECREF(ip_name);
+                       Py_DECREF(ip_value);
+                       if (ret == -1) {
+                               Py_DECREF(ip_dict);
+                               PyErr_Print();
+                               return 0;
+                       }
+                       value = PyObject_Call(func, empty_array, ip_dict);
+                       Py_DECREF(func);
+                       Py_DECREF(ip_dict);
+                       break;
+
+               case SPOE_DATA_T_STR:
+ value = PyString_FromStringAndSize(args[i].value.u.buffer.str, args[i].value.u.buffer.len);
+                       break;
+               case SPOE_DATA_T_BIN:
+ value = PyString_FromStringAndSize(args[i].value.u.buffer.str, args[i].value.u.buffer.len);
+                       break;
+               default:
+                       value = Py_None;
+                       break;
+               }
+               if (value == NULL) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       Py_DECREF(key);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               ret = PyDict_SetItem(ent, key, value);
+               Py_DECREF(key);
+               Py_DECREF(value);
+               if (ret == -1) {
+                       Py_DECREF(kw_args);
+                       Py_DECREF(ent);
+                       PyErr_Print();
+                       return 0;
+               }
+
+               /* Add dict to the list */
+
+               ret = PyList_Append(kw_args, ent);
+               Py_DECREF(ent);
+               if (ret == -1) {
+                       Py_DECREF(kw_args);
+                       PyErr_Print();
+                       return 0;
+               }
+       }
+
+       /* Dictionnary { args = <list-of-args> } for the function */
+
+       fkw = PyDict_New();
+       if (fkw == NULL) {
+               Py_DECREF(kw_args);
+               PyErr_Print();
+               return 0;
+       }
+
+       key = PyString_FromString("args");
+       if (key == NULL) {
+               Py_DECREF(kw_args);
+               Py_DECREF(fkw);
+               PyErr_Print();
+               return 0;
+       }
+
+       ret = PyDict_SetItem(fkw, key, kw_args);
+       Py_DECREF(kw_args);
+       Py_DECREF(key);
+       if (ret == -1) {
+               Py_DECREF(fkw);
+               PyErr_Print();
+               return 0;
+       }
+
+       result = PyObject_Call(python_ref, empty_array, fkw);
+       if (result == NULL) {
+               PyErr_Print();
+               return 0;
+       }
+
+       return 1;
+}
+
+__attribute__((constructor))
+static void __ps_python_init(void)
+{
+       ps_register(&ps_python_bindings);
+}
+
--
2.9.5


>From 075cd6e1fce53f2c89c1e3abeba25ec61aab19e6 Mon Sep 17 00:00:00 2001
From: Thierry FOURNIER <thierry.fourn...@ozon.io>
Date: Sun, 25 Feb 2018 21:28:05 +0100
Subject: [PATCH 14/14] MINOR/DOC: spoe-server: Add documentation

This is the documentation and examples.
---
 contrib/spoa_server/README                | 101 +++++++++++++-----------------
 contrib/spoa_server/print_r.lua           |  68 ++++++++++++++++++++
 contrib/spoa_server/ps_lua.lua            |  17 +++++
 contrib/spoa_server/ps_python.py          |  20 ++++++
 contrib/spoa_server/spoa-server.conf      |  33 ++++++++++
 contrib/spoa_server/spoa-server.spoe.conf |  13 ++++
 6 files changed, 194 insertions(+), 58 deletions(-)
 create mode 100644 contrib/spoa_server/print_r.lua
 create mode 100644 contrib/spoa_server/ps_lua.lua
 create mode 100644 contrib/spoa_server/ps_python.py
 create mode 100644 contrib/spoa_server/spoa-server.conf
 create mode 100644 contrib/spoa_server/spoa-server.spoe.conf

diff --git a/contrib/spoa_server/README b/contrib/spoa_server/README
index 7e376ee..57ec9c4 100644
--- a/contrib/spoa_server/README
+++ b/contrib/spoa_server/README
@@ -1,10 +1,18 @@
-A Random IP reputation service acting as a Stream Processing Offload Agent
---------------------------------------------------------------------------
+Multi script langyage Stream Processing Offload Agent
+-----------------------------------------------------

-This is a very simple service that implement a "random" ip reputation
-service. It will return random scores for all checked IP addresses. It only
-shows you how to implement a ip reputation service or such kind of services
-using the SPOE.
+This agent receive SPOP message and process it with script languages. The
+language register callback with a message. Each callback receive the list
+of arguments with types according with the language capabilities. The
+callback write variables which are sent as response when the processing
+is done.
+
+
+  Compilation
+---------------
+
+Actually, the server support Lua and Python. Type "make" with the options:
+USE_LUA=1 and/or USE_PYTHON=1.


   Start the service
@@ -19,70 +27,47 @@ binary:
         -d                  Enable the debug mode
         -p <port>           Specify the port to listen on (default: 12345)
         -n <num-workers>    Specify the number of workers (default: 5)
+        -f <file>           Load script according with the supported languages

-Note: A worker is a thread.
-
-
-  Configure a SPOE to use the service
----------------------------------------
-
-All information about SPOE configuration can be found in "doc/SPOE.txt". Here is
-the configuration template to use for your SPOE:
-
-    [ip-reputation]
-
-    spoe-agent iprep-agent
-        messages check-client-ip
-
-        option var-prefix iprep
-
-        timeout hello      100ms
-        timeout idle       30s
-        timeout processing 15ms
-
-        use-backend iprep-backend
+The file processor is recognized using the extension. .lua or .luac for lua and
+.py for python. Start example:

-    spoe-message check-client-ip
-        args src
-        event on-client-session
+    $> ./spoa -d -f ps_lua.lua

+        $> ./spoa -d -f ps_pyhton.py

-The engine is in the scope "ip-reputation". So to enable it, you must set the
-following line in a frontend/listener section:

-    frontend my-front
-        ...
-        filter spoe engine ip-reputation config /path/spoe-ip-reputation.conf
-       ....
+  Configure
+-------------

-where "/path/spoe-ip-reputation.conf" is the path to your SPOE configuration
-file. The engine name is important here, it must be the same than the one used
-in the SPOE configuration file.
+Sample configuration are join to this server:

-IMPORTANT NOTE:
-    Because we want to send a message on the "on-client-session" event, this
-    SPOE must be attached to a proxy with the frontend capability. If it is
-    declared in a backend section, it will have no effet.
+  spoa-server.conf      : The HAProxy configuration file using SPOE server
+  spoa-server.spoe.conf : The SPOP description file used by HAProxy
+  ps_lua.lua            : Processing Lua example
+  ps_python.py          : Processing Python example


-Because, in SPOE configuration file, we declare to use the backend
-"iprep-backend" to communicate with the service, you must define it in HAProxy
-configuration. For example:
+  Considerations
+------------------

-    backend iprep-backend
-        mode tcp
-       timeout server 1m
-       server iprep-srv 127.0.0.1:12345 check maxconn 5
+This server is a beta version. It works fine, but some improvement will be
+welcome:

+Main process:

-In reply to the "check-client-ip" message, this service will set the variable
-"ip_score" for the session, an integer between 0 and 100. If unchanged, the
-variable prefix is "iprep". So the full variable name will be
-"sess.iprep.ip_score".
+ * Improve log management: Today the log are sent on stdout.
+ * Improve process management: The dead process are ignored.
+ * Implement systemd integration.
+ * Implement threads: It would be fine to implement thread working. Shared
+   memory is welcome for managing database connection pool and something like
+   that.
+ * Add PHP support and some other languages.

-You can use it in ACLs to experiment the SPOE feature. For example:
+Python:

-    tcp-request content reject if { var(sess.iprep.ip_score) -m int lt 20 }
+ * Improve repporting: Catch python error message and repport it in the right
+   place. Today the error are dumped on stdout. How using syslog for logging
+   stack traces ?

-With this rule, all IP address with a score lower than 20 will be rejected
-(Remember, this score is random).
+Maybe some other things...
diff --git a/contrib/spoa_server/print_r.lua b/contrib/spoa_server/print_r.lua
new file mode 100644
index 0000000..2fa57e7
--- /dev/null
+++ b/contrib/spoa_server/print_r.lua
@@ -0,0 +1,68 @@
+function color(index, str)
+       return "\x1b[" .. index .. "m" .. str .. "\x1b[00m"
+end
+
+function nocolor(index, str)
+       return str
+end
+
+function sp(count)
+       local spaces = ""
+       while count > 0 do
+               spaces = spaces .. "    "
+               count = count - 1
+       end
+       return spaces
+end
+
+function print_rr(p, indent, c, wr)
+       local i = 0
+       local nl = ""
+
+       if type(p) == "table" then
+               wr(c("33", "(table)") .. " " .. c("34", tostring(p)) .. " [")
+
+               mt = getmetatable(p)
+               if mt ~= nil then
+                       wr("\n" .. sp(indent+1) .. c("31", "METATABLE") .. ": ")
+                       print_rr(mt, indent+1, c, wr)
+               end
+
+               for k,v in pairs(p) do
+                       if i > 0 then
+                               nl = "\n"
+                       else
+                               wr("\n")
+                       end
+                       wr(nl .. sp(indent+1))
+                       if type(k) == "number" then
+                               wr(c("32", tostring(k)))
+                       else
+                               wr("\"" .. c("32", tostring(k)) .. "\"")
+                       end
+                       wr(": ")
+                       print_rr(v, indent+1, c, wr)
+                       i = i + 1
+               end
+               if i == 0 then
+                       wr(" " .. c("35", "/* empty */") .. " ]")
+               else
+                       wr("\n" .. sp(indent) .. "]")
+               end
+       elseif type(p) == "string" then
+               wr(c("33", "(string)") .. " \"" .. c("34", p) .. "\"")
+       else
+               wr(c("33", "(" .. type(p) .. ")") .. " " .. c("34", 
tostring(p)))
+       end
+end
+
+function print_r(p, col, wr)
+       if col == nil then col = true end
+       if wr == nil then wr = function(msg) io.stdout:write(msg) end end
+       if col == true then
+               print_rr(p, 0, color, wr)
+       else
+               print_rr(p, 0, nocolor, wr)
+       end
+       wr("\n")
+end
diff --git a/contrib/spoa_server/ps_lua.lua b/contrib/spoa_server/ps_lua.lua
new file mode 100644
index 0000000..2662045
--- /dev/null
+++ b/contrib/spoa_server/ps_lua.lua
@@ -0,0 +1,17 @@
+require("print_r")
+
+print_r("Load lua message processors")
+
+spoa.register_message("check-client-ip", function(args)
+       print_r(args)
+       spoa.set_var_null("null", spoa.scope.txn)
+       spoa.set_var_boolean("boolean", spoa.scope.txn, true)
+       spoa.set_var_int32("int32", spoa.scope.txn, 1234)
+       spoa.set_var_uint32("uint32", spoa.scope.txn, 1234)
+       spoa.set_var_int64("int64", spoa.scope.txn, 1234)
+       spoa.set_var_uint64("uint64", spoa.scope.txn, 1234)
+       spoa.set_var_ipv4("ipv4", spoa.scope.txn, "127.0.0.1")
+       spoa.set_var_ipv6("ipv6", spoa.scope.txn, "1::f")
+       spoa.set_var_str("str", spoa.scope.txn, "1::f")
+       spoa.set_var_bin("bin", spoa.scope.txn, "1::f")
+end)
diff --git a/contrib/spoa_server/ps_python.py b/contrib/spoa_server/ps_python.py
new file mode 100644
index 0000000..108eb48
--- /dev/null
+++ b/contrib/spoa_server/ps_python.py
@@ -0,0 +1,20 @@
+from pprint import pprint
+import spoa
+import ipaddress
+
+def check_client_ip(args):
+       pprint(args)
+       spoa.set_var_null("null", spoa.scope_txn)
+       spoa.set_var_boolean("boolean", spoa.scope_txn, True)
+       spoa.set_var_int32("int32", spoa.scope_txn, 1234)
+       spoa.set_var_uint32("uint32", spoa.scope_txn, 1234)
+       spoa.set_var_int64("int64", spoa.scope_txn, 1234)
+       spoa.set_var_uint64("uint64", spoa.scope_txn, 1234)
+ spoa.set_var_ipv4("ipv4", spoa.scope_txn, ipaddress.IPv4Address(u"127.0.0.1"))
+       spoa.set_var_ipv6("ipv6", spoa.scope_txn, 
ipaddress.IPv6Address(u"1::f"))
+       spoa.set_var_str("str", spoa.scope_txn, "1::f")
+       spoa.set_var_bin("bin", spoa.scope_txn, "1:\x01:\x02f\x00\x00")
+       return
+
+
+spoa.register_message("check-client-ip", check_client_ip)
diff --git a/contrib/spoa_server/spoa-server.conf b/contrib/spoa_server/spoa-server.conf
new file mode 100644
index 0000000..13bd126
--- /dev/null
+++ b/contrib/spoa_server/spoa-server.conf
@@ -0,0 +1,33 @@
+global
+       debug
+
+defaults
+       mode http
+       option httplog
+       option dontlognull
+       timeout connect 5000
+       timeout client 5000
+       timeout server 5000
+
+listen test
+       mode http
+       bind :10001
+       filter spoe engine spoa-server config spoa-server.spoe.conf
+       http-request set-var(req.a) var(txn.iprep.null),debug
+       http-request set-var(req.a) var(txn.iprep.boolean),debug
+       http-request set-var(req.a) var(txn.iprep.int32),debug
+       http-request set-var(req.a) var(txn.iprep.uint32),debug
+       http-request set-var(req.a) var(txn.iprep.int64),debug
+       http-request set-var(req.a) var(txn.iprep.uint64),debug
+       http-request set-var(req.a) var(txn.iprep.ipv4),debug
+       http-request set-var(req.a) var(txn.iprep.ipv6),debug
+       http-request set-var(req.a) var(txn.iprep.str),debug
+       http-request set-var(req.a) var(txn.iprep.bin),debug
+       http-request redirect location /%[var(sess.iprep.ip_score)]
+
+backend spoe-server
+       mode tcp
+       balance roundrobin
+       timeout connect 5s
+       timeout server  3m
+       server spoe-server 127.0.0.1:12345
diff --git a/contrib/spoa_server/spoa-server.spoe.conf b/contrib/spoa_server/spoa-server.spoe.conf
new file mode 100644
index 0000000..dab4e5a
--- /dev/null
+++ b/contrib/spoa_server/spoa-server.spoe.conf
@@ -0,0 +1,13 @@
+[spoa-server]
+
+spoe-agent spoa-server
+       messages check-client-ip
+       option var-prefix  iprep
+       timeout hello      100ms
+       timeout idle       30s
+       timeout processing 15ms
+       use-backend spoe-server
+
+spoe-message check-client-ip
+       args always_true int(1234) src ipv6(::55) req.fhdr(host)
+       event on-frontend-http-request
--
2.9.5





Reply via email to