This is an automated email from the ASF dual-hosted git repository.

rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new c4175a4  Updated PSA websocket with full duplex communication
c4175a4 is described below

commit c4175a4050b4a8f6942519fa013e25296dbb0434
Author: dhbfischer <[email protected]>
AuthorDate: Sun Feb 2 14:55:09 2020 +0100

    Updated PSA websocket with full duplex communication
---
 bundles/http_admin/http_admin/src/activator.c      |   1 -
 bundles/http_admin/http_admin/src/service_tree.c   |   3 +-
 .../http_admin/http_admin/src/websocket_admin.c    |   3 +-
 bundles/pubsub/examples/CMakeLists.txt             |   6 +-
 bundles/pubsub/examples/pubsub/CMakeLists.txt      |   2 +-
 .../index.html => common/include/poiCmd.h}         |  47 +++--
 .../pubsub/msg_descriptors/msg_poiCmd.descriptor   |   8 +
 .../poiCmd.properties}                             |  20 +-
 .../CMakeLists.txt                                 |  36 ++--
 .../private/include/pubsub_websocket_private.h     |  97 ++++++++++
 .../private/src/ps_websocket_activator.c           | 164 ++++++++++++++++
 .../private/src/pubsub_websocket_example.c         | 207 +++++++++++++++++++++
 .../resources/index.html                           |  20 ++
 .../resources/script.js                            |  24 ++-
 .../src/pubsub_websocket_admin.c                   |  31 ++-
 .../src/pubsub_websocket_topic_receiver.c          | 126 ++++++++++---
 .../src/pubsub_websocket_topic_receiver.h          |   5 +-
 17 files changed, 710 insertions(+), 90 deletions(-)

diff --git a/bundles/http_admin/http_admin/src/activator.c 
b/bundles/http_admin/http_admin/src/activator.c
index b388d99..4578ada 100644
--- a/bundles/http_admin/http_admin/src/activator.c
+++ b/bundles/http_admin/http_admin/src/activator.c
@@ -54,7 +54,6 @@ static int http_admin_start(http_admin_activator_t *act, 
celix_bundle_context_t
     celix_bundle_t *bundle = celix_bundleContext_getBundle(ctx);
     char* root = celix_bundle_getEntry(bundle, "root");
 
-
     bool prop_use_websockets = celix_bundleContext_getPropertyAsBool(ctx, 
HTTP_ADMIN_USE_WEBSOCKETS_KEY, HTTP_ADMIN_USE_WEBSOCKETS_DFT);
     long listPort = celix_bundleContext_getPropertyAsLong(ctx,    
HTTP_ADMIN_LISTENING_PORTS_KEY, HTTP_ADMIN_LISTENING_PORTS_DFT);
     long websocketTimeoutMs = celix_bundleContext_getPropertyAsLong(ctx, 
HTTP_ADMIN_WEBSOCKET_TIMEOUT_MS_KEY, HTTP_ADMIN_WEBSOCKET_TIMEOUT_MS_DFT);
diff --git a/bundles/http_admin/http_admin/src/service_tree.c 
b/bundles/http_admin/http_admin/src/service_tree.c
index 46aa4e1..1066708 100644
--- a/bundles/http_admin/http_admin/src/service_tree.c
+++ b/bundles/http_admin/http_admin/src/service_tree.c
@@ -140,6 +140,7 @@ bool addServiceNode(service_tree_t *svc_tree, const char 
*uri, void *svc) {
                 svc_tree->tree_node_count++;
                 uri_exists = false;
             }
+            subNodeCount++;
         } else if (current->next != NULL) {
             current = current->next;
             current_data = current->svc_data;
@@ -152,8 +153,8 @@ bool addServiceNode(service_tree_t *svc_tree, const char 
*uri, void *svc) {
             current_data = node->svc_data;
             svc_tree->tree_node_count++;
             uri_exists = false;
+            subNodeCount++;
         }
-        subNodeCount++;
     }
 
     //Increment tree service count if URI exists (only one service can be 
