http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/transport/transport.h ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.h b/proton-c/src/transport/transport.h deleted file mode 100644 index 66ebc51..0000000 --- a/proton-c/src/transport/transport.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef _PROTON_TRANSPORT_INTERNAL_H -#define _PROTON_TRANSPORT_INTERNAL_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next); -void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery); -void pn_delivery_map_free(pn_delivery_map_t *db); -void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link); -void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn); - -#endif /* transport.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/types.c ---------------------------------------------------------------------- diff --git a/proton-c/src/types.c b/proton-c/src/types.c deleted file mode 100644 index 4f8048d..0000000 --- a/proton-c/src/types.c +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "platform.h" -#include <proton/types.h> -#include <stdlib.h> -#include <string.h> - -pn_bytes_t pn_bytes(size_t size, const char *start) -{ - pn_bytes_t bytes = {size, start}; - return bytes; -} - -pn_rwbytes_t pn_rwbytes(size_t size, char *start) -{ - pn_rwbytes_t bytes = {size, start}; - return bytes; -} - -pn_timestamp_t pn_timestamp_now() { - return pn_i_now(); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/types.xml ---------------------------------------------------------------------- diff --git a/proton-c/src/types.xml b/proton-c/src/types.xml new file mode 100644 index 0000000..4aa9c0f --- /dev/null +++ b/proton-c/src/types.xml @@ -0,0 +1,125 @@ +<?xml version="1.0"?> + +<!-- +Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit +Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL +Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited, +Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A, +Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc., +Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process +Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. +3. The name of the author may not be used to endorse or promote products +derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR +IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +--> + +<amqp name="types" xmlns="http://www.amqp.org/schema/amqp.xsd"> + <section name="encodings"> + <type name="null" class="primitive"> + <encoding code="0x40" category="fixed" width="0"/> + </type> + <type name="boolean" class="primitive"> + <encoding code="0x56" category="fixed" width="1"/> + <encoding name="true" code="0x41" category="fixed" width="0"/> + <encoding name="false" code="0x42" category="fixed" width="0"/> + </type> + <type name="ubyte" class="primitive"> + <encoding code="0x50" category="fixed" width="1"/> + </type> + <type name="ushort" class="primitive"> + <encoding code="0x60" category="fixed" width="2"/> + </type> + <type name="uint" class="primitive"> + <encoding code="0x70" category="fixed" width="4"/> + <encoding name="smalluint" code="0x52" category="fixed" width="1"/> + <encoding name="uint0" code="0x43" category="fixed" width="0"/> + </type> + <type name="ulong" class="primitive"> + <encoding code="0x80" category="fixed" width="8"/> + <encoding name="smallulong" code="0x53" category="fixed" width="1"/> + <encoding name="ulong0" code="0x44" category="fixed" width="0"/> + </type> + <type name="byte" class="primitive"> + <encoding code="0x51" category="fixed" width="1"/> + </type> + <type name="short" class="primitive"> + <encoding code="0x61" category="fixed" width="2"/> + </type> + <type name="int" class="primitive"> + <encoding code="0x71" category="fixed" width="4"/> + <encoding name="smallint" code="0x54" category="fixed" width="1"/> + </type> + <type name="long" class="primitive"> + <encoding code="0x81" category="fixed" width="8"/> + <encoding name="smalllong" code="0x55" category="fixed" width="1"/> + </type> + <type name="float" class="primitive"> + <encoding name="ieee-754" code="0x72" category="fixed" width="4"/> + </type> + <type name="double" class="primitive"> + <encoding name="ieee-754" code="0x82" category="fixed" width="8"/> + </type> + <type name="decimal32" class="primitive"> + <encoding name="ieee-754" code="0x74" category="fixed" width="4"/> + </type> + <type name="decimal64" class="primitive"> + <encoding name="ieee-754" code="0x84" category="fixed" width="8"/> + </type> + <type name="decimal128" class="primitive"> + <encoding name="ieee-754" code="0x94" category="fixed" width="16"/> + </type> + <type name="char" class="primitive"> + <encoding name="utf32" code="0x73" category="fixed" width="4"/> + </type> + <type name="timestamp" class="primitive"> + <encoding name="ms64" code="0x83" category="fixed" width="8"/> + </type> + <type name="uuid" class="primitive"> + <encoding code="0x98" category="fixed" width="16"/> + </type> + <type name="binary" class="primitive"> + <encoding name="vbin8" code="0xa0" category="variable" width="1"/> + <encoding name="vbin32" code="0xb0" category="variable" width="4"/> + </type> + <type name="string" class="primitive"> + <encoding name="str8-utf8" code="0xa1" category="variable" width="1"/> + <encoding name="str32-utf8" code="0xb1" category="variable" width="4"/> + </type> + <type name="symbol" class="primitive"> + <encoding name="sym8" code="0xa3" category="variable" width="1"/> + <encoding name="sym32" code="0xb3" category="variable" width="4"/> + </type> + <type name="list" class="primitive"> + <encoding name="list0" code="0x45" category="fixed" width="0"/> + <encoding name="list8" code="0xc0" category="compound" width="1"/> + <encoding name="list32" code="0xd0" category="compound" width="4"/> + </type> + <type name="map" class="primitive"> + <encoding name="map8" code="0xc1" category="compound" width="1"/> + <encoding name="map32" code="0xd1" category="compound" width="4"/> + </type> + <type name="array" class="primitive"> + <encoding name="array8" code="0xe0" category="array" width="1"/> + <encoding name="array32" code="0xf0" category="array" width="4"/> + </type> + </section> +</amqp> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/url.c ---------------------------------------------------------------------- diff --git a/proton-c/src/url.c b/proton-c/src/url.c deleted file mode 100644 index 566e91e..0000000 --- a/proton-c/src/url.c +++ /dev/null @@ -1,186 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/url.h" -#include "proton/object.h" -#include "util.h" -#include "platform.h" - -#include <stdlib.h> -#include <string.h> -#include <stdio.h> - -static char* copy(const char* str) { - if (str == NULL) return NULL; - char *str2 = (char*)malloc(strlen(str)+1); - if (str2) strcpy(str2, str); - return str2; -} - -struct pn_url_t { - char *scheme; - char *username; - char *password; - char *host; - char *port; - char *path; - pn_string_t *str; -}; - -/** Internal use only, returns the pn_string_t. Public function is pn_url_str() */ -static pn_string_t *pn_url_string(pn_url_t* url) -{ - pn_url_str(url); /* Make sure str is up to date */ - return url->str; -} - -static void pn_url_finalize(void *object) -{ - pn_url_t *url = (pn_url_t *) object; - pn_url_clear(url); - pn_free(url->str); -} - -static uintptr_t pn_url_hashcode(void *object) -{ - pn_url_t *url = (pn_url_t *) object; - return pn_hashcode(pn_url_string(url)); -} - -static intptr_t pn_url_compare(void *oa, void *ob) -{ - pn_url_t *a = (pn_url_t *) oa; - pn_url_t *b = (pn_url_t *) ob; - return pn_compare(pn_url_string(a), pn_url_string(b)); -} - - -static int pn_url_inspect(void *obj, pn_string_t *dst) -{ - pn_url_t *url = (pn_url_t *) obj; - int err = 0; - err = pn_string_addf(dst, "Url("); if (err) return err; - err = pn_inspect(pn_url_string(url), dst); if (err) return err; - return pn_string_addf(dst, ")"); -} - -#define pn_url_initialize NULL - - -pn_url_t *pn_url() { - static const pn_class_t clazz = PN_CLASS(pn_url); - pn_url_t *url = (pn_url_t*) pn_class_new(&clazz, sizeof(pn_url_t)); - if (!url) return NULL; - memset(url, 0, sizeof(*url)); - url->str = pn_string(NULL); - return url; -} - -/** Parse a string URL as a pn_url_t. - *@param[in] url A URL string. - *@return The parsed pn_url_t or NULL if url is not a valid URL string. - */ -pn_url_t *pn_url_parse(const char *str) { - if (!str || !*str) /* Empty string or NULL is illegal. */ - return NULL; - - pn_url_t *url = pn_url(); - char *str2 = copy(str); - pni_parse_url(str2, &url->scheme, &url->username, &url->password, &url->host, &url->port, &url->path); - url->scheme = copy(url->scheme); - url->username = copy(url->username); - url->password = copy(url->password); - url->host = (url->host && !*url->host) ? NULL : copy(url->host); - url->port = copy(url->port); - url->path = copy(url->path); - - free(str2); - return url; -} - -/** Free a URL */ -void pn_url_free(pn_url_t *url) { pn_free(url); } - -/** Clear the contents of the URL. */ -void pn_url_clear(pn_url_t *url) { - pn_url_set_scheme(url, NULL); - pn_url_set_username(url, NULL); - pn_url_set_password(url, NULL); - pn_url_set_host(url, NULL); - pn_url_set_port(url, NULL); - pn_url_set_path(url, NULL); - pn_string_clear(url->str); -} - -/** URL-encode src and append to dst. */ -static void pni_urlencode(pn_string_t *dst, const char* src) { - static const char *bad = "@:/"; - - if (!src) return; - const char *i = src; - const char *j = strpbrk(i, bad); - while (j) { - pn_string_addf(dst, "%.*s", (int)(j-i), i); - pn_string_addf(dst, "%%%02X", (int)*j); - i = j + 1; - j = strpbrk(i, bad); - } - pn_string_addf(dst, "%s", i); -} - - -/** Return the string form of a URL. */ -const char *pn_url_str(pn_url_t *url) { - if (pn_string_get(url->str) == NULL) { - pn_string_set(url->str, ""); - if (url->scheme) pn_string_addf(url->str, "%s://", url->scheme); - if (url->username) pni_urlencode(url->str, url->username); - if (url->password) { - pn_string_addf(url->str, ":"); - pni_urlencode(url->str, url->password); - } - if (url->username || url->password) pn_string_addf(url->str, "@"); - if (url->host) { - if (strchr(url->host, ':')) pn_string_addf(url->str, "[%s]", url->host); - else pn_string_addf(url->str, "%s", url->host); - } - if (url->port) pn_string_addf(url->str, ":%s", url->port); - if (url->path) pn_string_addf(url->str, "/%s", url->path); - } - return pn_string_get(url->str); -} - -const char *pn_url_get_scheme(pn_url_t *url) { return url->scheme; } -const char *pn_url_get_username(pn_url_t *url) { return url->username; } -const char *pn_url_get_password(pn_url_t *url) { return url->password; } -const char *pn_url_get_host(pn_url_t *url) { return url->host; } -const char *pn_url_get_port(pn_url_t *url) { return url->port; } -const char *pn_url_get_path(pn_url_t *url) { return url->path; } - -#define SET(part) free(url->part); url->part = copy(part); pn_string_clear(url->str) -void pn_url_set_scheme(pn_url_t *url, const char *scheme) { SET(scheme); } -void pn_url_set_username(pn_url_t *url, const char *username) { SET(username); } -void pn_url_set_password(pn_url_t *url, const char *password) { SET(password); } -void pn_url_set_host(pn_url_t *url, const char *host) { SET(host); } -void pn_url_set_port(pn_url_t *url, const char *port) { SET(port); } -void pn_url_set_path(pn_url_t *url, const char *path) { SET(path); } - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/util.c ---------------------------------------------------------------------- diff --git a/proton-c/src/util.c b/proton-c/src/util.c deleted file mode 100644 index 47fbc34..0000000 --- a/proton-c/src/util.c +++ /dev/null @@ -1,274 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <stdarg.h> -#include <stdio.h> -#include <stdlib.h> -#include <proton/type_compat.h> -#include <ctype.h> -#include <string.h> -#include <proton/error.h> -#include <proton/types.h> -#include "util.h" - -ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size) -{ - int idx = 0; - for (unsigned i = 0; i < size; i++) - { - uint8_t c = src[i]; - if (isprint(c)) { - if (idx < (int) (capacity - 1)) { - dst[idx++] = c; - } else { - if (idx > 0) { - dst[idx - 1] = '\0'; - } - return PN_OVERFLOW; - } - } else { - if (idx < (int) (capacity - 4)) { - idx += sprintf(dst + idx, "\\x%.2x", c); - } else { - if (idx > 0) { - dst[idx - 1] = '\0'; - } - return PN_OVERFLOW; - } - } - } - - dst[idx] = '\0'; - return idx; -} - -int pn_quote(pn_string_t *dst, const char *src, size_t size) -{ - while (true) { - size_t str_size = pn_string_size(dst); - char *str = pn_string_buffer(dst) + str_size; - size_t capacity = pn_string_capacity(dst) - str_size; - ssize_t ssize = pn_quote_data(str, capacity, src, size); - if (ssize == PN_OVERFLOW) { - int err = pn_string_grow(dst, (str_size + capacity) ? 2*(str_size + capacity) : 16); - if (err) return err; - } else if (ssize >= 0) { - return pn_string_resize(dst, str_size + ssize); - } else { - return ssize; - } - } -} - -void pn_fprint_data(FILE *stream, const char *bytes, size_t size) -{ - char buf[256]; - ssize_t n = pn_quote_data(buf, 256, bytes, size); - if (n >= 0) { - fputs(buf, stream); - } else { - if (n == PN_OVERFLOW) { - fputs(buf, stream); - fputs("... (truncated)", stream); - } - else - fprintf(stderr, "pn_quote_data: %s\n", pn_code(n)); - } -} - -void pn_print_data(const char *bytes, size_t size) -{ - pn_fprint_data(stdout, bytes, size); -} - -void pni_urldecode(const char *src, char *dst) -{ - const char *in = src; - char *out = dst; - while (*in != '\0') - { - if ('%' == *in) - { - if ((in[1] != '\0') && (in[2] != '\0')) - { - char esc[3]; - esc[0] = in[1]; - esc[1] = in[2]; - esc[2] = '\0'; - unsigned long d = strtoul(esc, NULL, 16); - *out = (char)d; - in += 3; - out++; - } - else - { - *out = *in; - in++; - out++; - } - } - else - { - *out = *in; - in++; - out++; - } - } - *out = '\0'; -} - -void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path) -{ - if (!url) return; - - char *slash = strchr(url, '/'); - - if (slash && slash>url) { - char *scheme_end = strstr(slash-1, "://"); - - if (scheme_end && scheme_end<slash) { - *scheme_end = '\0'; - *scheme = url; - url = scheme_end + 3; - slash = strchr(url, '/'); - } - } - - if (slash) { - *slash = '\0'; - *path = slash + 1; - } - - char *at = strchr(url, '@'); - if (at) { - *at = '\0'; - char *up = url; - *user = up; - url = at + 1; - char *colon = strchr(up, ':'); - if (colon) { - *colon = '\0'; - *pass = colon + 1; - } - } - - *host = url; - char *open = (*url == '[') ? url : 0; - if (open) { - char *close = strchr(open, ']'); - if (close) { - *host = open + 1; - *close = '\0'; - url = close + 1; - } - } - - char *colon = strchr(url, ':'); - if (colon) { - *colon = '\0'; - *port = colon + 1; - } - - if (*user) pni_urldecode(*user, *user); - if (*pass) pni_urldecode(*pass, *pass); -} - -void pni_vfatal(const char *fmt, va_list ap) -{ - vfprintf(stderr, fmt, ap); - abort(); -} - -void pni_fatal(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - pni_vfatal(fmt, ap); - va_end(ap); -} - -int pn_strcasecmp(const char *a, const char *b) -{ - int diff; - while (*b) { - char aa = *a++, bb = *b++; - diff = tolower(aa)-tolower(bb); - if ( diff!=0 ) return diff; - } - return *a; -} - -int pn_strncasecmp(const char* a, const char* b, size_t len) -{ - int diff = 0; - while (*b && len > 0) { - char aa = *a++, bb = *b++; - diff = tolower(aa)-tolower(bb); - if ( diff!=0 ) return diff; - --len; - }; - return len==0 ? diff : *a; -} - -bool pn_env_bool(const char *name) -{ - char *v = getenv(name); - return v && (!pn_strcasecmp(v, "true") || !pn_strcasecmp(v, "1") || - !pn_strcasecmp(v, "yes") || !pn_strcasecmp(v, "on")); -} - -char *pn_strdup(const char *src) -{ - if (src) { - char *dest = (char *) malloc((strlen(src)+1)*sizeof(char)); - if (!dest) return NULL; - return strcpy(dest, src); - } else { - return NULL; - } -} - -char *pn_strndup(const char *src, size_t n) -{ - if (src) { - unsigned size = 0; - for (const char *c = src; size < n && *c; c++) { - size++; - } - - char *dest = (char *) malloc(size + 1); - if (!dest) return NULL; - strncpy(dest, src, n); - dest[size] = '\0'; - return dest; - } else { - return NULL; - } -} - -// which timestamp will expire next, or zero if none set -pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b ) -{ - if (a && b) return pn_min(a, b); - if (a) return a; - return b; -} - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/util.h ---------------------------------------------------------------------- diff --git a/proton-c/src/util.h b/proton-c/src/util.h deleted file mode 100644 index ec59a07..0000000 --- a/proton-c/src/util.h +++ /dev/null @@ -1,126 +0,0 @@ -#ifndef _PROTON_SRC_UTIL_H -#define _PROTON_SRC_UTIL_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <errno.h> -#ifndef __cplusplus -#include <stdbool.h> -#endif -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <proton/types.h> -#include <proton/object.h> - -void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path); -void pni_fatal(const char *fmt, ...); -void pni_vfatal(const char *fmt, va_list ap); -ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size); -int pn_quote(pn_string_t *dst, const char *src, size_t size); -void pn_fprint_data(FILE *stream, const char *bytes, size_t size); -void pn_print_data(const char *bytes, size_t size); -bool pn_env_bool(const char *name); -pn_timestamp_t pn_timestamp_min(pn_timestamp_t a, pn_timestamp_t b); - -char *pn_strdup(const char *src); -char *pn_strndup(const char *src, size_t n); -int pn_strcasecmp(const char* a, const char* b); -int pn_strncasecmp(const char* a, const char* b, size_t len); - -#define DIE_IFR(EXPR, STRERR) \ - do { \ - int __code__ = (EXPR); \ - if (__code__) { \ - fprintf(stderr, "%s:%d: %s: %s (%d)\n", __FILE__, __LINE__, \ - #EXPR, STRERR(__code__), __code__); \ - exit(-1); \ - } \ - } while (0) - -#define DIE_IFE(EXPR) \ - do { \ - if ((EXPR) == -1) { \ - int __code__ = errno; \ - fprintf(stderr, "%s:%d: %s: %s (%d)\n", __FILE__, __LINE__, \ - #EXPR, strerror(__code__), __code__); \ - exit(-1); \ - } \ - } while (0) - - -#define LL_HEAD(ROOT, LIST) ((ROOT)-> LIST ## _head) -#define LL_TAIL(ROOT, LIST) ((ROOT)-> LIST ## _tail) -#define LL_ADD(ROOT, LIST, NODE) \ - { \ - (NODE)-> LIST ## _next = NULL; \ - (NODE)-> LIST ## _prev = (ROOT)-> LIST ## _tail; \ - if (LL_TAIL(ROOT, LIST)) \ - LL_TAIL(ROOT, LIST)-> LIST ## _next = (NODE); \ - LL_TAIL(ROOT, LIST) = (NODE); \ - if (!LL_HEAD(ROOT, LIST)) LL_HEAD(ROOT, LIST) = (NODE); \ - } - -#define LL_POP(ROOT, LIST, TYPE) \ - { \ - if (LL_HEAD(ROOT, LIST)) { \ - TYPE *_old = LL_HEAD(ROOT, LIST); \ - LL_HEAD(ROOT, LIST) = LL_HEAD(ROOT, LIST)-> LIST ## _next; \ - _old-> LIST ## _next = NULL; \ - if (_old == LL_TAIL(ROOT, LIST)) { \ - LL_TAIL(ROOT, LIST) = NULL; \ - } else { \ - LL_HEAD(ROOT, LIST)-> LIST ## _prev = NULL; \ - } \ - } \ - } - -#define LL_REMOVE(ROOT, LIST, NODE) \ - { \ - if ((NODE)-> LIST ## _prev) \ - (NODE)-> LIST ## _prev-> LIST ## _next = (NODE)-> LIST ## _next; \ - if ((NODE)-> LIST ## _next) \ - (NODE)-> LIST ## _next-> LIST ## _prev = (NODE)-> LIST ## _prev; \ - if ((NODE) == LL_HEAD(ROOT, LIST)) \ - LL_HEAD(ROOT, LIST) = (NODE)-> LIST ## _next; \ - if ((NODE) == LL_TAIL(ROOT, LIST)) \ - LL_TAIL(ROOT, LIST) = (NODE)-> LIST ## _prev; \ - } - -#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X)) -#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X)) - -#define PN_ENSURE(ARRAY, CAPACITY, COUNT, TYPE) \ - while ((CAPACITY) < (COUNT)) { \ - (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16; \ - (ARRAY) = (TYPE *) realloc((ARRAY), (CAPACITY) * sizeof (TYPE)); \ - } \ - -#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT, TYPE) \ - { \ - size_t _old_capacity = (CAPACITY); \ - PN_ENSURE(ARRAY, CAPACITY, COUNT, TYPE); \ - memset((ARRAY) + _old_capacity, 0, \ - sizeof(TYPE)*((CAPACITY) - _old_capacity)); \ - } - -#endif /* util.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/io.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/io.c b/proton-c/src/windows/io.c deleted file mode 100644 index 4a87fd2..0000000 --- a/proton-c/src/windows/io.c +++ /dev/null @@ -1,457 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#define FD_SETSIZE 2048 -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." -#endif -#include <winsock2.h> -#include <mswsock.h> -#include <Ws2tcpip.h> - -#include "platform.h" -#include <proton/io.h> -#include <proton/object.h> -#include <proton/selector.h> -#include "iocp.h" -#include "util.h" - -#include <ctype.h> -#include <errno.h> -#include <stdio.h> -#include <assert.h> - -int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code) -{ - // Error code can be from GetLastError or WSAGetLastError, - char err[1024] = {0}; - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | - FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL); - return pn_error_format(error, PN_ERR, "%s: %s", msg, err); -} - -static void io_log(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fflush(stderr); -} - -struct pn_io_t { - char host[NI_MAXHOST]; - char serv[NI_MAXSERV]; - pn_error_t *error; - bool trace; - bool wouldblock; - iocp_t *iocp; -}; - -void pn_io_initialize(void *obj) -{ - pn_io_t *io = (pn_io_t *) obj; - io->error = pn_error(); - io->wouldblock = false; - io->trace = pn_env_bool("PN_TRACE_DRV"); - - /* Request WinSock 2.2 */ - WORD wsa_ver = MAKEWORD(2, 2); - WSADATA unused; - int err = WSAStartup(wsa_ver, &unused); - if (err) { - pni_win32_error(io->error, "WSAStartup", WSAGetLastError()); - fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error)); - } - io->iocp = pni_iocp(); -} - -void pn_io_finalize(void *obj) -{ - pn_io_t *io = (pn_io_t *) obj; - pn_error_free(io->error); - pn_free(io->iocp); - WSACleanup(); -} - -#define pn_io_hashcode NULL -#define pn_io_compare NULL -#define pn_io_inspect - -pn_io_t *pn_io(void) -{ - static const pn_class_t clazz = PN_CLASS(pn_io); - pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t)); - return io; -} - -void pn_io_free(pn_io_t *io) -{ - pn_free(io); -} - -pn_error_t *pn_io_error(pn_io_t *io) -{ - assert(io); - return io->error; -} - -static void ensure_unique(pn_io_t *io, pn_socket_t new_socket) -{ - // A brand new socket can have the same HANDLE value as a previous - // one after a socketclose. If the application closes one itself - // (i.e. not using pn_close), we don't find out about it until here. - iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket); - if (iocpd) { - if (io->trace) - io_log("Stale external socket reference discarded\n"); - // Re-use means former socket instance was closed - assert(iocpd->ops_in_progress == 0); - assert(iocpd->external); - // Clean up the straggler as best we can - pn_socket_t sock = iocpd->socket; - iocpd->socket = INVALID_SOCKET; - pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount - } -} - - -/* - * This heavyweight surrogate pipe could be replaced with a normal Windows pipe - * now that select() is no longer used. If interrupt semantics are all that is - * needed, a simple user space counter and reserved completion status would - * probably suffice. - */ -static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]); - -int pn_pipe(pn_io_t *io, pn_socket_t *dest) -{ - int n = pni_socket_pair(io, dest); - if (n) { - pni_win32_error(io->error, "pipe", WSAGetLastError()); - } - return n; -} - -static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) { - // - // Disable the Nagle algorithm on TCP connections. - // - int flag = 1; - if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) { - perror("setsockopt"); - } - - u_long nonblock = 1; - if (ioctlsocket(sock, FIONBIO, &nonblock)) { - perror("ioctlsocket"); - } -} - -static inline pn_socket_t pni_create_socket(int domain, int protocol); - -static const char *amqp_service(const char *port) { - // Help older Windows to know about amqp[s] ports - if (port) { - if (!strcmp("amqp", port)) return "5672"; - if (!strcmp("amqps", port)) return "5671"; - } - return port; -} - -pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port) -{ - struct addrinfo *addr; - int code = getaddrinfo(host, amqp_service(port), NULL, &addr); - if (code) { - pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code)); - return INVALID_SOCKET; - } - - pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol); - if (sock == INVALID_SOCKET) { - pni_win32_error(io->error, "pni_create_socket", WSAGetLastError()); - return INVALID_SOCKET; - } - ensure_unique(io, sock); - - bool optval = 1; - if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval, - sizeof(optval)) == -1) { - pni_win32_error(io->error, "setsockopt", WSAGetLastError()); - closesocket(sock); - return INVALID_SOCKET; - } - - if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) { - pni_win32_error(io->error, "bind", WSAGetLastError()); - freeaddrinfo(addr); - closesocket(sock); - return INVALID_SOCKET; - } - freeaddrinfo(addr); - - if (listen(sock, 50) == -1) { - pni_win32_error(io->error, "listen", WSAGetLastError()); - closesocket(sock); - return INVALID_SOCKET; - } - - if (io->iocp->selector) { - iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); - if (!iocpd) { - pn_i_error_from_errno(io->error, "register"); - closesocket(sock); - return INVALID_SOCKET; - } - pni_iocpdesc_start(iocpd); - } - - return sock; -} - -pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port) -{ - // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets - const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1"; - - struct addrinfo *addr; - int code = getaddrinfo(host, amqp_service(port), NULL, &addr); - if (code) { - pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code)); - return INVALID_SOCKET; - } - - pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol); - if (sock == INVALID_SOCKET) { - pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError()); - freeaddrinfo(addr); - return INVALID_SOCKET; - } - - ensure_unique(io, sock); - pn_configure_sock(io, sock); - - if (io->iocp->selector) { - return pni_iocp_begin_connect(io->iocp, sock, addr, io->error); - } else { - if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) { - if (WSAGetLastError() != WSAEWOULDBLOCK) { - pni_win32_error(io->error, "connect", WSAGetLastError()); - freeaddrinfo(addr); - closesocket(sock); - return INVALID_SOCKET; - } - } - - freeaddrinfo(addr); - return sock; - } -} - -pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size) -{ - struct sockaddr_storage addr; - socklen_t addrlen = sizeof(addr); - iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock); - pn_socket_t accept_sock; - - *name = '\0'; - if (listend) - accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error); - else { - // User supplied socket - accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen); - if (accept_sock == INVALID_SOCKET) - pni_win32_error(io->error, "sync accept", WSAGetLastError()); - } - - if (accept_sock == INVALID_SOCKET) - return accept_sock; - - int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, - io->serv, NI_MAXSERV, 0); - if (code) - code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, - io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV); - if (code) { - pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code)); - pn_close(io, accept_sock); - return INVALID_SOCKET; - } else { - pn_configure_sock(io, accept_sock); - snprintf(name, size, "%s:%s", io->host, io->serv); - if (listend) { - pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock)); - } - return accept_sock; - } -} - -static inline pn_socket_t pni_create_socket(int domain, int protocol) { - return socket(domain, SOCK_STREAM, protocol); -} - -ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) { - ssize_t count; - iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd); - if (iocpd) { - count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error); - } else { - count = send(sockfd, (const char *) buf, len, 0); - io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; - } - return count; -} - -ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size) -{ - ssize_t count; - iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); - if (iocpd) { - count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error); - } else { - count = recv(socket, (char *) buf, size, 0); - io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; - } - return count; -} - -ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) -{ - // non-socket io is mapped to socket io for now. See pn_pipe() - return pn_send(io, socket, buf, size); -} - -ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size) -{ - return pn_recv(io, socket, buf, size); -} - -void pn_close(pn_io_t *io, pn_socket_t socket) -{ - iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); - if (iocpd) - pni_iocp_begin_close(iocpd); - else { - closesocket(socket); - } -} - -bool pn_wouldblock(pn_io_t *io) -{ - return io->wouldblock; -} - -pn_selector_t *pn_io_selector(pn_io_t *io) -{ - if (io->iocp->selector == NULL) - io->iocp->selector = pni_selector_create(io->iocp); - return io->iocp->selector; -} - -static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock) -{ - u_long v = 1; - ioctlsocket (sock, FIONBIO, &v); - ensure_unique(io, sock); - iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); - pni_iocpdesc_start(iocpd); -} - - -static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) { - // no socketpair on windows. provide pipe() semantics using sockets - struct protoent * pe_tcp = getprotobyname("tcp"); - if (pe_tcp == NULL) { - perror("getprotobyname"); - return -1; - } - - SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto); - if (sock == INVALID_SOCKET) { - perror("socket"); - return -1; - } - - BOOL b = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) { - perror("setsockopt"); - closesocket(sock); - return -1; - } - else { - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; - addr.sin_port = 0; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - - if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - perror("bind"); - closesocket(sock); - return -1; - } - } - - if (listen(sock, 50) == -1) { - perror("listen"); - closesocket(sock); - return -1; - } - - if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) { - perror("sock1"); - closesocket(sock); - return -1; - } - else { - struct sockaddr addr = {0}; - int l = sizeof(addr); - if (getsockname(sock, &addr, &l) == -1) { - perror("getsockname"); - closesocket(sock); - return -1; - } - - if (connect(sv[1], &addr, sizeof(addr)) == -1) { - int err = WSAGetLastError(); - fprintf(stderr, "connect wsaerrr %d\n", err); - closesocket(sock); - closesocket(sv[1]); - return -1; - } - - if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) { - perror("accept"); - closesocket(sock); - closesocket(sv[1]); - return -1; - } - } - - configure_pipe_socket(io, sv[0]); - configure_pipe_socket(io, sv[1]); - closesocket(sock); - return 0; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c deleted file mode 100644 index d1abc9a..0000000 --- a/proton-c/src/windows/iocp.c +++ /dev/null @@ -1,1176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." -#endif -#include <winsock2.h> -#include <mswsock.h> -#include <Ws2tcpip.h> - -#include "platform.h" -#include <proton/object.h> -#include <proton/io.h> -#include <proton/selector.h> -#include <proton/error.h> -#include <proton/transport.h> -#include "iocp.h" -#include "util.h" -#include <assert.h> - -/* - * Windows IO Completion Port support for Proton. - * - * Overlapped writes are used to avoid lengthy stalls between write - * completion and starting a new write. Non-overlapped reads are used - * since Windows accumulates inbound traffic without stalling and - * managing read buffers would not avoid a memory copy at the pn_read - * boundary. - * - * A socket must not get a Windows closesocket() unless the - * application has called pn_close on the socket or a global - * pn_io_finalize(). On error, the internal accounting for - * write_closed or read_closed may be updated along with the external - * event notification. A socket may be closed if it is never added to - * the iocpdesc_map or is on its way out of the map. - */ - -// Max number of overlapped accepts per listener -#define IOCP_MAX_ACCEPTS 10 - -// AcceptEx squishes the local and remote addresses and optional data -// all together when accepting the connection. Reserve enough for -// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding -// per address is required by AcceptEx. -#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16) -#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN) - -static void iocp_log(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fflush(stderr); -} - -static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status) -{ - char buf[512]; - if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, - 0, status, 0, buf, sizeof(buf), 0)) - pn_error_set(error, code, buf); - else { - fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError()); - } -} - -static void reap_check(iocpdesc_t *); -static void bind_to_completion_port(iocpdesc_t *iocpd); -static void iocp_shutdown(iocpdesc_t *iocpd); -static void start_reading(iocpdesc_t *iocpd); -static bool is_listener(iocpdesc_t *iocpd); -static void release_sys_sendbuf(SOCKET s); - -static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text) -{ - pni_win32_error(iocpd->error, text, status); - if (iocpd->iocp->iocp_trace) { - iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error)); - } - iocpd->write_closed = true; - iocpd->read_closed = true; - iocpd->poll_error = true; - pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE)); -} - -// Helper functions to use specialized IOCP AcceptEx() and ConnectEx() -static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s) -{ - GUID guid = WSAID_ACCEPTEX; - DWORD bytes = 0; - LPFN_ACCEPTEX fn; - WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), - &fn, sizeof(fn), &bytes, NULL, NULL); - assert(fn); - return fn; -} - -static LPFN_CONNECTEX lookup_connect_ex(SOCKET s) -{ - GUID guid = WSAID_CONNECTEX; - DWORD bytes = 0; - LPFN_CONNECTEX fn; - WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), - &fn, sizeof(fn), &bytes, NULL, NULL); - assert(fn); - return fn; -} - -static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s) -{ - GUID guid = WSAID_GETACCEPTEXSOCKADDRS; - DWORD bytes = 0; - LPFN_GETACCEPTEXSOCKADDRS fn; - WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), - &fn, sizeof(fn), &bytes, NULL, NULL); - assert(fn); - return fn; -} - -// match accept socket to listener socket -static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd) -{ - sockaddr_storage sa; - socklen_t salen = sizeof(sa); - if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1) - return NULL; - SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM - if (s == INVALID_SOCKET) - return NULL; - return pni_iocpdesc_create(iocpd->iocp, s, false); -} - -static bool is_listener(iocpdesc_t *iocpd) -{ - return iocpd && iocpd->acceptor; -} - -// === Async accept processing - -typedef struct { - iocp_result_t base; - iocpdesc_t *new_sock; - char address_buffer[IOCP_SOCKADDRBUFLEN]; - DWORD unused; -} accept_result_t; - -static accept_result_t *accept_result(iocpdesc_t *listen_sock) { - accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t)); - if (result) { - result->base.type = IOCP_ACCEPT; - result->base.iocpd = listen_sock; - } - return result; -} - -static void reset_accept_result(accept_result_t *result) { - memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); - memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN); -} - -struct pni_acceptor_t { - int accept_queue_size; - pn_list_t *accepts; - iocpdesc_t *listen_sock; - bool signalled; - LPFN_ACCEPTEX fn_accept_ex; - LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs; -}; - -#define pni_acceptor_compare NULL -#define pni_acceptor_inspect NULL -#define pni_acceptor_hashcode NULL - -static void pni_acceptor_initialize(void *object) -{ - pni_acceptor_t *acceptor = (pni_acceptor_t *) object; - acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS); -} - -static void pni_acceptor_finalize(void *object) -{ - pni_acceptor_t *acceptor = (pni_acceptor_t *) object; - size_t len = pn_list_size(acceptor->accepts); - for (size_t i = 0; i < len; i++) - free(pn_list_get(acceptor->accepts, i)); - pn_free(acceptor->accepts); -} - -static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd) -{ - static const pn_cid_t CID_pni_acceptor = CID_pn_void; - static const pn_class_t clazz = PN_CLASS(pni_acceptor); - pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t)); - acceptor->listen_sock = iocpd; - acceptor->accept_queue_size = 0; - acceptor->signalled = false; - pn_socket_t sock = acceptor->listen_sock->socket; - acceptor->fn_accept_ex = lookup_accept_ex(sock); - acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock); - return acceptor; -} - -static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) -{ - if (acceptor->listen_sock->closing) { - if (result) { - free(result); - acceptor->accept_queue_size--; - } - if (acceptor->accept_queue_size == 0) - acceptor->signalled = true; - return; - } - - if (result) { - reset_accept_result(result); - } else { - if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && - pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) { - result = accept_result(acceptor->listen_sock); - acceptor->accept_queue_size++; - } else { - // an async accept is still pending or max concurrent accepts already hit - return; - } - } - - result->new_sock = create_same_type_socket(acceptor->listen_sock); - if (result->new_sock) { - // Not yet connected. - result->new_sock->read_closed = true; - result->new_sock->write_closed = true; - - bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket, - result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, - &result->unused, (LPOVERLAPPED) result); - if (!success && WSAGetLastError() != ERROR_IO_PENDING) { - result->base.status = WSAGetLastError(); - pn_list_add(acceptor->accepts, result); - pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE); - } else { - acceptor->listen_sock->ops_in_progress++; - // This socket is equally involved in the async operation. - result->new_sock->ops_in_progress++; - } - } else { - iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket"); - } -} - -static void complete_accept(accept_result_t *result, HRESULT status) -{ - result->new_sock->ops_in_progress--; - iocpdesc_t *ld = result->base.iocpd; - if (ld->read_closed) { - if (!result->new_sock->closing) - pni_iocp_begin_close(result->new_sock); - free(result); // discard - reap_check(ld); - } else { - result->base.status = status; - pn_list_add(ld->acceptor->accepts, result); - pni_events_update(ld, ld->events | PN_READABLE); - } -} - -pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error) -{ - if (!is_listener(ld)) { - set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); - return INVALID_SOCKET; - } - if (ld->read_closed) { - set_iocp_error_status(error, PN_ERR, WSAENOTSOCK); - return INVALID_SOCKET; - } - if (pn_list_size(ld->acceptor->accepts) == 0) { - if (ld->events & PN_READABLE && ld->iocp->iocp_trace) - iocp_log("listen socket readable with no available accept completions\n"); - *would_block = true; - return INVALID_SOCKET; - } - - accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0); - pn_list_del(ld->acceptor->accepts, 0, 1); - if (!pn_list_size(ld->acceptor->accepts)) - pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts - - pn_socket_t accept_sock; - if (result->base.status) { - accept_sock = INVALID_SOCKET; - pni_win32_error(ld->error, "accept failure", result->base.status); - if (ld->iocp->iocp_trace) - iocp_log("%s\n", pn_error_text(ld->error)); - // App never sees this socket so close it here. - pni_iocp_begin_close(result->new_sock); - } else { - accept_sock = result->new_sock->socket; - // AcceptEx special setsockopt: - setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket, - sizeof (SOCKET)); - if (addr && addrlen && *addrlen > 0) { - sockaddr_storage *local_addr = NULL; - sockaddr_storage *remote_addr = NULL; - int local_addrlen, remote_addrlen; - LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs; - fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, - (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr, - &remote_addrlen); - *addrlen = pn_min(*addrlen, remote_addrlen); - memmove(addr, remote_addr, *addrlen); - } - } - - if (accept_sock != INVALID_SOCKET) { - // Connected. - result->new_sock->read_closed = false; - result->new_sock->write_closed = false; - } - - // Done with the completion result, so reuse it - result->new_sock = NULL; - begin_accept(ld->acceptor, result); - return accept_sock; -} - - -// === Async connect processing - -typedef struct { - iocp_result_t base; - char address_buffer[IOCP_SOCKADDRBUFLEN]; - struct addrinfo *addrinfo; -} connect_result_t; - -#define connect_result_initialize NULL -#define connect_result_compare NULL -#define connect_result_inspect NULL -#define connect_result_hashcode NULL - -static void connect_result_finalize(void *object) -{ - connect_result_t *result = (connect_result_t *) object; - // Do not release addrinfo until ConnectEx completes - if (result->addrinfo) - freeaddrinfo(result->addrinfo); -} - -static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) { - static const pn_cid_t CID_connect_result = CID_pn_void; - static const pn_class_t clazz = PN_CLASS(connect_result); - connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t)); - if (result) { - memset(result, 0, sizeof(connect_result_t)); - result->base.type = IOCP_CONNECT; - result->base.iocpd = iocpd; - result->addrinfo = addr; - } - return result; -} - -pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error) -{ - // addr lives for the duration of the async connect. Caller has passed ownership here. - // See connect_result_finalize(). - // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound: - sockaddr_storage sa; - memset(&sa, 0, sizeof(sa)); - sa.ss_family = addr->ai_family; - if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) { - pni_win32_error(error, "begin async connection", WSAGetLastError()); - if (iocp->iocp_trace) - iocp_log("%s\n", pn_error_text(error)); - closesocket(sock); - freeaddrinfo(addr); - return INVALID_SOCKET; - } - - iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false); - bind_to_completion_port(iocpd); - LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket); - connect_result_t *result = connect_result(iocpd, addr); - DWORD unused; - bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen, - NULL, 0, &unused, (LPOVERLAPPED) result); - if (!success && WSAGetLastError() != ERROR_IO_PENDING) { - pni_win32_error(error, "ConnectEx failure", WSAGetLastError()); - pn_free(result); - iocpd->write_closed = true; - iocpd->read_closed = true; - if (iocp->iocp_trace) - iocp_log("%s\n", pn_error_text(error)); - } else { - iocpd->ops_in_progress++; - } - return sock; -} - -static void complete_connect(connect_result_t *result, HRESULT status) -{ - iocpdesc_t *iocpd = result->base.iocpd; - if (iocpd->closing) { - pn_free(result); - reap_check(iocpd); - return; - } - - if (status) { - iocpdesc_fail(iocpd, status, "Connect failure"); - // Posix sets selectable events as follows: - pni_events_update(iocpd, PN_READABLE | PN_EXPIRED); - } else { - release_sys_sendbuf(iocpd->socket); - if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { - iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)"); - } else { - pni_events_update(iocpd, PN_WRITABLE); - start_reading(iocpd); - } - } - pn_free(result); - return; -} - - -// === Async writes - -static bool write_in_progress(iocpdesc_t *iocpd) -{ - return pni_write_pipeline_size(iocpd->pipeline) != 0; -} - -write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen) -{ - write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1); - if (result) { - result->base.type = IOCP_WRITE; - result->base.iocpd = iocpd; - result->buffer.start = buf; - result->buffer.size = buflen; - } - return result; -} - -static int submit_write(write_result_t *result, const void *buf, size_t len) -{ - WSABUF wsabuf; - wsabuf.buf = (char *) buf; - wsabuf.len = len; - memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); - return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0, - (LPOVERLAPPED) result, 0); -} - -ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error) -{ - if (len == 0) return 0; - *would_block = false; - if (is_listener(iocpd)) { - set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); - return INVALID_SOCKET; - } - if (iocpd->closing) { - set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); - return SOCKET_ERROR; - } - if (iocpd->write_closed) { - assert(pn_error_code(iocpd->error)); - pn_error_copy(error, iocpd->error); - if (iocpd->iocp->iocp_trace) - iocp_log("write error: %s\n", pn_error_text(error)); - return SOCKET_ERROR; - } - if (len == 0) return 0; - if (!(iocpd->events & PN_WRITABLE)) { - *would_block = true; - return SOCKET_ERROR; - } - - size_t written = 0; - size_t requested = len; - const char *outgoing = (const char *) buf; - size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len); - if (!available) { - *would_block = true; - return SOCKET_ERROR; - } - - for (size_t wr_count = 0; wr_count < available; wr_count++) { - write_result_t *result = pni_write_pipeline_next(iocpd->pipeline); - assert(result); - result->base.iocpd = iocpd; - ssize_t actual_len = pn_min(len, result->buffer.size); - result->requested = actual_len; - memmove((void *)result->buffer.start, outgoing, actual_len); - outgoing += actual_len; - written += actual_len; - len -= actual_len; - - int werror = submit_write(result, result->buffer.start, actual_len); - if (werror && WSAGetLastError() != ERROR_IO_PENDING) { - pni_write_pipeline_return(iocpd->pipeline, result); - iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send"); - return SOCKET_ERROR; - } - iocpd->ops_in_progress++; - } - - if (!pni_write_pipeline_writable(iocpd->pipeline)) - pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); - return written; -} - -/* - * Note: iocp write completion is not "bytes on the wire", it is "peer - * acked the sent bytes". Completion can be seconds on a slow - * consuming peer. - */ -static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status) -{ - iocpdesc_t *iocpd = result->base.iocpd; - if (iocpd->closing) { - pni_write_pipeline_return(iocpd->pipeline, result); - if (!iocpd->write_closed && !write_in_progress(iocpd)) - iocp_shutdown(iocpd); - reap_check(iocpd); - return; - } - if (status == 0 && xfer_count > 0) { - if (xfer_count != result->requested) { - // Is this recoverable? How to preserve order if multiple overlapped writes? - pni_write_pipeline_return(iocpd->pipeline, result); - iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket"); - return; - } else { - // Success. - pni_write_pipeline_return(iocpd->pipeline, result); - if (pni_write_pipeline_writable(iocpd->pipeline)) - pni_events_update(iocpd, iocpd->events | PN_WRITABLE); - return; - } - } - // Other error - pni_write_pipeline_return(iocpd->pipeline, result); - if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN - || status == ERROR_NETNAME_DELETED) { - iocpd->write_closed = true; - iocpd->poll_error = true; - pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); - pni_win32_error(iocpd->error, "Remote close or timeout", status); - } else { - iocpdesc_fail(iocpd, status, "IOCP async write error"); - } -} - - -// === Async reads - -struct read_result_t { - iocp_result_t base; - size_t drain_count; - char unused_buf[1]; -}; - -static read_result_t *read_result(iocpdesc_t *iocpd) -{ - read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1); - if (result) { - result->base.type = IOCP_READ; - result->base.iocpd = iocpd; - } - return result; -} - -static void begin_zero_byte_read(iocpdesc_t *iocpd) -{ - if (iocpd->read_in_progress) return; - if (iocpd->read_closed) { - pni_events_update(iocpd, iocpd->events | PN_READABLE); - return; - } - - read_result_t *result = iocpd->read_result; - memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); - DWORD flags = 0; - WSABUF wsabuf; - wsabuf.buf = result->unused_buf; - wsabuf.len = 0; - int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags, - &result->base.overlapped, 0); - if (rc && WSAGetLastError() != ERROR_IO_PENDING) { - iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error"); - return; - } - iocpd->ops_in_progress++; - iocpd->read_in_progress = true; -} - -static void drain_until_closed(iocpdesc_t *iocpd) { - size_t max_drain = 16 * 1024; - char buf[512]; - read_result_t *result = iocpd->read_result; - while (result->drain_count < max_drain) { - int rv = recv(iocpd->socket, buf, 512, 0); - if (rv > 0) - result->drain_count += rv; - else if (rv == 0) { - iocpd->read_closed = true; - return; - } else if (WSAGetLastError() == WSAEWOULDBLOCK) { - // wait a little longer - start_reading(iocpd); - return; - } - else - break; - } - // Graceful close indication unlikely, force the issue - if (iocpd->iocp->iocp_trace) - if (result->drain_count >= max_drain) - iocp_log("graceful close on reader abandoned (too many chars)\n"); - else - iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError()); - iocpd->read_closed = true; -} - - -static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status) -{ - iocpdesc_t *iocpd = result->base.iocpd; - iocpd->read_in_progress = false; - - if (iocpd->closing) { - // Application no longer reading, but we are looking for a zero length read - if (!iocpd->read_closed) - drain_until_closed(iocpd); - reap_check(iocpd); - return; - } - - if (status == 0 && xfer_count == 0) { - // Success. - pni_events_update(iocpd, iocpd->events | PN_READABLE); - } else { - iocpdesc_fail(iocpd, status, "IOCP read complete error"); - } -} - -ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error) -{ - if (size == 0) return 0; - *would_block = false; - if (is_listener(iocpd)) { - set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); - return SOCKET_ERROR; - } - if (iocpd->closing) { - // Previous call to pn_close() - set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); - return SOCKET_ERROR; - } - if (iocpd->read_closed) { - if (pn_error_code(iocpd->error)) - pn_error_copy(error, iocpd->error); - else - set_iocp_error_status(error, PN_ERR, WSAENOTCONN); - return SOCKET_ERROR; - } - - int count = recv(iocpd->socket, (char *) buf, size, 0); - if (count > 0) { - pni_events_update(iocpd, iocpd->events & ~PN_READABLE); - begin_zero_byte_read(iocpd); - return (ssize_t) count; - } else if (count == 0) { - iocpd->read_closed = true; - return 0; - } - if (WSAGetLastError() == WSAEWOULDBLOCK) - *would_block = true; - else { - set_iocp_error_status(error, PN_ERR, WSAGetLastError()); - iocpd->read_closed = true; - } - return SOCKET_ERROR; -} - -static void start_reading(iocpdesc_t *iocpd) -{ - begin_zero_byte_read(iocpd); -} - - -// === The iocp descriptor - -static void pni_iocpdesc_initialize(void *object) -{ - iocpdesc_t *iocpd = (iocpdesc_t *) object; - memset(iocpd, 0, sizeof(iocpdesc_t)); - iocpd->socket = INVALID_SOCKET; -} - -static void pni_iocpdesc_finalize(void *object) -{ - iocpdesc_t *iocpd = (iocpdesc_t *) object; - pn_free(iocpd->acceptor); - pn_error_free(iocpd->error); - if (iocpd->pipeline) - if (write_in_progress(iocpd)) - iocp_log("iocp descriptor write leak\n"); - else - pn_free(iocpd->pipeline); - if (iocpd->read_in_progress) - iocp_log("iocp descriptor read leak\n"); - else - free(iocpd->read_result); -} - -static uintptr_t pni_iocpdesc_hashcode(void *object) -{ - iocpdesc_t *iocpd = (iocpdesc_t *) object; - return iocpd->socket; -} - -#define pni_iocpdesc_compare NULL -#define pni_iocpdesc_inspect NULL - -// Reference counted in the iocpdesc map, zombie_list, selector. -static iocpdesc_t *pni_iocpdesc(pn_socket_t s) -{ - static const pn_cid_t CID_pni_iocpdesc = CID_pn_void; - static pn_class_t clazz = PN_CLASS(pni_iocpdesc); - iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t)); - assert(iocpd); - iocpd->socket = s; - return iocpd; -} - -static bool is_listener_socket(pn_socket_t s) -{ - BOOL tval = false; - int tvalsz = sizeof(tval); - int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz); - return code == 0 && tval; -} - -iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { - assert (s != INVALID_SOCKET); - assert(!pni_iocpdesc_map_get(iocp, s)); - bool listening = is_listener_socket(s); - iocpdesc_t *iocpd = pni_iocpdesc(s); - iocpd->iocp = iocp; - if (iocpd) { - iocpd->external = external; - iocpd->error = pn_error(); - if (listening) { - iocpd->acceptor = pni_acceptor(iocpd); - } else { - iocpd->pipeline = pni_write_pipeline(iocpd); - iocpd->read_result = read_result(iocpd); - } - pni_iocpdesc_map_push(iocpd); - } - return iocpd; -} - -iocpdesc_t *pni_deadline_desc(iocp_t *iocp) { - // Non IO descriptor for selector deadlines. Do not add to iocpdesc map or - // zombie list. Selector responsible to free/decref object. - iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET); - iocpd->iocp = iocp; - iocpd->deadline_desc = true; - return iocpd; -} - -// === Fast lookup of a socket's iocpdesc_t - -iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s); - return iocpd; -} - -void pni_iocpdesc_map_push(iocpdesc_t *iocpd) { - pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd); - pn_decref(iocpd); - assert(pn_refcount(iocpd) == 1); -} - -void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) { - pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s); -} - -static void bind_to_completion_port(iocpdesc_t *iocpd) -{ - if (iocpd->bound) return; - if (!iocpd->iocp->completion_port) { - iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port."); - return; - } - - if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) - iocpd->bound = true; - else { - iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup."); - } -} - -static void release_sys_sendbuf(SOCKET s) -{ - // Set the socket's send buffer size to zero. - int sz = 0; - int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int)); - assert(status == 0); -} - -void pni_iocpdesc_start(iocpdesc_t *iocpd) -{ - if (iocpd->bound) return; - bind_to_completion_port(iocpd); - if (is_listener(iocpd)) { - begin_accept(iocpd->acceptor, NULL); - } - else { - release_sys_sendbuf(iocpd->socket); - pni_events_update(iocpd, PN_WRITABLE); - start_reading(iocpd); - } -} - -static void complete(iocp_result_t *result, bool success, DWORD num_transferred) { - result->iocpd->ops_in_progress--; - DWORD status = success ? 0 : GetLastError(); - - switch (result->type) { - case IOCP_ACCEPT: - complete_accept((accept_result_t *) result, status); - break; - case IOCP_CONNECT: - complete_connect((connect_result_t *) result, status); - break; - case IOCP_WRITE: - complete_write((write_result_t *) result, num_transferred, status); - break; - case IOCP_READ: - complete_read((read_result_t *) result, num_transferred, status); - break; - default: - assert(false); - } -} - -void pni_iocp_drain_completions(iocp_t *iocp) -{ - while (true) { - DWORD timeout_ms = 0; - DWORD num_transferred = 0; - ULONG_PTR completion_key = 0; - OVERLAPPED *overlapped = 0; - - bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, - &completion_key, &overlapped, timeout_ms); - if (!overlapped) - return; // timed out - iocp_result_t *result = (iocp_result_t *) overlapped; - complete(result, good_op, num_transferred); - } -} - -// returns: -1 on error, 0 on timeout, 1 successful completion -int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) { - DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout; - DWORD num_transferred = 0; - ULONG_PTR completion_key = 0; - OVERLAPPED *overlapped = 0; - - bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, - &completion_key, &overlapped, win_timeout); - if (!overlapped) - if (GetLastError() == WAIT_TIMEOUT) - return 0; - else { - if (error) - pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError()); - return -1; - } - - iocp_result_t *result = (iocp_result_t *) overlapped; - complete(result, good_op, num_transferred); - return 1; -} - -// === Close (graceful and otherwise) - -// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress -// and fully closed. - -static void zombie_list_add(iocpdesc_t *iocpd) -{ - assert(iocpd->closing); - if (!iocpd->ops_in_progress) { - // No need to make a zombie. - if (iocpd->socket != INVALID_SOCKET) { - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; - iocpd->read_closed = true; - } - return; - } - // Allow 2 seconds for graceful shutdown before releasing socket resource. - iocpd->reap_time = pn_i_now() + 2000; - pn_list_add(iocpd->iocp->zombie_list, iocpd); -} - -static void reap_check(iocpdesc_t *iocpd) -{ - if (iocpd->closing && !iocpd->ops_in_progress) { - if (iocpd->socket != INVALID_SOCKET) { - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; - } - pn_list_remove(iocpd->iocp->zombie_list, iocpd); - // iocpd is decref'ed and possibly released - } -} - -pn_timestamp_t pni_zombie_deadline(iocp_t *iocp) -{ - if (pn_list_size(iocp->zombie_list)) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0); - return iocpd->reap_time; - } - return 0; -} - -void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now) -{ - pn_list_t *zl = iocp->zombie_list; - // Look for stale zombies that should have been reaped by "now" - for (size_t idx = 0; idx < pn_list_size(zl); idx++) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx); - if (iocpd->reap_time > now) - return; - if (iocpd->socket == INVALID_SOCKET) - continue; - assert(iocpd->ops_in_progress > 0); - if (iocp->iocp_trace) - iocp_log("async close: graceful close timeout exceeded\n"); - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; - iocpd->read_closed = true; - // outstanding ops should complete immediately now - } -} - -static void drain_zombie_completions(iocp_t *iocp) -{ - // No more pn_selector_select() from App, but zombies still need care and feeding - // until their outstanding async actions complete. - pni_iocp_drain_completions(iocp); - - // Discard any that have no pending async IO - size_t sz = pn_list_size(iocp->zombie_list); - for (size_t idx = 0; idx < sz;) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx); - if (!iocpd->ops_in_progress) { - pn_list_del(iocp->zombie_list, idx, 1); - sz--; - } else { - idx++; - } - } - - unsigned shutdown_grace = 2000; - char *override = getenv("PN_SHUTDOWN_GRACE"); - if (override) { - int grace = atoi(override); - if (grace > 0 && grace < 60000) - shutdown_grace = (unsigned) grace; - } - pn_timestamp_t now = pn_i_now(); - pn_timestamp_t deadline = now + shutdown_grace; - - while (pn_list_size(iocp->zombie_list)) { - if (now >= deadline) - break; - int rv = pni_iocp_wait_one(iocp, deadline - now, NULL); - if (rv < 0) { - iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError()); - break; - } - now = pn_i_now(); - } - if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace) - // Should only happen if really slow TCP handshakes, i.e. total network failure - iocp_log("network failure on Proton shutdown\n"); -} - -static pn_list_t *iocp_map_close_all(iocp_t *iocp) -{ - // Zombify stragglers, i.e. no pn_close() from the application. - pn_list_t *externals = pn_list(PN_OBJECT, 0); - for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; - entry = pn_hash_next(iocp->iocpdesc_map, entry)) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); - // Just listeners first. - if (is_listener(iocpd)) { - if (iocpd->external) { - // Owned by application, just keep a temporary reference to it. - // iocp_result_t structs must not be free'd until completed or - // the completion port is closed. - if (iocpd->ops_in_progress) - pn_list_add(externals, iocpd); - pni_iocpdesc_map_del(iocp, iocpd->socket); - } else { - // Make it a zombie. - pni_iocp_begin_close(iocpd); - } - } - } - pni_iocp_drain_completions(iocp); - - for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; - entry = pn_hash_next(iocp->iocpdesc_map, entry)) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); - if (iocpd->external) { - iocpd->read_closed = true; // Do not consume from read side - iocpd->write_closed = true; // Do not shutdown write side - if (iocpd->ops_in_progress) - pn_list_add(externals, iocpd); - pni_iocpdesc_map_del(iocp, iocpd->socket); - } else { - // Make it a zombie. - pni_iocp_begin_close(iocpd); - } - } - return externals; -} - -static void zombie_list_hard_close_all(iocp_t *iocp) -{ - pni_iocp_drain_completions(iocp); - size_t zs = pn_list_size(iocp->zombie_list); - for (size_t i = 0; i < zs; i++) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); - if (iocpd->socket != INVALID_SOCKET) { - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; - iocpd->read_closed = true; - iocpd->write_closed = true; - } - } - pni_iocp_drain_completions(iocp); - - // Zombies should be all gone. Do a sanity check. - zs = pn_list_size(iocp->zombie_list); - int remaining = 0; - int ops = 0; - for (size_t i = 0; i < zs; i++) { - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); - remaining++; - ops += iocpd->ops_in_progress; - } - if (remaining) - iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops); -} - -static void iocp_shutdown(iocpdesc_t *iocpd) -{ - if (iocpd->socket == PN_INVALID_SOCKET) - return; // Hard close in progress - if (shutdown(iocpd->socket, SD_SEND)) { - int err = WSAGetLastError(); - if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN) - if (iocpd->iocp->iocp_trace) - iocp_log("socket shutdown failed %d\n", err); - } - iocpd->write_closed = true; -} - -void pni_iocp_begin_close(iocpdesc_t *iocpd) -{ - assert (!iocpd->closing); - if (is_listener(iocpd)) { - // Listening socket is easy. Close the socket which will cancel async ops. - pn_socket_t old_sock = iocpd->socket; - iocpd->socket = INVALID_SOCKET; - iocpd->closing = true; - iocpd->read_closed = true; - iocpd->write_closed = true; - closesocket(old_sock); - // Pending accepts will now complete. Zombie can die when all consumed. - zombie_list_add(iocpd); - pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd - } else { - // Continue async operation looking for graceful close confirmation or timeout. - pn_socket_t old_sock = iocpd->socket; - iocpd->closing = true; - if (!iocpd->write_closed && !write_in_progress(iocpd)) - iocp_shutdown(iocpd); - zombie_list_add(iocpd); - pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd - } -} - - -// === iocp_t - -#define pni_iocp_hashcode NULL -#define pni_iocp_compare NULL -#define pni_iocp_inspect NULL - -void pni_iocp_initialize(void *obj) -{ - iocp_t *iocp = (iocp_t *) obj; - memset(iocp, 0, sizeof(iocp_t)); - pni_shared_pool_create(iocp); - iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - assert(iocp->completion_port != NULL); - iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75); - iocp->zombie_list = pn_list(PN_OBJECT, 0); - iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV"); - iocp->selector = NULL; -} - -void pni_iocp_finalize(void *obj) -{ - iocp_t *iocp = (iocp_t *) obj; - // Move sockets to closed state, except external sockets. - pn_list_t *externals = iocp_map_close_all(iocp); - // Now everything with ops_in_progress is in the zombie_list or the externals list. - assert(!pn_hash_head(iocp->iocpdesc_map)); - pn_free(iocp->iocpdesc_map); - - drain_zombie_completions(iocp); // Last chance for graceful close - zombie_list_hard_close_all(iocp); - CloseHandle(iocp->completion_port); // This cancels all our async ops - iocp->completion_port = NULL; - - if (pn_list_size(externals) && iocp->iocp_trace) - iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals)); - - // Now safe to free everything that might be touched by a former async operation. - pn_free(externals); - pn_free(iocp->zombie_list); - pni_shared_pool_free(iocp); -} - -iocp_t *pni_iocp() -{ - static const pn_cid_t CID_pni_iocp = CID_pn_void; - static const pn_class_t clazz = PN_CLASS(pni_iocp); - iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t)); - return iocp; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/iocp.h ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h deleted file mode 100644 index 0e052e5..0000000 --- a/proton-c/src/windows/iocp.h +++ /dev/null @@ -1,144 +0,0 @@ -#ifndef PROTON_SRC_IOCP_H -#define PROTON_SRC_IOCP_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/import_export.h> -#include <proton/selectable.h> -#include <proton/type_compat.h> - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct pni_acceptor_t pni_acceptor_t; -typedef struct write_result_t write_result_t; -typedef struct read_result_t read_result_t; -typedef struct write_pipeline_t write_pipeline_t; -typedef struct iocpdesc_t iocpdesc_t; - - -// One per pn_io_t. - -struct iocp_t { - HANDLE completion_port; - pn_hash_t *iocpdesc_map; - pn_list_t *zombie_list; - int shared_pool_size; - char *shared_pool_memory; - write_result_t **shared_results; - write_result_t **available_results; - size_t shared_available_count; - size_t writer_count; - int loopback_bufsize; - bool iocp_trace; - pn_selector_t *selector; -}; - - -// One for each socket. -// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list, -// selector->iocp_descriptors list. It should remain ref counted in the -// zombie_list until ops_in_progress == 0 or the completion port is closed. - -struct iocpdesc_t { - pn_socket_t socket; - iocp_t *iocp; - pni_acceptor_t *acceptor; - pn_error_t *error; - int ops_in_progress; - bool read_in_progress; - write_pipeline_t *pipeline; - read_result_t *read_result; - bool external; // true if socket set up outside Proton - bool bound; // associted with the completion port - bool closing; // pn_close called by application - bool read_closed; // EOF or read error - bool write_closed; // shutdown sent or write error - bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL - bool deadline_desc; // Socket-less deadline descriptor for selectors - pn_selector_t *selector; - pn_selectable_t *selectable; - int events; - int interests; - pn_timestamp_t deadline; - iocpdesc_t *triggered_list_next; - iocpdesc_t *triggered_list_prev; - iocpdesc_t *deadlines_next; - iocpdesc_t *deadlines_prev; - pn_timestamp_t reap_time;; -}; - -typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t; - -typedef struct { - OVERLAPPED overlapped; - iocp_type_t type; - iocpdesc_t *iocpd; - HRESULT status; -} iocp_result_t; - -struct write_result_t { - iocp_result_t base; - size_t requested; - bool in_use; - pn_bytes_t buffer; -}; - -iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external); -iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s); -iocpdesc_t *pni_deadline_desc(iocp_t *); -void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s); -void pni_iocpdesc_map_push(iocpdesc_t *iocpd); -void pni_iocpdesc_start(iocpdesc_t *iocpd); -void pni_iocp_drain_completions(iocp_t *); -int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *); -void pni_iocp_start_accepting(iocpdesc_t *iocpd); -pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error); -pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error); -ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *); -ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error); -void pni_iocp_begin_close(iocpdesc_t *iocpd); -iocp_t *pni_iocp(); - -void pni_events_update(iocpdesc_t *iocpd, int events); -write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen); -write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd); -size_t pni_write_pipeline_size(write_pipeline_t *); -bool pni_write_pipeline_writable(write_pipeline_t *); -void pni_write_pipeline_return(write_pipeline_t *, write_result_t *); -size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t); -write_result_t *pni_write_pipeline_next(write_pipeline_t *); -void pni_shared_pool_create(iocp_t *); -void pni_shared_pool_free(iocp_t *); -void pni_zombie_check(iocp_t *, pn_timestamp_t); -pn_timestamp_t pni_zombie_deadline(iocp_t *); - -pn_selector_t *pni_selector_create(iocp_t *iocp); - -int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code); - -#ifdef __cplusplus -} -#endif - -#endif /* iocp.h */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
