Oipo commented on a change in pull request #143: Feature/pubsub websocket full
duplex communication
URL: https://github.com/apache/celix/pull/143#discussion_r371027972
##########
File path:
bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
##########
@@ -558,34 +586,88 @@ static void* psa_websocket_recvThread(void * data) {
return NULL;
}
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection,
void *handle) {
+ if (handle != NULL) {
+ pubsub_websocket_topic_receiver_t *receiver =
(pubsub_websocket_topic_receiver_t *) handle;
+
+ //Get request info with host, port and uri information
+ const struct mg_request_info *ri = mg_get_request_info(connection);
+ if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+ char *key = NULL;
+ asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_websocket_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, key);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->key = key;
+ entry->uri = strndup(ri->request_uri, 1024 * 1024);
+ entry->socketAddress = strndup(ri->remote_addr, 1024 * 1024);
+ entry->socketPort = ri->remote_port;
+ entry->connected = true;
+ entry->statically = false;
+ entry->passive = true;
+ hashMap_put(receiver->requestedConnections.map, (void *)
entry->key, entry);
+ receiver->requestedConnections.allConnected = false;
+ } else {
+ free(key);
+ }
+
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+ }
+ }
+}
+
+
static int psa_websocketTopicReceiver_data(struct mg_connection *connection
__attribute__((unused)),
int op_code
__attribute__((unused)),
char *data,
size_t length,
void *handle) {
//Received a websocket message, append this message to the buffer of the
receiver.
if (handle != NULL) {
- psa_websocket_requested_connection_entry_t *entry =
(psa_websocket_requested_connection_entry_t *) handle;
+ pubsub_websocket_topic_receiver_t *receiver =
(pubsub_websocket_topic_receiver_t *) handle;
- celixThreadMutex_lock(&entry->recvBuffer->mutex);
+ celixThreadMutex_lock(&receiver->recvBuffer.mutex);
pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
const char *rcvdMsgData = malloc(length);
memcpy((void *) rcvdMsgData, data, length);
msg->msgData = rcvdMsgData;
msg->msgSize = length;
- celix_arrayList_add(entry->recvBuffer->list, msg);
- celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+ celix_arrayList_add(receiver->recvBuffer.list, msg);
+ celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
}
return 1; //keep open (non-zero), 0 to close the socket
}
-static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection __attribute__((unused)), void *handle) {
+static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection, void *handle) {
//Reset connection for this receiver entry
if (handle != NULL) {
Review comment:
See above comment about reducing nesting.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services