added at once)
diff --git a/bundles/http_admin/http_admin/src/websocket_admin.c 
b/bundles/http_admin/http_admin/src/websocket_admin.c
index f6e9ce0..b81c62b 100644
--- a/bundles/http_admin/http_admin/src/websocket_admin.c
+++ b/bundles/http_admin/http_admin/src/websocket_admin.c
@@ -92,7 +92,8 @@ void websocket_admin_addWebsocketService(void *handle, void 
*svc, const celix_pr
     if(uri != NULL) {
         celixThreadMutex_lock(&(admin->admin_lock));
         if(addServiceNode(&admin->sock_svc_tree, uri, websockSvc)) {
-            mg_set_websocket_handler(admin->mg_ctx, uri, 
websocket_connect_handler, websocket_ready_handler, websocket_data_handler, 
websocket_close_handler, admin);
+            mg_set_websocket_handler(admin->mg_ctx, uri, 
websocket_connect_handler, websocket_ready_handler,
+                                     websocket_data_handler, 
websocket_close_handler, admin);
         } else {
             printf("Websocket service with URI %s already exists!\n", uri);
         }
diff --git a/bundles/pubsub/examples/CMakeLists.txt 
b/bundles/pubsub/examples/CMakeLists.txt
index a36e87d..b19e327 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -392,7 +392,7 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
 
 endif()
 
-add_celix_container(pubsub_publisher_websocket
+add_celix_container(pubsub_websocket_example
     GROUP pubsub
     BUNDLES
         Celix::log_service
@@ -402,7 +402,7 @@ add_celix_container(pubsub_publisher_websocket
         Celix::pubsub_serializer_json
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_websocket
-        celix_pubsub_websocket_publisher
+        celix_pubsub_websocket_example
     PROPERTIES
         PSA_TCP_VERBOSE=true
         PUBSUB_ETCD_DISCOVERY_VERBOSE=true
@@ -410,4 +410,4 @@ add_celix_container(pubsub_publisher_websocket
         CELIX_HTTP_ADMIN_LISTENING_PORTS=7660
         CELIX_HTTP_ADMIN_NUM_THREADS=5
 )
-target_link_libraries(pubsub_publisher_websocket PRIVATE 
${PUBSUB_CONTAINER_LIBS})
+target_link_libraries(pubsub_websocket_example PRIVATE 
${PUBSUB_CONTAINER_LIBS})
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt 
b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 5bfb06f..6348662 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -19,7 +19,7 @@ include_directories("common/include")
 
 add_subdirectory(publisher)
 add_subdirectory(publisher2)
-add_subdirectory(publisher_websocket)
+add_subdirectory(pubsub_websocket)
 add_subdirectory(subscriber)
 
 
diff --git 
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html 
b/bundles/pubsub/examples/pubsub/common/include/poiCmd.h
similarity index 64%
copy from 
bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
copy to bundles/pubsub/examples/pubsub/common/include/poiCmd.h
index e1ecb3d..1c9c0a3 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
+++ b/bundles/pubsub/examples/pubsub/common/include/poiCmd.h
@@ -1,4 +1,4 @@
-<!--
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,26 +15,25 @@
  *  KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
--->
-<!DOCTYPE html>
-<html lang="en">
-<head>
-    <meta charset="utf-8"/>
-    <title>Apache Celix Websocket publisher example</title>
-    <script src="script.js"></script>
-</head>
-<body>
-    <div>
-        <h1>Poi1 message:</h1>
-        <br/>
-        <p id="receivePoi1"></p>
-    </div>
-    <br/>
-    <div>
-        <h1>Poi2 message:</h1>
-        <br/>
-        <p id="receivePoi2"></p>
-    </div>
-    <script>docReady();</script>
-</body>
-</html>
+ */
+/**
+ * poiCmd.h
+ *
+ *  \date       Jan 16, 2020
+ *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#ifndef POICMD_H_
+#define POICMD_H_
+
+#define MSG_POI_CMD_NAME "poiCmd" //Has to match the message name in the msg 
descriptor!
+
+struct poi_cmd{ //Payload of a poiCmd message
+    char* command; //command text; the websocket example treats "start" as start sending and anything else as stop
+};
+
+typedef struct poi_cmd poi_cmd_t;
+
+
+#endif /* POICMD_H_ */
diff --git 
a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor 
b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
new file mode 100644
index 0000000..f4f263c
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
@@ -0,0 +1,8 @@
+:header
+type=message
+name=poiCmd
+version=1.0.0
+:annotations
+classname=org.example.PointOfInterestCommand
+:message
+{t command}
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt 
b/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
similarity index 68%
copy from bundles/pubsub/examples/pubsub/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
index 5bfb06f..ffecb9d 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
@@ -15,11 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
-include_directories("common/include")
+#
+# included in the bundle at location META-INF/topics/[pub|sub]/poiCmd.properties
+#
 
-add_subdirectory(publisher)
-add_subdirectory(publisher2)
-add_subdirectory(publisher_websocket)
-add_subdirectory(subscriber)
+#topic info
+topic.name=poiCmd
+topic.id=poiCmd
 
+#Interface info
+interface.name=org.example.unknown
+interface.version=1.0.0
+interface.messages=poiCmd
 
+# Version info
+interface.message.consumer.range@poiCmd=[0.0.0,1.0.0)
+interface.message.provider.version@poiCmd=0.0.0
+
+qos=control
diff --git a/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
similarity index 53%
rename from bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
index 3e407fa..59f81cf 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
@@ -19,41 +19,45 @@
 find_package(CURL REQUIRED)
 
 
-add_celix_bundle(celix_pubsub_websocket_publisher
-    SYMBOLIC_NAME "apache_celix_pubsub_websocket_publisher"
+add_celix_bundle(celix_pubsub_websocket_example
+    SYMBOLIC_NAME "apache_celix_pubsub_websocket_example"
     VERSION "1.0.0"
-    SOURCES 
-        ../publisher/private/src/ps_pub_activator.c
-        ../publisher/private/src/pubsub_publisher.c
+    SOURCES
+        private/src/ps_websocket_activator.c
+        private/src/pubsub_websocket_example.c
 )
-target_link_libraries(celix_pubsub_websocket_publisher PRIVATE 
Celix::framework Celix::pubsub_api ${CURL_LIBRARIES})
-target_include_directories(celix_pubsub_websocket_publisher PRIVATE 
../publisher/private/include)
+target_link_libraries(celix_pubsub_websocket_example PRIVATE Celix::framework 
Celix::pubsub_api ${CURL_LIBRARIES})
+target_include_directories(celix_pubsub_websocket_example PRIVATE 
private/include)
 
-
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
     
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
     
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+    
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
     DESTINATION "META-INF/descriptors"
 )
 
-celix_bundle_files(celix_pubsub_websocket_publisher
-        
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
-        
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+celix_bundle_files(celix_pubsub_websocket_example
+    
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+    
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
     DESTINATION "META-INF/topics/pub"
 )
+celix_bundle_files(celix_pubsub_websocket_example
+    
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
+    DESTINATION "META-INF/topics/sub"
+)
 
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
     DESTINATION "META-INF/keys"
 )
 
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
     ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
     DESTINATION "META-INF/keys/subscriber"
 )
 
-celix_bundle_add_dir(celix_pubsub_websocket_publisher resources DESTINATION 
".")
+celix_bundle_add_dir(celix_pubsub_websocket_example resources DESTINATION ".")
 
-celix_bundle_headers(celix_pubsub_websocket_publisher
+celix_bundle_headers(celix_pubsub_websocket_example
     "X-Web-Resource: /example$<SEMICOLON>/resources"
 )
diff --git 
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
new file mode 100644
index 0000000..938b6f4
--- /dev/null
+++ 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_websocket_private.h
+ *
+ *  \date       Jan 16, 2020
+ *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_WEBSOCKET_PRIVATE_H_
+#define PUBSUB_WEBSOCKET_PRIVATE_H_
+
+
+#include <string.h>
+
+#include "celix_api.h"
+
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+
+struct pubsub_sender{ //Publisher-side state, shared by all send threads
+    array_list_t *trackers; //list handed to publisher_create(); publisher_destroy() only clears the pointer
+    const char *ident; //identifier handed to publisher_create(); the activator passes the framework UUID
+    hash_map_t *tid_map; //publisher service -> celix_thread_t* of the send thread started for it
+    long bundleId; //bundle id, printed into the name of every published message
+    bool stop; //polled by the send threads; set to true when a publisher service is removed
+};
+typedef struct pubsub_sender pubsub_sender_t;
+
+struct pubsub_receiver { //Subscriber-side state
+    char *name; //heap copy made by subscriber_create() and released by subscriber_destroy()
+};
+
+typedef struct pubsub_receiver pubsub_receiver_t;
+
+
+/**
+ * State shared by the publishing and the subscribing side of the example;
+ * used as handle for the publisher tracker callbacks and the subscriber service
+ */
+struct pubsub_info {
+    pubsub_sender_t *sender; //publisher-side state
+    bool sending; //send threads only publish while true; set by pubsub_subscriber_recv()
+    pubsub_receiver_t *receiver; //subscriber-side state
+};
+
+typedef struct pubsub_info pubsub_info_t;
+
+/**
+ * Publisher
+ */
+struct send_thread_struct { //Argument of one send thread; the thread frees it when it exits
+    pubsub_publisher_t *service; //publisher service the thread sends with
+    pubsub_info_t *pubsub; //shared example state (stop and sending flags)
+    const char *topic; //value returned by celix_properties_get() for the service's topic; not copied
+};
+typedef struct send_thread_struct send_thread_struct_t;
+
+pubsub_sender_t* publisher_create(array_list_pt trackers, const char* 
ident,long bundleId);
+
+void publisher_start(pubsub_sender_t *client);
+void publisher_stop(pubsub_sender_t *client);
+
+void publisher_destroy(pubsub_sender_t *client);
+
+void publisher_publishSvcAdded(void * handle, void *svc, const 
celix_properties_t *props);
+void publisher_publishSvcRemoved(void * handle, void *svc, const 
celix_properties_t *props);
+
+/**
+ * Subscriber
+ */
+pubsub_receiver_t* subscriber_create(char* topics);
+void subscriber_start(pubsub_receiver_t* client);
+void subscriber_stop(pubsub_receiver_t* client);
+void subscriber_destroy(pubsub_receiver_t* client);
+
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int 
msgTypeId, void* msg, bool* release);
+
+
+#endif /* PUBSUB_WEBSOCKET_PRIVATE_H_ */
diff --git 
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
new file mode 100644
index 0000000..f5c5114
--- /dev/null
+++ 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * ps_websocket_activator.c
+ *
+ *  \date       Jan 16, 2020
+ *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <pubsub/publisher.h>
+
+#include "celix_api.h"
+
+#include "pubsub_websocket_private.h"
+
+
+static const char * PUB_TOPICS[] = { //NULL-terminated list of topics a publisher tracker is opened for
+        "poi1",
+        "poi2",
+        NULL
+};
+
+
+#define SUB_NAME "poiCmd" //name handed to subscriber_create()
+static const char * SUB_TOPICS[] = { //NULL-terminated list of topics a subscriber service is registered for
+        "poiCmd",
+        NULL
+};
+
+struct ps_websocketActivator {
+    pubsub_info_t *pubsub; //shared publisher/subscriber state, allocated in pubsub_start()
+    //Publisher vars
+    array_list_t *trackerList;//List<long>, the ids of the publisher service trackers
+    //Subscriber vars
+    array_list_t *registrationList; //List<service_registration_pt>
+    pubsub_subscriber_t *subsvc; //subscriber service registered once per subscribed topic
+};
+
+/**
+ * Starts the websocket example bundle.
+ *
+ * Opens one publisher service tracker per entry of PUB_TOPICS (a send thread
+ * is started for every publisher service that appears) and registers one
+ * subscriber service per entry of SUB_TOPICS.
+ *
+ * @param act Activator data; its pubsub, trackerList, registrationList and
+ *            subsvc members are filled in here and released in pubsub_stop().
+ * @param ctx Bundle context of this bundle.
+ * @return 0 on success, CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID
+ *         is not available, CELIX_ENOMEM when the example state cannot be
+ *         allocated.
+ */
+static int pubsub_start(struct ps_websocketActivator *act, celix_bundle_context_t *ctx) {
+    const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (fwUUID == NULL) {
+        printf("PUBLISHER: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    celix_bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+    long bundleId = celix_bundle_getId(bnd);
+
+    //Allocate the activator-owned state up front, so that an allocation
+    //failure is reported before any tracker or service has been registered.
+    act->pubsub = calloc(1, sizeof(*act->pubsub));
+    pubsub_subscriber_t *subsvc = calloc(1,sizeof(*subsvc));
+    if (act->pubsub == NULL || subsvc == NULL) {
+        printf("PUBSUB: Cannot allocate the example state.\n");
+        free(subsvc);
+        free(act->pubsub);
+        act->pubsub = NULL;
+        return CELIX_ENOMEM;
+    }
+
+    //Publisher
+    act->trackerList = celix_arrayList_create();
+    act->pubsub->sender = publisher_create(act->trackerList, fwUUID, bundleId);
+
+    int i;
+    char filter[128];
+    for (i = 0; PUB_TOPICS[i] != NULL; i++) {
+        const char* topic = PUB_TOPICS[i];
+        memset(filter,0,sizeof(filter));
+#ifdef USE_SCOPE
+        char *scope;
+        asprintf(&scope, "my_scope_%d", i);
+        snprintf(filter, sizeof(filter), "(%s=%s)(%s=%s)", PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE, scope);
+        free(scope);
+#else
+        snprintf(filter, sizeof(filter), "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
+#endif
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.callbackHandle = act->pubsub;
+        opts.addWithProperties = publisher_publishSvcAdded;
+        opts.removeWithProperties = publisher_publishSvcRemoved;
+        opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+        celix_arrayList_addLong(act->trackerList,trackerId);
+    }
+
+    publisher_start(act->pubsub->sender);
+
+    //Subscriber
+    act->registrationList = celix_arrayList_create();
+
+    act->pubsub->receiver = subscriber_create(SUB_NAME);
+    subsvc->handle = act->pubsub;
+    subsvc->receive = pubsub_subscriber_recv;
+
+    act->subsvc = subsvc;
+
+    for (i = 0; SUB_TOPICS[i] != NULL; i++) {
+        const char* topic = SUB_TOPICS[i];
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, topic);
+#ifdef USE_SCOPE
+        char *scope;
+        asprintf(&scope, "my_scope_%d", i);
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_SCOPE, scope);
+        free(scope);
+#endif
+        service_registration_pt reg = NULL;
+        bundleContext_registerService(ctx, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
+        arrayList_add(act->registrationList,reg);
+    }
+
+    //The service handle is the shared pubsub_info_t, not the receiver, so
+    //the receiver has to be taken from the shared state.
+    subscriber_start(act->pubsub->receiver);
+
+    return 0;
+}
+
+/**
+ * Stops the websocket example bundle: closes the publisher trackers,
+ * unregisters the subscriber services and releases everything that
+ * pubsub_start() stored in the activator.
+ *
+ * @param act Activator data filled in by pubsub_start().
+ * @param ctx Bundle context of this bundle.
+ * @return CELIX_SUCCESS.
+ */
+static int pubsub_stop(struct ps_websocketActivator *act, celix_bundle_context_t *ctx) {
+    pubsub_sender_t *sender = act->pubsub->sender;
+    pubsub_receiver_t *receiver = act->pubsub->receiver;
+
+    //Publishers: close every tracker before the sender they call back into is destroyed
+    for (int t = 0; t < celix_arrayList_size(act->trackerList); t++) {
+        celix_bundleContext_stopTracker(ctx, celix_arrayList_getLong(act->trackerList, t));
+    }
+    publisher_stop(sender);
+    publisher_destroy(sender);
+    celix_arrayList_destroy(act->trackerList);
+
+    //Subscriber: withdraw the services before their handle is released
+    for (int r = 0; r < celix_arrayList_size(act->registrationList); r++) {
+        service_registration_pt registration = celix_arrayList_get(act->registrationList, r);
+        serviceRegistration_unregister(registration);
+    }
+    subscriber_stop(receiver);
+
+    act->subsvc->receive = NULL;
+    subscriber_destroy(receiver);
+    act->subsvc->handle = NULL;
+    free(act->subsvc);
+    act->subsvc = NULL;
+
+    celix_arrayList_destroy(act->registrationList);
+    free(act->pubsub);
+
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct ps_websocketActivator, pubsub_start, 
pubsub_stop)
\ No newline at end of file
diff --git 
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
new file mode 100644
index 0000000..7b820f0
--- /dev/null
+++ 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_websocket_example.c
+ *
+ *  \date       Sep 21, 2010
+ *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include "poi.h"
+#include "poiCmd.h"
+#include "pubsub_websocket_private.h"
+
+#include "service_tracker.h"
+#include "celix_threads.h"
+
+
+/* Scales one random() draw by RAND_MAX onto a span of (max - min) starting at min. */
+static double randCoordinate(double min, double max) {
+    const double draw = (double)random();
+    const double drawsPerUnit = ((double)RAND_MAX) / (max - min);
+    return min + (draw / drawsPerUnit);
+}
+
+/**
+ * Thread entry point that publishes a randomly positioned point of interest
+ * on one topic every two seconds, for as long as the shared sending flag is
+ * set.
+ *
+ * The loop ends when the stop flag of the sender is set. The thread owns its
+ * argument and frees it before returning.
+ *
+ * @param arg A heap allocated send_thread_struct_t.
+ * @return Always NULL.
+ */
+static void* send_thread(void* arg) {
+    send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
+
+    pubsub_publisher_t *publish_svc = st_struct->service;
+    pubsub_info_t *pubsubInfo = st_struct->pubsub;
+    pubsub_sender_t *publisher = st_struct->pubsub->sender;
+
+    //Only the first 8 characters of the identifier are shown; "%.8s" stops at
+    //the terminator, so a shorter identifier is not read past its end.
+    char fwUUID[9];
+    snprintf(fwUUID, sizeof(fwUUID), "%.8s", publisher->ident);
+
+    location_t place = calloc(1, sizeof(*place));
+    char *desc = calloc(64, sizeof(char));
+    char *name = calloc(64, sizeof(char));
+    if (place == NULL || desc == NULL || name == NULL) {
+        printf("PUBLISHER: Cannot allocate the message for topic '%s'\n", st_struct->topic);
+        free(name);
+        free(desc);
+        free(place);
+        free(st_struct);
+        return NULL;
+    }
+
+    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+
+    place->name = name;
+    place->description = desc;
+    place->extra = "extra value";
+    printf("TOPIC : %s\n", st_struct->topic);
+
+    unsigned int msgId = 0;
+
+    while (publisher->stop == false) {
+        if(pubsubInfo->sending) {
+            //The message id is looked up lazily and retried until it is known
+            if (msgId == 0) {
+                if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle, st_struct->topic, &msgId) != 0) {
+                    printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n", st_struct->topic);
+                }
+            }
+
+            if (msgId > 0) {
+                place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
+                place->position.lon = randCoordinate(MIN_LON, MAX_LON);
+                int nr_char = (int) randCoordinate(5, 100000);
+                place->data = calloc(nr_char, 1);
+                if (place->data == NULL) {
+                    printf("PUBLISHER: Cannot allocate %d bytes of data for %s\n", nr_char, st_struct->topic);
+                } else {
+                    for (int i = 0; i < (nr_char - 1); i++) {
+                        place->data[i] = i % 10 + '0';
+                    }
+                    place->data[nr_char - 1] = '\0';
+                    if (publish_svc->send) {
+                        if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                            printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                                   place->position.lat, place->position.lon, place->name, place->description, nr_char);
+                        }
+                    } else {
+                        printf("No send for %s\n", st_struct->topic);
+                    }
+
+                    free(place->data);
+                    place->data = NULL;
+                }
+            }
+        }
+        sleep(2);
+    }
+
+    free(place->description);
+    free(place->name);
+    free(place);
+
+    free(st_struct);
+
+    return NULL;
+}
+
+/**
+ * Creates the publisher-side state of the example.
+ *
+ * @param trackers Tracker list, stored as is; it is not destroyed by publisher_destroy().
+ * @param ident    Identifier, stored as is (not copied).
+ * @param bundleId Id of the owning bundle.
+ * @return The new sender, to be released with publisher_destroy(), or NULL
+ *         when it cannot be allocated.
+ */
+pubsub_sender_t* publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
+    pubsub_sender_t *publisher = malloc(sizeof(*publisher));
+    if (publisher == NULL) {
+        return NULL;
+    }
+
+    publisher->trackers = trackers;
+    publisher->ident = ident;
+    publisher->bundleId = bundleId;
+    publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
+    publisher->stop = false;
+
+    return publisher;
+}
+
+/* Reports on stdout that the publisher side starts; client is not used. */
+void publisher_start(pubsub_sender_t *client) {
+    (void) client;
+    fputs("PUBLISHER: starting up...\n", stdout);
+}
+
+/* Reports on stdout that the publisher side stops; client is not used. */
+void publisher_stop(pubsub_sender_t *client) {
+    (void) client;
+    fputs("PUBLISHER: stopping...\n", stdout);
+}
+
+/*
+ * Releases a sender made by publisher_create(). The tracker list and the
+ * identifier are only forgotten, not freed; the thread map is destroyed
+ * without freeing its keys or values.
+ */
+void publisher_destroy(pubsub_sender_t *client) {
+    hash_map_t *threads = client->tid_map;
+
+    hashMap_destroy(threads, false, false);
+    client->ident = NULL;
+    client->trackers = NULL;
+    free(client);
+}
+
+/**
+ * Tracker callback for a new publisher service: starts a send thread for it
+ * and remembers the thread in the sender's tid_map.
+ *
+ * Nothing is stored in the map when the thread cannot be started, so a later
+ * removal of the service does not join a thread that never existed.
+ *
+ * @param handle The shared pubsub_info_t.
+ * @param svc    The pubsub_publisher_t service that was added.
+ * @param props  Service properties; the topic is read from them.
+ */
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
+    pubsub_publisher_t *publish_svc = (pubsub_publisher_t *) svc;
+    pubsub_info_t *manager = (pubsub_info_t *) handle;
+    manager->sender->stop = false;
+
+    printf("PUBLISHER: new publish service exported (%s).\n", manager->sender->ident);
+
+    send_thread_struct_t *data = calloc(1, sizeof(*data));
+    celix_thread_t *tid = malloc(sizeof(*tid));
+    if (data == NULL || tid == NULL) {
+        printf("PUBLISHER: Cannot allocate the send thread data.\n");
+        free(tid);
+        free(data);
+        return;
+    }
+
+    data->service = publish_svc;
+    data->pubsub = manager;
+    data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
+
+    //On success the thread owns data and frees it when it exits
+    if (celixThread_create(tid, NULL, send_thread, (void*)data) != CELIX_SUCCESS) {
+        printf("PUBLISHER: Cannot start the send thread for topic '%s'.\n", data->topic);
+        free(tid);
+        free(data);
+        return;
+    }
+    hashMap_put(manager->sender->tid_map, publish_svc, tid);
+}
+
+/**
+ * Tracker callback for a removed publisher service: asks the send threads to
+ * stop, then joins and frees the thread that was started for this service.
+ *
+ * The thread is taken out of tid_map, so the map never keeps a pointer to
+ * the freed thread handle.
+ *
+ * @param handle The shared pubsub_info_t.
+ * @param svc    The pubsub_publisher_t service that is being removed.
+ * @param props  Service properties (not used).
+ */
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
+    pubsub_info_t *manager = (pubsub_info_t *) handle;
+    celix_thread_t *tid = hashMap_remove(manager->sender->tid_map, svc);
+    manager->sender->stop = true;
+
+    if (tid == NULL) {
+        //No send thread was registered for this service
+        printf("PUBLISHER: publish service unexporting (%s), no send thread found!\n", manager->sender->ident);
+        return;
+    }
+
+#if defined(__APPLE__) && defined(__MACH__)
+    uint64_t threadid;
+    pthread_threadid_np(tid->thread, &threadid);
+    printf("PUBLISHER: publish service unexporting (%s) %llu!\n", manager->sender->ident, (unsigned long long) threadid);
+#else
+    printf("PUBLISHER: publish service unexporting (%s) %lu!\n", manager->sender->ident, (unsigned long) tid->thread);
+#endif
+
+    celixThread_join(*tid,NULL);
+    free(tid);
+}
+
+/**
+ * Creates the subscriber-side state of the example.
+ *
+ * @param topics Name to remember; it is copied.
+ * @return The new receiver, to be released with subscriber_destroy(), or
+ *         NULL when topics is NULL or memory cannot be allocated.
+ */
+pubsub_receiver_t* subscriber_create(char* topics) {
+    if (topics == NULL) {
+        return NULL;
+    }
+
+    pubsub_receiver_t *sub = calloc(1,sizeof(*sub));
+    if (sub == NULL) {
+        return NULL;
+    }
+
+    sub->name = strdup(topics);
+    if (sub->name == NULL) {
+        free(sub);
+        return NULL;
+    }
+    return sub;
+}
+
+
+/* Reports on stdout that the subscriber side started; subscriber is not used. */
+void subscriber_start(pubsub_receiver_t *subscriber) {
+    (void) subscriber;
+    fputs("Subscriber started...\n", stdout);
+}
+
+/* Reports on stdout that the subscriber side stopped; subscriber is not used. */
+void subscriber_stop(pubsub_receiver_t *subscriber) {
+    (void) subscriber;
+    fputs("Subscriber stopped...\n", stdout);
+}
+
+/**
+ * Releases a receiver made by subscriber_create(), including its name.
+ *
+ * @param subscriber The receiver to release; NULL is accepted and ignored.
+ */
+void subscriber_destroy(pubsub_receiver_t *subscriber) {
+    if (subscriber == NULL) {
+        return;
+    }
+
+    free(subscriber->name); //free(NULL) is a no-op, so no guard is needed
+    subscriber->name=NULL;
+    free(subscriber);
+}
+
+/**
+ * Receive callback of the subscriber service: switches publishing on for the
+ * command "start" and off for every other command.
+ *
+ * A message without a command is ignored and leaves the sending flag as it
+ * is, because the message content comes from a websocket client.
+ *
+ * @param handle    The shared pubsub_info_t.
+ * @param msgType   Name of the received message type (only used for logging).
+ * @param msgTypeId Id of the received message type (not used).
+ * @param msg       The received poi_cmd_t.
+ * @param release   Not touched by this callback.
+ * @return Always 0.
+ */
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+    poi_cmd_t *cmd = (poi_cmd_t *) msg;
+    pubsub_info_t *pubsub = (pubsub_info_t *) handle;
+
+    if (pubsub == NULL || cmd == NULL || cmd->command == NULL) {
+        printf("Received %s message without a command, ignoring it\n", msgType != NULL ? msgType : "(unknown)");
+        return 0;
+    }
+
+    printf("Received command %s\n", cmd->command);
+    pubsub->sending = (strcmp(cmd->command, "start") == 0);
+    return 0;
+}
diff --git 
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
similarity index 63%
rename from 
bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
index e1ecb3d..37c0daf 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
@@ -35,6 +35,26 @@
         <br/>
         <p id="receivePoi2"></p>
     </div>
+    <div>
+        <h1>Send start/stop message:</h1>
+        <br/>
+        <form onsubmit="return submitForm()">
+            <table>
+                <tr>
+                    <td>
+                        <p>Choose command: </p>
+                    </td>
+                    <td>
+                        <select name="command" id="command">
+                            <option value="start" 
selected="selected">Start</option>
+                            <option value="stop">Stop</option>
+                        </select>
+                    </td>
+                </tr>
+            </table>
+            <input type="submit" name="submit" class="button" id="button" 
value="Send!">
+        </form>
+    </div>
     <script>docReady();</script>
 </body>
 </html>
diff --git 
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js 
b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
similarity index 66%
rename from 
bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
index 81026a4..503db7a 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
@@ -17,22 +17,38 @@
  * under the License.
  */
 
+var seqnr = 0;
+var sendSocketP1;
+
 function docReady() {
     var host = window.location.host;
-    var shellSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poi1");
-    var shellSocketP2 = new WebSocket("ws://" + host + "/pubsub/default/poi2");
+    var rcvSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poi1"); // messages on this socket are rendered into #receivePoi1
+    var rcvSocketP2 = new WebSocket("ws://" + host + "/pubsub/default/poi2"); // messages on this socket are rendered into #receivePoi2
+    sendSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poiCmd"); // script-level variable: submitForm() sends its command on this socket
 
-    shellSocketP1.onmessage = function (event) {
+    rcvSocketP1.onmessage = function (event) { // each message is parsed as JSON and replaces the text of #receivePoi1
         console.log(event);
         var obj = JSON.parse(event.data);
         document.getElementById("receivePoi1").innerHTML = "Received " + 
obj.id + " message with sequence nr: "
             + obj.seqNr + "<br/>latitude = " + 
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " + 
JSON.stringify(obj.data.location.lon);
     };
 
-    shellSocketP2.onmessage = function (event) {
+    rcvSocketP2.onmessage = function (event) { // same handling as above, but for #receivePoi2
         console.log(event);
         var obj = JSON.parse(event.data);
         document.getElementById("receivePoi2").innerHTML = "Received " + 
obj.id + " message with sequence nr: "
             + obj.seqNr + "<br/>latitude = " + 
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " + 
JSON.stringify(obj.data.location.lon);
     };
 }
+
+function submitForm() {
+    var element = document.getElementById("command");
+    var value = element.options[element.selectedIndex].value;
+    let msg = "{\"id\":\"poiCmd\", \"major\": 1, \"minor\": 0, \"seqNr\": " + 
seqnr++ + ", \"data\": {" +
+        "\"command\": \"" + value + "\" }}";
+
+    console.log("Sending message: " + msg);
+    sendSocketP1.send(msg);
+    alert("sending message: " + msg);
+    return false;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 0a34dc1..e0a4bb0 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -455,8 +455,7 @@ static celix_status_t 
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_web
         if (eScope != NULL && eTopic != NULL &&
             strncmp(eScope, scope, 1024 * 1024) == 0 &&
             strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            char *uri = psa_websocket_createURI(eScope, eTopic);
-            pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, 
sockPort, uri);
+            pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress, 
sockPort);
         }
     }
 
@@ -496,13 +495,24 @@ static celix_status_t 
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsu
     const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
     const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
 
+    const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, 
NULL);
     const char *eScope = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
     const char *eTopic = celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+    const char *sockAddress = celix_properties_get(endpoint, 
PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL);
+    long sockPort = celix_properties_getAsLong(endpoint, 
PUBSUB_WEBSOCKET_PORT_KEY, -1L);
 
-    if (eScope != NULL && eTopic != NULL &&
+    bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+    if (publisher && (sockAddress == NULL || sockPort < 0)) {
+        L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket 
address/port or endpoint type. Properties:");
+        const char *key = NULL;
+        CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+            L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key, 
celix_properties_get(endpoint, key, NULL));
+        }
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else if(eScope != NULL && eTopic != NULL &&
             strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic, 
1024 * 1024) == 0) {
-        char *uri = psa_websocket_createURI(eScope, eTopic);
-        pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri);
+        pubsub_websocketTopicReceiver_disconnectFrom(receiver, sockAddress, 
sockPort);
 
     }
 
@@ -554,10 +564,9 @@ celix_status_t pubsub_websocketAdmin_executeCommand(void 
*handle, char *commandL
         const char *scope = pubsub_websocketTopicSender_scope(sender);
         const char *topic = pubsub_websocketTopicSender_topic(sender);
         const char *url = pubsub_websocketTopicSender_url(sender);
-//        const char *postUrl = pubsub_websocketTopicSender_isStatic(sender) ? 
" (static)" : "";
         fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
-        fprintf(out, "   |- url            = %s\n", url);
+        fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
@@ -574,21 +583,23 @@ celix_status_t pubsub_websocketAdmin_executeCommand(void 
*handle, char *commandL
         const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
         const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
         const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+        const char *urlEndp = pubsub_websocketTopicReceiver_url(receiver);
 
         celix_array_list_t *connected = celix_arrayList_create();
         celix_array_list_t *unconnected = celix_arrayList_create();
         pubsub_websocketTopicReceiver_listConnections(receiver, connected, 
unconnected);
 
         fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
-        fprintf(out, "   |- serializer type = %s\n", serType);
+        fprintf(out, "   |- serializer type      = %s\n", serType);
+        fprintf(out, "   |- url                  = %s\n", urlEndp);
         for (int i = 0; i < celix_arrayList_size(connected); ++i) {
             char *url = celix_arrayList_get(connected, i);
-            fprintf(out, "   |- connected url   = %s\n", url);
+            fprintf(out, "   |- connected endpoint   = %s\n", url);
             free(url);
         }
         for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
             char *url = celix_arrayList_get(unconnected, i);
-            fprintf(out, "   |- unconnected url = %s\n", url);
+            fprintf(out, "   |- unconnected endpoint = %s\n", url);
             free(url);
         }
         celix_arrayList_destroy(connected);
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index babd9ce..b6dfa71 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -70,6 +70,9 @@ struct pubsub_websocket_topic_receiver {
     char scopeAndTopicFilter[5];
     char *uri;
 
+    celix_websocket_service_t sockSvc;
+    long svcId;
+
     pubsub_websocket_rcv_buffer_t recvBuffer;
 
     struct {
@@ -93,7 +96,6 @@ struct pubsub_websocket_topic_receiver {
 };
 
 typedef struct psa_websocket_requested_connection_entry {
-    pubsub_websocket_rcv_buffer_t *recvBuffer;
     char *key; //host:port
     char *socketAddress;
     long socketPort;
@@ -102,6 +104,7 @@ typedef struct psa_websocket_requested_connection_entry {
     int connectRetryCount;
     bool connected;
     bool statically; //true if the connection is statically configured through 
the topic properties.
+    bool passive; //true if the connection is initiated by another resource 
(e.g. webpage)
 } psa_websocket_requested_connection_entry_t;
 
 typedef struct psa_websocket_subscriber_entry {
@@ -119,6 +122,7 @@ static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
 static void 
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t 
*receiver);
 static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t 
*msg_type_id_map);
 
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, 
void *handle);
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection, 
int op_code, char *data, size_t length, void *handle);
 static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection, void *handle);
 
@@ -168,6 +172,19 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
         receiver->subscriberTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
     }
 
+    //Register a websocket endpoint for this topic receiver
+    if(receiver->uri != NULL){
+        //Register a websocket svc first
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, WEBSOCKET_ADMIN_URI, receiver->uri);
+        receiver->sockSvc.handle = receiver;
+        //Set callbacks to monitor any incoming connections (passive), data 
events or close events
+        receiver->sockSvc.ready = psa_websocketTopicReceiver_ready;
+        receiver->sockSvc.data = psa_websocketTopicReceiver_data;
+        receiver->sockSvc.close = psa_websocketTopicReceiver_close;
+        receiver->svcId = celix_bundleContext_registerService(receiver->ctx, 
&receiver->sockSvc,
+                                                           
WEBSOCKET_ADMIN_SERVICE_NAME, props);
+    }
 
     const char *staticConnects = celix_properties_get(topicProperties, 
PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
     if (staticConnects != NULL) {
@@ -198,7 +215,7 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
                 entry->socketPort = sockPort;
                 entry->connected = false;
                 entry->statically = true;
-                entry->recvBuffer = &receiver->recvBuffer;
+                entry->passive = false;
                 hashMap_put(receiver->requestedConnections.map, (void *) 
entry->key, entry);
             } else {
                 L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s", 
addr);
@@ -239,6 +256,8 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
 
         celix_bundleContext_stopTracker(receiver->ctx, 
receiver->subscriberTrackerId);
 
+        celix_bundleContext_unregisterService(receiver->ctx, receiver->svcId);
+
         celixThreadMutex_lock(&receiver->subscribers.mutex);
         hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
         while (hashMapIterator_hasNext(&iter)) {
@@ -298,6 +317,9 @@ const char* 
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_
 const char* 
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t 
*receiver) {
     return receiver->topic;
 }
+const char* 
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver) {
+    return receiver->uri; //the receiver's own uri string is returned directly, not a copy
+}
 
 long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver) {
     return receiver->serializerSvcId;
@@ -309,7 +331,7 @@ void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiv
     while (hashMapIterator_hasNext(&iter)) {
         psa_websocket_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
         char *url = NULL;
-        asprintf(&url, "%s%s", entry->uri, entry->statically ? " (static)" : 
"");
+        asprintf(&url, "%s:%li%s%s", entry->socketAddress, entry->socketPort, 
entry->statically ? " (static)" : "", entry->passive ? " (passive)" : "");
         if (entry->connected) {
             celix_arrayList_add(connectedUrls, url);
         } else {
@@ -320,8 +342,8 @@ void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiv
 }
 
 
-void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort, const char *uri) {
-    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri 
%s", receiver->scope, receiver->topic, uri);
+//Request a connection from this receiver to the websocket at socketAddress:socketPort.
+//A new requested-connection entry (keyed on "address:port") is only created when
+//none was found; afterwards all not yet connected entries are (re)tried.
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s ('%s') connecting to 
websocket address %s:%li", receiver->scope, receiver->topic, receiver->uri, 
socketAddress, socketPort);
 
     char *key = NULL;
     asprintf(&key, "%s:%li", socketAddress, socketPort);
@@ -331,28 +353,33 @@ void 
pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *
     if (entry == NULL) {
         entry = calloc(1, sizeof(*entry));
         entry->key = key;
-        entry->uri = strndup(uri, 1024 * 1024);
+        entry->uri = strndup(receiver->uri, 1024 * 1024);
         entry->socketAddress = strndup(socketAddress, 1024 * 1024);
         entry->socketPort = socketPort;
         entry->connected = false;
         entry->statically = false;
-        entry->recvBuffer = &receiver->recvBuffer;
-        hashMap_put(receiver->requestedConnections.map, (void*)entry->uri, 
entry);
+        entry->passive = false;
+        hashMap_put(receiver->requestedConnections.map, (void*)entry->key, 
entry);
         receiver->requestedConnections.allConnected = false;
+    } else {
+        free(key); //entry already exists, the freshly built key is not needed
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 
     psa_websocket_connectToAllRequestedConnections(receiver);
 }
 
-void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *uri) {
-    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri 
%s", receiver->scope, receiver->topic, uri);
+void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort) {
+    L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s ('%s') disconnect from 
websocket address %s:%li", receiver->scope, receiver->topic, receiver->uri, 
socketAddress, socketPort);
+
+    char *key = NULL;
+    asprintf(&key, "%s:%li", socketAddress, socketPort);
 
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    psa_websocket_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, uri);
+
+    psa_websocket_requested_connection_entry_t *entry = 
hashMap_remove(receiver->requestedConnections.map, key);
     if (entry != NULL && entry->connected) {
         mg_close_connection(entry->sockConnection);
-        L_WARN("[PSA_WEBSOCKET] Error disconnecting from websocket uri %s.", 
uri);
     }
     if (entry != NULL) {
         free(entry->socketAddress);
@@ -361,6 +388,7 @@ void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receive
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+    free(key);
 }
 
 static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void 
*svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
@@ -558,6 +586,39 @@ static void* psa_websocket_recvThread(void * data) {
     return NULL;
 }
 
+//Websocket 'ready' callback of the service registered for this receiver's uri.
+//An incoming connection on that uri is recorded as a 'passive' (initiated by the
+//other side) and already connected entry, keyed on "remote_addr:remote_port".
+//  connection: the websocket connection that became ready
+//  handle:     the pubsub_websocket_topic_receiver_t set as service handle (may be NULL)
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection, void *handle) {
+    if (handle == NULL) {
+        return;
+    }
+    pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
+
+    //Get request info with host, port and uri information
+    const struct mg_request_info *ri = mg_get_request_info(connection);
+    if (ri == NULL || ri->request_uri == NULL || strcmp(receiver->uri, ri->request_uri) != 0) {
+        return; //no request info, or the request is not for this receiver's uri
+    }
+
+    char *key = NULL;
+    if (asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port) < 0) {
+        return; //allocation failed; key is undefined after a failed asprintf
+    }
+
+    celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+    psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+    if (entry != NULL) {
+        free(key); //peer is already known, nothing to add
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        char *uri = strndup(ri->request_uri, 1024 * 1024);
+        char *address = strndup(ri->remote_addr, 1024 * 1024);
+        if (entry == NULL || uri == NULL || address == NULL) {
+            //Out of memory: never store a partially initialized entry in the map
+            free(address);
+            free(uri);
+            free(entry);
+            free(key);
+        } else {
+            entry->key = key; //ownership of key moves to the entry
+            entry->uri = uri;
+            entry->socketAddress = address;
+            entry->socketPort = ri->remote_port;
+            entry->connected = true;
+            entry->statically = false;
+            entry->passive = true;
+            hashMap_put(receiver->requestedConnections.map, (void *) entry->key, entry);
+            receiver->requestedConnections.allConnected = false;
+        }
+    }
+    celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+}
+
+
+
+
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection 
__attribute__((unused)),
                                             int op_code 
__attribute__((unused)),
                                             char *data,
@@ -565,27 +626,48 @@ static int psa_websocketTopicReceiver_data(struct 
mg_connection *connection __at
                                             void *handle) {
     //Received a websocket message, append this message to the buffer of the 
receiver.
     if (handle != NULL) {
-        psa_websocket_requested_connection_entry_t *entry = 
(psa_websocket_requested_connection_entry_t *) handle;
+        pubsub_websocket_topic_receiver_t *receiver = 
(pubsub_websocket_topic_receiver_t *) handle;
 
-        celixThreadMutex_lock(&entry->recvBuffer->mutex);
+        celixThreadMutex_lock(&receiver->recvBuffer.mutex);
         pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
         const char *rcvdMsgData = malloc(length);
         memcpy((void *) rcvdMsgData, data, length);
         msg->msgData = rcvdMsgData;
         msg->msgSize = length;
-        celix_arrayList_add(entry->recvBuffer->list, msg);
-        celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+        celix_arrayList_add(receiver->recvBuffer.list, msg);
+        celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
     }
 
     return 1; //keep open (non-zero), 0 to close the socket
 }
 
-static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection __attribute__((unused)), void *handle) {
+//Websocket 'close' callback: marks the requested-connection entry of the closed
+//peer (looked up by "remote_addr:remote_port") as disconnected. Passive entries
+//are removed from the map and freed as well.
+//  connection: the connection that was closed
+//  handle:     the pubsub_websocket_topic_receiver_t owning the connection (may be NULL)
+static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection, void *handle) {
     //Reset connection for this receiver entry
     if (handle != NULL) {
-        psa_websocket_requested_connection_entry_t *entry = 
(psa_websocket_requested_connection_entry_t *) handle;
-        entry->connected = false;
-        entry->sockConnection = NULL;
+        pubsub_websocket_topic_receiver_t *receiver = (pubsub_websocket_topic_receiver_t *) handle;
+
+        //Get request info with host, port and uri information
+        const struct mg_request_info *ri = mg_get_request_info(connection);
+        if (ri != NULL && ri->request_uri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+            char *key = NULL;
+            if (asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port) < 0) {
+                return; //allocation failed; key is undefined after a failed asprintf
+            }
+
+            celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+            psa_websocket_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, key);
+            if (entry != NULL) {
+                entry->connected = false;
+                entry->sockConnection = NULL;
+                if(entry->passive) {
+                    //Take the entry out of the map before freeing entry->key, which is the map key
+                    hashMap_remove(receiver->requestedConnections.map, key);
+                    free(entry->key);
+                    free(entry->uri);
+                    free(entry->socketAddress);
+                    free(entry);
+                }
+            }
+            celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+            free(key);
+        }
     }
 }
 
@@ -597,7 +679,7 @@ static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
         hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_requested_connection_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-            if (!entry->connected) {
+            if (!entry->connected && !entry->passive) {
                 char errBuf[100] = {0};
                 entry->sockConnection = 
mg_connect_websocket_client(entry->socketAddress,
                                                                     (int) 
entry->socketPort,
@@ -608,7 +690,7 @@ static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
                                                                     NULL,
                                                                     
psa_websocketTopicReceiver_data,
                                                                     
psa_websocketTopicReceiver_close,
-                                                                    entry);
+                                                                    receiver);
                 if(entry->sockConnection != NULL) {
                     entry->connected = true;
                     entry->connectRetryCount = 0;
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
index be1cff7..4f100c4 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
+++ 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
@@ -36,12 +36,13 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
 
 const char* 
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t 
*receiver);
 const char* 
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t 
*receiver);
+const char* 
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver);
 
 long 
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t 
*receiver);
 void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls);
 
-void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort, const char *uri);
-void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *uri);
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort);
+void 
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t 
*receiver, const char *socketAddress, long socketPort);
 
 
 pubsub_admin_receiver_metrics_t* 
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t 
*receiver);

Reply via email to