This is an automated email from the ASF dual-hosted git repository.
rlenferink pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new c4175a4 Updated PSA websocket with full duplex communication
c4175a4 is described below
commit c4175a4050b4a8f6942519fa013e25296dbb0434
Author: dhbfischer <[email protected]>
AuthorDate: Sun Feb 2 14:55:09 2020 +0100
Updated PSA websocket with full duplex communication
---
bundles/http_admin/http_admin/src/activator.c | 1 -
bundles/http_admin/http_admin/src/service_tree.c | 3 +-
.../http_admin/http_admin/src/websocket_admin.c | 3 +-
bundles/pubsub/examples/CMakeLists.txt | 6 +-
bundles/pubsub/examples/pubsub/CMakeLists.txt | 2 +-
.../index.html => common/include/poiCmd.h} | 47 +++--
.../pubsub/msg_descriptors/msg_poiCmd.descriptor | 8 +
.../poiCmd.properties} | 20 +-
.../CMakeLists.txt | 36 ++--
.../private/include/pubsub_websocket_private.h | 97 ++++++++++
.../private/src/ps_websocket_activator.c | 164 ++++++++++++++++
.../private/src/pubsub_websocket_example.c | 207 +++++++++++++++++++++
.../resources/index.html | 20 ++
.../resources/script.js | 24 ++-
.../src/pubsub_websocket_admin.c | 31 ++-
.../src/pubsub_websocket_topic_receiver.c | 126 ++++++++++---
.../src/pubsub_websocket_topic_receiver.h | 5 +-
17 files changed, 710 insertions(+), 90 deletions(-)
diff --git a/bundles/http_admin/http_admin/src/activator.c
b/bundles/http_admin/http_admin/src/activator.c
index b388d99..4578ada 100644
--- a/bundles/http_admin/http_admin/src/activator.c
+++ b/bundles/http_admin/http_admin/src/activator.c
@@ -54,7 +54,6 @@ static int http_admin_start(http_admin_activator_t *act,
celix_bundle_context_t
celix_bundle_t *bundle = celix_bundleContext_getBundle(ctx);
char* root = celix_bundle_getEntry(bundle, "root");
-
bool prop_use_websockets = celix_bundleContext_getPropertyAsBool(ctx,
HTTP_ADMIN_USE_WEBSOCKETS_KEY, HTTP_ADMIN_USE_WEBSOCKETS_DFT);
long listPort = celix_bundleContext_getPropertyAsLong(ctx,
HTTP_ADMIN_LISTENING_PORTS_KEY, HTTP_ADMIN_LISTENING_PORTS_DFT);
long websocketTimeoutMs = celix_bundleContext_getPropertyAsLong(ctx,
HTTP_ADMIN_WEBSOCKET_TIMEOUT_MS_KEY, HTTP_ADMIN_WEBSOCKET_TIMEOUT_MS_DFT);
diff --git a/bundles/http_admin/http_admin/src/service_tree.c
b/bundles/http_admin/http_admin/src/service_tree.c
index 46aa4e1..1066708 100644
--- a/bundles/http_admin/http_admin/src/service_tree.c
+++ b/bundles/http_admin/http_admin/src/service_tree.c
@@ -140,6 +140,7 @@ bool addServiceNode(service_tree_t *svc_tree, const char
*uri, void *svc) {
svc_tree->tree_node_count++;
uri_exists = false;
}
+ subNodeCount++;
} else if (current->next != NULL) {
current = current->next;
current_data = current->svc_data;
@@ -152,8 +153,8 @@ bool addServiceNode(service_tree_t *svc_tree, const char
*uri, void *svc) {
current_data = node->svc_data;
svc_tree->tree_node_count++;
uri_exists = false;
+ subNodeCount++;
}
- subNodeCount++;
}
//Increment tree service count if URI exists (only one service can be
added at once)
diff --git a/bundles/http_admin/http_admin/src/websocket_admin.c
b/bundles/http_admin/http_admin/src/websocket_admin.c
index f6e9ce0..b81c62b 100644
--- a/bundles/http_admin/http_admin/src/websocket_admin.c
+++ b/bundles/http_admin/http_admin/src/websocket_admin.c
@@ -92,7 +92,8 @@ void websocket_admin_addWebsocketService(void *handle, void
*svc, const celix_pr
if(uri != NULL) {
celixThreadMutex_lock(&(admin->admin_lock));
if(addServiceNode(&admin->sock_svc_tree, uri, websockSvc)) {
- mg_set_websocket_handler(admin->mg_ctx, uri,
websocket_connect_handler, websocket_ready_handler, websocket_data_handler,
websocket_close_handler, admin);
+ mg_set_websocket_handler(admin->mg_ctx, uri,
websocket_connect_handler, websocket_ready_handler,
+ websocket_data_handler,
websocket_close_handler, admin);
} else {
printf("Websocket service with URI %s already exists!\n", uri);
}
diff --git a/bundles/pubsub/examples/CMakeLists.txt
b/bundles/pubsub/examples/CMakeLists.txt
index a36e87d..b19e327 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -392,7 +392,7 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
endif()
-add_celix_container(pubsub_publisher_websocket
+add_celix_container(pubsub_websocket_example
GROUP pubsub
BUNDLES
Celix::log_service
@@ -402,7 +402,7 @@ add_celix_container(pubsub_publisher_websocket
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
Celix::pubsub_admin_websocket
- celix_pubsub_websocket_publisher
+ celix_pubsub_websocket_example
PROPERTIES
PSA_TCP_VERBOSE=true
PUBSUB_ETCD_DISCOVERY_VERBOSE=true
@@ -410,4 +410,4 @@ add_celix_container(pubsub_publisher_websocket
CELIX_HTTP_ADMIN_LISTENING_PORTS=7660
CELIX_HTTP_ADMIN_NUM_THREADS=5
)
-target_link_libraries(pubsub_publisher_websocket PRIVATE
${PUBSUB_CONTAINER_LIBS})
+target_link_libraries(pubsub_websocket_example PRIVATE
${PUBSUB_CONTAINER_LIBS})
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 5bfb06f..6348662 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -19,7 +19,7 @@ include_directories("common/include")
add_subdirectory(publisher)
add_subdirectory(publisher2)
-add_subdirectory(publisher_websocket)
+add_subdirectory(pubsub_websocket)
add_subdirectory(subscriber)
diff --git
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
b/bundles/pubsub/examples/pubsub/common/include/poiCmd.h
similarity index 64%
copy from
bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
copy to bundles/pubsub/examples/pubsub/common/include/poiCmd.h
index e1ecb3d..1c9c0a3 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
+++ b/bundles/pubsub/examples/pubsub/common/include/poiCmd.h
@@ -1,4 +1,4 @@
-<!--
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,26 +15,25 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
--->
-<!DOCTYPE html>
-<html lang="en">
-<head>
- <meta charset="utf-8"/>
- <title>Apache Celix Websocket publisher example</title>
- <script src="script.js"></script>
-</head>
-<body>
- <div>
- <h1>Poi1 message:</h1>
- <br/>
- <p id="receivePoi1"></p>
- </div>
- <br/>
- <div>
- <h1>Poi2 message:</h1>
- <br/>
- <p id="receivePoi2"></p>
- </div>
- <script>docReady();</script>
-</body>
-</html>
+ */
+/**
+ * poiCmd.h
+ *
+ * \date Jan 16, 2020
+ * \author <a href="mailto:[email protected]">Apache Celix Project
Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef POICMD_H_
+#define POICMD_H_
+
+#define MSG_POI_CMD_NAME "poiCmd" //Has to match the message name in the msg
descriptor!
+
+struct poi_cmd{
+ char* command;
+};
+
+typedef struct poi_cmd poi_cmd_t;
+
+
+#endif /* POICMD_H_ */
diff --git
a/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
new file mode 100644
index 0000000..f4f263c
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
@@ -0,0 +1,8 @@
+:header
+type=message
+name=poiCmd
+version=1.0.0
+:annotations
+classname=org.example.PointOfInterestCommand
+:message
+{t command}
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
similarity index 68%
copy from bundles/pubsub/examples/pubsub/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
index 5bfb06f..ffecb9d 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
@@ -15,11 +15,21 @@
# specific language governing permissions and limitations
# under the License.
-include_directories("common/include")
+#
+# included in the bundle at location META-INF/topics/[pub|sub]/poiCmd.properties
+#
-add_subdirectory(publisher)
-add_subdirectory(publisher2)
-add_subdirectory(publisher_websocket)
-add_subdirectory(subscriber)
+#topic info
+topic.name=poiCmd
+topic.id=poiCmd
+#Interface info
+interface.name=org.example.unknown
+interface.version=1.0.0
+interface.messages=poiCmd
+# Version info
+interface.message.consumer.range@poiCmd=[0.0.0,1.0.0)
+interface.message.provider.version@poiCmd=0.0.0
+
+qos=control
diff --git a/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
similarity index 53%
rename from bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
index 3e407fa..59f81cf 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/CMakeLists.txt
@@ -19,41 +19,45 @@
find_package(CURL REQUIRED)
-add_celix_bundle(celix_pubsub_websocket_publisher
- SYMBOLIC_NAME "apache_celix_pubsub_websocket_publisher"
+add_celix_bundle(celix_pubsub_websocket_example
+ SYMBOLIC_NAME "apache_celix_pubsub_websocket_example"
VERSION "1.0.0"
- SOURCES
- ../publisher/private/src/ps_pub_activator.c
- ../publisher/private/src/pubsub_publisher.c
+ SOURCES
+ private/src/ps_websocket_activator.c
+ private/src/pubsub_websocket_example.c
)
-target_link_libraries(celix_pubsub_websocket_publisher PRIVATE
Celix::framework Celix::pubsub_api ${CURL_LIBRARIES})
-target_include_directories(celix_pubsub_websocket_publisher PRIVATE
../publisher/private/include)
+target_link_libraries(celix_pubsub_websocket_example PRIVATE Celix::framework
Celix::pubsub_api ${CURL_LIBRARIES})
+target_include_directories(celix_pubsub_websocket_example PRIVATE
private/include)
-
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
+
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poiCmd.descriptor
DESTINATION "META-INF/descriptors"
)
-celix_bundle_files(celix_pubsub_websocket_publisher
-
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
-
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
+celix_bundle_files(celix_pubsub_websocket_example
+
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
+
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
DESTINATION "META-INF/topics/pub"
)
+celix_bundle_files(celix_pubsub_websocket_example
+
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poiCmd.properties
+ DESTINATION "META-INF/topics/sub"
+)
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
DESTINATION "META-INF/keys"
)
-celix_bundle_files(celix_pubsub_websocket_publisher
+celix_bundle_files(celix_pubsub_websocket_example
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
DESTINATION "META-INF/keys/subscriber"
)
-celix_bundle_add_dir(celix_pubsub_websocket_publisher resources DESTINATION
".")
+celix_bundle_add_dir(celix_pubsub_websocket_example resources DESTINATION ".")
-celix_bundle_headers(celix_pubsub_websocket_publisher
+celix_bundle_headers(celix_pubsub_websocket_example
"X-Web-Resource: /example$<SEMICOLON>/resources"
)
diff --git
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
new file mode 100644
index 0000000..938b6f4
--- /dev/null
+++
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/include/pubsub_websocket_private.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_websocket_private.h
+ *
+ * \date Jan 16, 2020
+ * \author <a href="mailto:[email protected]">Apache Celix Project
Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_WEBSOCKET_PRIVATE_H_
+#define PUBSUB_WEBSOCKET_PRIVATE_H_
+
+
+#include <string.h>
+
+#include "celix_api.h"
+
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+
+struct pubsub_sender{
+ array_list_t *trackers;
+ const char *ident;
+ hash_map_t *tid_map; //service -> tid
+ long bundleId;
+ bool stop;
+};
+typedef struct pubsub_sender pubsub_sender_t;
+
+struct pubsub_receiver {
+ char *name;
+};
+
+typedef struct pubsub_receiver pubsub_receiver_t;
+
+
+/**
+ * Shared Publisher and Subscriber types
+ *
+ */
+struct pubsub_info {
+ pubsub_sender_t *sender;
+ bool sending;
+ pubsub_receiver_t *receiver;
+};
+
+typedef struct pubsub_info pubsub_info_t;
+
+/**
+ * Publisher
+ */
+struct send_thread_struct {
+ pubsub_publisher_t *service;
+ pubsub_info_t *pubsub;
+ const char *topic;
+};
+typedef struct send_thread_struct send_thread_struct_t;
+
+pubsub_sender_t* publisher_create(array_list_pt trackers, const char*
ident,long bundleId);
+
+void publisher_start(pubsub_sender_t *client);
+void publisher_stop(pubsub_sender_t *client);
+
+void publisher_destroy(pubsub_sender_t *client);
+
+void publisher_publishSvcAdded(void * handle, void *svc, const
celix_properties_t *props);
+void publisher_publishSvcRemoved(void * handle, void *svc, const
celix_properties_t *props);
+
+/**
+ * Subscriber
+ */
+pubsub_receiver_t* subscriber_create(char* topics);
+void subscriber_start(pubsub_receiver_t* client);
+void subscriber_stop(pubsub_receiver_t* client);
+void subscriber_destroy(pubsub_receiver_t* client);
+
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int
msgTypeId, void* msg, bool* release);
+
+
+#endif /* PUBSUB_WEBSOCKET_PRIVATE_H_ */
diff --git
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
new file mode 100644
index 0000000..f5c5114
--- /dev/null
+++
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/ps_websocket_activator.c
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * ps_websocket_activator.c
+ *
+ * \date Jan 16, 2020
+ * \author <a href="mailto:[email protected]">Apache Celix Project
Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <pubsub/subscriber.h>
+#include <pubsub/publisher.h>
+
+#include "celix_api.h"
+
+#include "pubsub_websocket_private.h"
+
+
+static const char * PUB_TOPICS[] = {
+ "poi1",
+ "poi2",
+ NULL
+};
+
+
+#define SUB_NAME "poiCmd"
+static const char * SUB_TOPICS[] = {
+ "poiCmd",
+ NULL
+};
+
+struct ps_websocketActivator {
+ pubsub_info_t *pubsub;
+ //Publisher vars
+ array_list_t *trackerList;//List<service_tracker_pt>
+ //Subscriber vars
+ array_list_t *registrationList; //List<service_registration_pt>
+ pubsub_subscriber_t *subsvc;
+};
+
+/**
+ * Bundle start hook: creates the shared pubsub state, starts one publisher
+ * service tracker per entry in PUB_TOPICS and registers the subscriber
+ * service once for every entry in SUB_TOPICS.
+ *
+ * @param act Activator state that receives the created objects.
+ * @param ctx Bundle context used to track and register services.
+ * @return 0 on success, CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID
+ *         property is not set, CELIX_ENOMEM when the shared state cannot be
+ *         allocated.
+ */
+static int pubsub_start(struct ps_websocketActivator *act, celix_bundle_context_t *ctx) {
+    const char *fwUUID = celix_bundleContext_getProperty(ctx,OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (fwUUID == NULL) {
+        printf("PUBLISHER: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    celix_bundle_t *bnd = celix_bundleContext_getBundle(ctx);
+    long bundleId = celix_bundle_getId(bnd);
+
+    act->pubsub = calloc(1, sizeof(*act->pubsub));
+    if (act->pubsub == NULL) {
+        //Nothing else has been created yet, so there is nothing to roll back.
+        return CELIX_ENOMEM;
+    }
+
+    //Publisher
+    act->trackerList = celix_arrayList_create();
+    act->pubsub->sender = publisher_create(act->trackerList, fwUUID, bundleId);
+
+    int i;
+    char filter[128];
+    for (i = 0; PUB_TOPICS[i] != NULL; i++) {
+        const char* topic = PUB_TOPICS[i];
+        memset(filter,0,128);
+#ifdef USE_SCOPE
+        char *scope;
+        asprintf(&scope, "my_scope_%d", i);
+        snprintf(filter, 128, "(%s=%s)(%s=%s)", PUBSUB_PUBLISHER_TOPIC, topic, PUBSUB_PUBLISHER_SCOPE, scope);
+        free(scope);
+#else
+        snprintf(filter, 128, "(%s=%s)", (char*) PUBSUB_PUBLISHER_TOPIC, topic);
+#endif
+        //The tracker callbacks get the shared pubsub state as their handle.
+        celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
+        opts.callbackHandle = act->pubsub;
+        opts.addWithProperties = publisher_publishSvcAdded;
+        opts.removeWithProperties = publisher_publishSvcRemoved;
+        opts.filter.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.filter.filter = filter;
+        opts.filter.ignoreServiceLanguage = true;
+        long trackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
+
+        celix_arrayList_addLong(act->trackerList,trackerId);
+    }
+
+    publisher_start(act->pubsub->sender);
+
+    //Subscriber
+    act->registrationList = celix_arrayList_create();
+
+    pubsub_subscriber_t *subsvc = calloc(1,sizeof(*subsvc));
+    act->pubsub->receiver = subscriber_create(SUB_NAME);
+    subsvc->handle = act->pubsub;
+    subsvc->receive = pubsub_subscriber_recv;
+
+    act->subsvc = subsvc;
+
+    for (i = 0; SUB_TOPICS[i] != NULL; i++) {
+        const char* topic = SUB_TOPICS[i];
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, topic);
+#ifdef USE_SCOPE
+        char *scope;
+        asprintf(&scope, "my_scope_%d", i);
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_SCOPE, scope);
+        free(scope);
+#endif
+        service_registration_pt reg = NULL;
+        bundleContext_registerService(ctx, PUBSUB_SUBSCRIBER_SERVICE_NAME, subsvc, props, &reg);
+        arrayList_add(act->registrationList,reg);
+    }
+
+    //subsvc->handle is the pubsub_info_t; the receiver itself lives in it.
+    subscriber_start(act->pubsub->receiver);
+
+    return 0;
+}
+
+/**
+ * Bundle stop hook: stops all publisher trackers, tears down the sender,
+ * unregisters the subscriber registrations, tears down the receiver and
+ * releases the shared pubsub state.
+ *
+ * @param act Activator state filled in by pubsub_start.
+ * @param ctx Bundle context the trackers were created on.
+ * @return CELIX_SUCCESS.
+ */
+static int pubsub_stop(struct ps_websocketActivator *act, celix_bundle_context_t *ctx) {
+    pubsub_info_t *info = act->pubsub;
+
+    //Publisher side: trackers first, then the sender they report to.
+    for (int t = 0; t < celix_arrayList_size(act->trackerList); ++t) {
+        celix_bundleContext_stopTracker(ctx, celix_arrayList_getLong(act->trackerList, t));
+    }
+    publisher_stop(info->sender);
+
+    publisher_destroy(info->sender);
+    celix_arrayList_destroy(act->trackerList);
+
+    //Subscriber side: registrations first, then the receiver and service.
+    for (int r = 0; r < celix_arrayList_size(act->registrationList); ++r) {
+        service_registration_pt registration = celix_arrayList_get(act->registrationList, r);
+        serviceRegistration_unregister(registration);
+    }
+
+    subscriber_stop(info->receiver);
+
+    act->subsvc->receive = NULL;
+    subscriber_destroy(info->receiver);
+    act->subsvc->handle = NULL;
+    free(act->subsvc);
+    act->subsvc = NULL;
+
+    celix_arrayList_destroy(act->registrationList);
+    free(info);
+
+    return CELIX_SUCCESS;
+}
+
+CELIX_GEN_BUNDLE_ACTIVATOR(struct ps_websocketActivator, pubsub_start,
pubsub_stop)
\ No newline at end of file
diff --git
a/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
new file mode 100644
index 0000000..7b820f0
--- /dev/null
+++
b/bundles/pubsub/examples/pubsub/pubsub_websocket/private/src/pubsub_websocket_example.c
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * pubsub_websocket_example.c
+ *
+ * \date Sep 21, 2010
+ * \author <a href="mailto:[email protected]">Apache Celix Project
Team</a>
+ * \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+
+#include "poi.h"
+#include "poiCmd.h"
+#include "pubsub_websocket_private.h"
+
+#include "service_tracker.h"
+#include "celix_threads.h"
+
+
+/**
+ * Returns min plus random() scaled by (max - min) / RAND_MAX.
+ */
+static double randCoordinate(double min, double max) {
+    const double span = max - min;
+    const double divisor = ((double)RAND_MAX) / span;
+    return min + (((double)random()) / divisor);
+}
+
+/**
+ * Thread body started for every tracked publisher service.
+ *
+ * Every two seconds, while the shared 'sending' flag is set, it fills a
+ * location message with random coordinates and a random-length digit string
+ * and hands it to the publisher service. The loop ends once the sender's
+ * 'stop' flag is set.
+ *
+ * @param arg A heap allocated send_thread_struct_t; freed by this thread.
+ * @return Always NULL.
+ */
+static void* send_thread(void* arg) {
+    send_thread_struct_t *st_struct = (send_thread_struct_t *) arg;
+
+    pubsub_publisher_t *publish_svc = st_struct->service;
+    pubsub_info_t *pubsubInfo = st_struct->pubsub;
+    pubsub_sender_t *publisher = st_struct->pubsub->sender;
+
+    //Keep at most the first 8 characters of the framework UUID; "%.8s"
+    //stops at the terminator, so a shorter ident is not read past its end.
+    char fwUUID[9];
+    snprintf(fwUUID, sizeof(fwUUID), "%.8s", publisher->ident);
+
+    location_t place = calloc(1, sizeof(*place));
+    char *desc = calloc(64, sizeof(char));
+    char *name = calloc(64, sizeof(char));
+    if (place == NULL || desc == NULL || name == NULL) {
+        printf("PUBLISHER: Cannot allocate message for topic '%s'\n", st_struct->topic);
+        free(name);
+        free(desc);
+        free(place);
+        free(st_struct);
+        return NULL;
+    }
+
+    snprintf(desc, 64, "fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self());
+    snprintf(name, 64, "Bundle#%ld", publisher->bundleId);
+
+    place->name = name;
+    place->description = desc;
+    place->extra = "extra value";
+    printf("TOPIC : %s\n", st_struct->topic);
+
+    unsigned int msgId = 0;
+
+    while (publisher->stop == false) {
+        if(pubsubInfo->sending) {
+            if (msgId == 0) {
+                if (publish_svc->localMsgTypeIdForMsgType(publish_svc->handle, st_struct->topic, &msgId) != 0) {
+                    //Report the name that was actually looked up.
+                    printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n", st_struct->topic);
+                }
+            }
+
+            if (msgId > 0) {
+                place->position.lat = randCoordinate(MIN_LAT, MAX_LAT);
+                place->position.lon = randCoordinate(MIN_LON, MAX_LON);
+                int nr_char = (int) randCoordinate(5, 100000);
+                place->data = calloc(nr_char, 1);
+                if (place->data == NULL) {
+                    //Skip this round; the next iteration tries again.
+                    printf("PUBLISHER: Cannot allocate %d bytes of data for %s\n", nr_char, st_struct->topic);
+                } else {
+                    for (int i = 0; i < (nr_char - 1); i++) {
+                        place->data[i] = i % 10 + '0';
+                    }
+                    place->data[nr_char - 1] = '\0';
+                    if (publish_svc->send) {
+                        if (publish_svc->send(publish_svc->handle, msgId, place) == 0) {
+                            printf("Sent %s [%f, %f] (%s, %s) data len = %d\n", st_struct->topic,
+                                   place->position.lat, place->position.lon, place->name, place->description, nr_char);
+                        }
+                    } else {
+                        printf("No send for %s\n", st_struct->topic);
+                    }
+
+                    free(place->data);
+                }
+            }
+        }
+        sleep(2);
+    }
+
+    free(place->description);
+    free(place->name);
+    free(place);
+
+    free(st_struct);
+
+    return NULL;
+}
+
+/**
+ * Allocates a sender and stores the given tracker list, identifier and
+ * bundle id in it. The sender starts with an empty service -> thread map
+ * and with its stop flag cleared.
+ *
+ * @return The new sender; release it with publisher_destroy.
+ */
+pubsub_sender_t* publisher_create(array_list_pt trackers,const char* ident,long bundleId) {
+    pubsub_sender_t *sender = malloc(sizeof(*sender));
+
+    sender->stop = false;
+    sender->tid_map = hashMap_create(NULL, NULL, NULL, NULL);
+    sender->bundleId = bundleId;
+    sender->ident = ident;
+    sender->trackers = trackers;
+
+    return sender;
+}
+
+/** Only announces on stdout that the publisher side is starting. */
+void publisher_start(pubsub_sender_t *client) {
+    (void) client;
+    fputs("PUBLISHER: starting up...\n", stdout);
+}
+
+/** Only announces on stdout that the publisher side is stopping. */
+void publisher_stop(pubsub_sender_t *client) {
+    (void) client;
+    fputs("PUBLISHER: stopping...\n", stdout);
+}
+
+/**
+ * Releases a sender made by publisher_create. The map is destroyed without
+ * freeing its keys or values, and the tracker list and identifier are only
+ * detached, not freed.
+ */
+void publisher_destroy(pubsub_sender_t *client) {
+    client->ident = NULL;
+    client->trackers = NULL;
+    hashMap_destroy(client->tid_map, false, false);
+    free(client);
+}
+
+/**
+ * Tracker callback for a newly available publisher service: clears the stop
+ * flag and starts a send thread for the service. The thread handle is stored
+ * in the sender's map under the service pointer.
+ *
+ * @param handle The shared pubsub_info_t passed as tracker callback handle.
+ * @param svc    The pubsub_publisher_t service that was added.
+ * @param props  Service properties; the topic name is read from them.
+ */
+void publisher_publishSvcAdded(void * handle, void *svc, const celix_properties_t *props) {
+    pubsub_publisher_t *publish_svc = (pubsub_publisher_t *) svc;
+    pubsub_info_t *manager = (pubsub_info_t *) handle;
+    manager->sender->stop = false;
+
+    printf("PUBLISHER: new publish service exported (%s).\n", manager->sender->ident);
+
+    send_thread_struct_t *data = calloc(1, sizeof(*data));
+    celix_thread_t *tid = malloc(sizeof(*tid));
+    if (data == NULL || tid == NULL) {
+        printf("PUBLISHER: cannot allocate send thread for publish service.\n");
+        free(tid);
+        free(data);
+        return;
+    }
+
+    data->service = publish_svc;
+    data->pubsub = manager;
+    data->topic = celix_properties_get(props, PUBSUB_PUBLISHER_TOPIC, "!ERROR!");
+
+    if (celixThread_create(tid, NULL, send_thread, (void*)data) != CELIX_SUCCESS) {
+        //No thread took ownership of data, and an unstarted handle must not
+        //end up in the map where the remove callback would join it.
+        printf("PUBLISHER: cannot start send thread for publish service.\n");
+        free(tid);
+        free(data);
+        return;
+    }
+    hashMap_put(manager->sender->tid_map, publish_svc, tid);
+}
+
+/**
+ * Tracker callback for a publisher service that is going away: sets the stop
+ * flag, waits for the send thread that belongs to the service and releases
+ * its handle.
+ *
+ * @param handle The shared pubsub_info_t passed as tracker callback handle.
+ * @param svc    The pubsub_publisher_t service that is being removed.
+ * @param props  Service properties (unused).
+ */
+void publisher_publishSvcRemoved(void * handle, void *svc, const celix_properties_t *props) {
+    pubsub_info_t *manager = (pubsub_info_t *) handle;
+    //Take the entry out of the map so no freed handle is left behind in it.
+    celix_thread_t *tid = hashMap_remove(manager->sender->tid_map, svc);
+    manager->sender->stop = true;
+
+    if (tid == NULL) {
+        //No send thread is registered for this service.
+        return;
+    }
+
+#if defined(__APPLE__) && defined(__MACH__)
+    uint64_t threadid;
+    pthread_threadid_np(tid->thread, &threadid);
+    printf("PUBLISHER: publish service unexporting (%s) %llu!\n", manager->sender->ident, threadid);
+#else
+    //The cast keeps the argument in line with the %li conversion.
+    printf("PUBLISHER: publish service unexporting (%s) %li!\n", manager->sender->ident, (long) tid->thread);
+#endif
+
+    celixThread_join(*tid,NULL);
+    free(tid);
+}
+
+/**
+ * Allocates a zeroed receiver and gives it its own copy of the given name.
+ *
+ * @return The new receiver; release it with subscriber_destroy.
+ */
+pubsub_receiver_t* subscriber_create(char* topics) {
+    pubsub_receiver_t *receiver = calloc(1, sizeof(*receiver));
+    char *nameCopy = strdup(topics);
+    receiver->name = nameCopy;
+    return receiver;
+}
+
+
+/** Only announces on stdout that the subscriber side has started. */
+void subscriber_start(pubsub_receiver_t *subscriber) {
+    (void) subscriber;
+    fputs("Subscriber started...\n", stdout);
+}
+
+/** Only announces on stdout that the subscriber side has stopped. */
+void subscriber_stop(pubsub_receiver_t *subscriber) {
+    (void) subscriber;
+    fputs("Subscriber stopped...\n", stdout);
+}
+
+/** Frees the receiver's name copy and then the receiver itself. */
+void subscriber_destroy(pubsub_receiver_t *subscriber) {
+    //free(NULL) is a no-op, so the name needs no separate NULL test.
+    free(subscriber->name);
+    subscriber->name = NULL;
+    free(subscriber);
+}
+
+/**
+ * Receive callback of the subscriber service. The command "start" switches
+ * sending on; every other command switches it off. A message without a
+ * command string is ignored and leaves the flag untouched.
+ *
+ * @param handle    The shared pubsub_info_t set as service handle.
+ * @param msgType   Unused.
+ * @param msgTypeId Unused.
+ * @param msg       The received message, read as a poi_cmd_t.
+ * @param release   Unused.
+ * @return Always 0.
+ */
+int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg, bool* release) {
+    poi_cmd_t *cmd = (poi_cmd_t *) msg;
+    pubsub_info_t *pubsub = (pubsub_info_t *) handle;
+
+    if (cmd == NULL || cmd->command == NULL) {
+        //Passing NULL to printf("%s") or strcmp is undefined behaviour.
+        printf("Received message without command, ignoring\n");
+        return 0;
+    }
+
+    printf("Received command %s\n", cmd->command);
+
+    if(strcmp(cmd->command, "start") == 0) {
+        pubsub->sending = true;
+    } else {
+        pubsub->sending = false;
+    }
+    return 0;
+}
diff --git
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
similarity index 63%
rename from
bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
index e1ecb3d..37c0daf 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/index.html
@@ -35,6 +35,26 @@
<br/>
<p id="receivePoi2"></p>
</div>
+ <div>
+ <h1>Send start/stop message:</h1>
+ <br/>
+ <form onsubmit="return submitForm()">
+ <table>
+ <tr>
+ <td>
+ <p>Choose command: </p>
+ </td>
+ <td>
+ <select name="command" id="command">
+ <option value="start"
selected="selected">Start</option>
+ <option value="stop">Stop</option>
+ </select>
+ </td>
+ </tr>
+ </table>
+ <input type="submit" name="submit" class="button" id="button"
value="Send!">
+ </form>
+ </div>
<script>docReady();</script>
</body>
</html>
diff --git
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
similarity index 66%
rename from
bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
rename to bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
index 81026a4..503db7a 100644
--- a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
+++ b/bundles/pubsub/examples/pubsub/pubsub_websocket/resources/script.js
@@ -17,22 +17,38 @@
* under the License.
*/
+var seqnr = 0;
+var sendSocketP1;
+
function docReady() {
var host = window.location.host;
- var shellSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poi1");
- var shellSocketP2 = new WebSocket("ws://" + host + "/pubsub/default/poi2");
+ var rcvSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poi1");
+ var rcvSocketP2 = new WebSocket("ws://" + host + "/pubsub/default/poi2");
+ sendSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poiCmd");
- shellSocketP1.onmessage = function (event) {
+ rcvSocketP1.onmessage = function (event) {
console.log(event);
var obj = JSON.parse(event.data);
document.getElementById("receivePoi1").innerHTML = "Received " +
obj.id + " message with sequence nr: "
+ obj.seqNr + "<br/>latitude = " +
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " +
JSON.stringify(obj.data.location.lon);
};
- shellSocketP2.onmessage = function (event) {
+ rcvSocketP2.onmessage = function (event) {
console.log(event);
var obj = JSON.parse(event.data);
document.getElementById("receivePoi2").innerHTML = "Received " +
obj.id + " message with sequence nr: "
+ obj.seqNr + "<br/>latitude = " +
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " +
JSON.stringify(obj.data.location.lon);
};
}
+
+/**
+ * Form submit handler: sends the selected command as a poiCmd message over
+ * the command websocket. Always returns false so the browser does not submit
+ * the form.
+ */
+function submitForm() {
+    // send() throws while the socket is not open; an exception here would
+    // skip the "return false" below and let the browser submit the form.
+    if (!sendSocketP1 || sendSocketP1.readyState !== WebSocket.OPEN) {
+        alert("Cannot send message: websocket is not connected");
+        return false;
+    }
+
+    var element = document.getElementById("command");
+    var value = element.options[element.selectedIndex].value;
+    let msg = "{\"id\":\"poiCmd\", \"major\": 1, \"minor\": 0, \"seqNr\": " + seqnr++ + ", \"data\": {" +
+        "\"command\": \"" + value + "\" }}";
+
+    console.log("Sending message: " + msg);
+    sendSocketP1.send(msg);
+    alert("sending message: " + msg);
+    return false;
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
index 0a34dc1..e0a4bb0 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_admin.c
@@ -455,8 +455,7 @@ static celix_status_t
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_web
if (eScope != NULL && eTopic != NULL &&
strncmp(eScope, scope, 1024 * 1024) == 0 &&
strncmp(eTopic, topic, 1024 * 1024) == 0) {
- char *uri = psa_websocket_createURI(eScope, eTopic);
- pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress,
sockPort, uri);
+ pubsub_websocketTopicReceiver_connectTo(receiver, sockAddress,
sockPort);
}
}
@@ -496,13 +495,24 @@ static celix_status_t
pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsu
const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+ const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE,
NULL);
const char *eScope = celix_properties_get(endpoint,
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
const char *eTopic = celix_properties_get(endpoint,
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+ const char *sockAddress = celix_properties_get(endpoint,
PUBSUB_WEBSOCKET_ADDRESS_KEY, NULL);
+ long sockPort = celix_properties_getAsLong(endpoint,
PUBSUB_WEBSOCKET_PORT_KEY, -1L);
- if (eScope != NULL && eTopic != NULL &&
+ bool publisher = type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE,
type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
+
+ if (publisher && (sockAddress == NULL || sockPort < 0)) {
+ L_WARN("[PSA WEBSOCKET] Error got endpoint without websocket
address/port or endpoint type. Properties:");
+ const char *key = NULL;
+ CELIX_PROPERTIES_FOR_EACH(endpoint, key) {
+ L_WARN("[PSA WEBSOCKET] |- %s=%s\n", key,
celix_properties_get(endpoint, key, NULL));
+ }
+ status = CELIX_BUNDLE_EXCEPTION;
+ } else if(eScope != NULL && eTopic != NULL &&
strncmp(eScope, scope, 1024 * 1024) == 0 && strncmp(eTopic, topic,
1024 * 1024) == 0) {
- char *uri = psa_websocket_createURI(eScope, eTopic);
- pubsub_websocketTopicReceiver_disconnectFrom(receiver, uri);
+ pubsub_websocketTopicReceiver_disconnectFrom(receiver, sockAddress,
sockPort);
}
@@ -554,10 +564,9 @@ celix_status_t pubsub_websocketAdmin_executeCommand(void
*handle, char *commandL
const char *scope = pubsub_websocketTopicSender_scope(sender);
const char *topic = pubsub_websocketTopicSender_topic(sender);
const char *url = pubsub_websocketTopicSender_url(sender);
-// const char *postUrl = pubsub_websocketTopicSender_isStatic(sender) ?
" (static)" : "";
fprintf(out, "|- Topic Sender %s/%s\n", scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
- fprintf(out, " |- url = %s\n", url);
+ fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
@@ -574,21 +583,23 @@ celix_status_t pubsub_websocketAdmin_executeCommand(void
*handle, char *commandL
const char *serType = serEntry == NULL ? "!Error!" : serEntry->serType;
const char *scope = pubsub_websocketTopicReceiver_scope(receiver);
const char *topic = pubsub_websocketTopicReceiver_topic(receiver);
+ const char *urlEndp = pubsub_websocketTopicReceiver_url(receiver);
celix_array_list_t *connected = celix_arrayList_create();
celix_array_list_t *unconnected = celix_arrayList_create();
pubsub_websocketTopicReceiver_listConnections(receiver, connected,
unconnected);
fprintf(out, "|- Topic Receiver %s/%s\n", scope, topic);
- fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- serializer type = %s\n", serType);
+ fprintf(out, " |- url = %s\n", urlEndp);
for (int i = 0; i < celix_arrayList_size(connected); ++i) {
char *url = celix_arrayList_get(connected, i);
- fprintf(out, " |- connected url = %s\n", url);
+ fprintf(out, " |- connected endpoint = %s\n", url);
free(url);
}
for (int i = 0; i < celix_arrayList_size(unconnected); ++i) {
char *url = celix_arrayList_get(unconnected, i);
- fprintf(out, " |- unconnected url = %s\n", url);
+ fprintf(out, " |- unconnected endpoint = %s\n", url);
free(url);
}
celix_arrayList_destroy(connected);
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index babd9ce..b6dfa71 100644
---
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -70,6 +70,9 @@ struct pubsub_websocket_topic_receiver {
char scopeAndTopicFilter[5];
char *uri;
+ celix_websocket_service_t sockSvc;
+ long svcId;
+
pubsub_websocket_rcv_buffer_t recvBuffer;
struct {
@@ -93,7 +96,6 @@ struct pubsub_websocket_topic_receiver {
};
typedef struct psa_websocket_requested_connection_entry {
- pubsub_websocket_rcv_buffer_t *recvBuffer;
char *key; //host:port
char *socketAddress;
long socketPort;
@@ -102,6 +104,7 @@ typedef struct psa_websocket_requested_connection_entry {
int connectRetryCount;
bool connected;
bool statically; //true if the connection is statically configured through
the topic properties.
+ bool passive; //true if the connection is initiated by another resource
(e.g. webpage)
} psa_websocket_requested_connection_entry_t;
typedef struct psa_websocket_subscriber_entry {
@@ -119,6 +122,7 @@ static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
static void
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t
*receiver);
static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t
*msg_type_id_map);
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection,
void *handle);
static int psa_websocketTopicReceiver_data(struct mg_connection *connection,
int op_code, char *data, size_t length, void *handle);
static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection, void *handle);
@@ -168,6 +172,19 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
receiver->subscriberTrackerId =
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
+ //Register a websocket endpoint for this topic receiver
+ if(receiver->uri != NULL){
+ //Register a websocket svc first
+ celix_properties_t *props = celix_properties_create();
+ celix_properties_set(props, WEBSOCKET_ADMIN_URI, receiver->uri);
+ receiver->sockSvc.handle = receiver;
+ //Set callbacks to monitor any incoming connections (passive), data
events or close events
+ receiver->sockSvc.ready = psa_websocketTopicReceiver_ready;
+ receiver->sockSvc.data = psa_websocketTopicReceiver_data;
+ receiver->sockSvc.close = psa_websocketTopicReceiver_close;
+ receiver->svcId = celix_bundleContext_registerService(receiver->ctx,
&receiver->sockSvc,
+
WEBSOCKET_ADMIN_SERVICE_NAME, props);
+ }
const char *staticConnects = celix_properties_get(topicProperties,
PUBSUB_WEBSOCKET_STATIC_CONNECT_SOCKET_ADDRESSES, NULL);
if (staticConnects != NULL) {
@@ -198,7 +215,7 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
entry->socketPort = sockPort;
entry->connected = false;
entry->statically = true;
- entry->recvBuffer = &receiver->recvBuffer;
+ entry->passive = false;
hashMap_put(receiver->requestedConnections.map, (void *)
entry->key, entry);
} else {
L_WARN("[PSA_WEBSOCKET_TR] Invalid static socket address %s",
addr);
@@ -239,6 +256,8 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
celix_bundleContext_stopTracker(receiver->ctx,
receiver->subscriberTrackerId);
+ celix_bundleContext_unregisterService(receiver->ctx, receiver->svcId);
+
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -298,6 +317,9 @@ const char*
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_
const char*
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t
*receiver) {
return receiver->topic;
}
+const char*
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver) {
+ return receiver->uri;
+}
long
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t
*receiver) {
return receiver->serializerSvcId;
@@ -309,7 +331,7 @@ void
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiv
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_requested_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
char *url = NULL;
- asprintf(&url, "%s%s", entry->uri, entry->statically ? " (static)" :
"");
+ asprintf(&url, "%s:%li%s%s", entry->socketAddress, entry->socketPort,
entry->statically ? " (static)" : "", entry->passive ? " (passive)" : "");
if (entry->connected) {
celix_arrayList_add(connectedUrls, url);
} else {
@@ -320,8 +342,8 @@ void
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiv
}
-void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort, const char *uri) {
- L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s connecting to websocket uri
%s", receiver->scope, receiver->topic, uri);
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort) {
+ L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s ('%s') connecting to
websocket address %s:%li", receiver->scope, receiver->topic, receiver->uri,
socketAddress, socketPort);
char *key = NULL;
asprintf(&key, "%s:%li", socketAddress, socketPort);
@@ -331,28 +353,33 @@ void
pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t *
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->key = key;
- entry->uri = strndup(uri, 1024 * 1024);
+ entry->uri = strndup(receiver->uri, 1024 * 1024);
entry->socketAddress = strndup(socketAddress, 1024 * 1024);
entry->socketPort = socketPort;
entry->connected = false;
entry->statically = false;
- entry->recvBuffer = &receiver->recvBuffer;
- hashMap_put(receiver->requestedConnections.map, (void*)entry->uri,
entry);
+ entry->passive = false;
+ hashMap_put(receiver->requestedConnections.map, (void*)entry->key,
entry);
receiver->requestedConnections.allConnected = false;
+ } else {
+ free(key);
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
psa_websocket_connectToAllRequestedConnections(receiver);
}
-void
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t
*receiver, const char *uri) {
- L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s disconnect from websocket uri
%s", receiver->scope, receiver->topic, uri);
+void
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort) {
+ L_DEBUG("[PSA_WEBSOCKET] TopicReceiver %s/%s ('%s') disconnect from
websocket address %s:%li", receiver->scope, receiver->topic, receiver->uri,
socketAddress, socketPort);
+
+ char *key = NULL;
+ asprintf(&key, "%s:%li", socketAddress, socketPort);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- psa_websocket_requested_connection_entry_t *entry =
hashMap_remove(receiver->requestedConnections.map, uri);
+
+ psa_websocket_requested_connection_entry_t *entry =
hashMap_remove(receiver->requestedConnections.map, key);
if (entry != NULL && entry->connected) {
mg_close_connection(entry->sockConnection);
- L_WARN("[PSA_WEBSOCKET] Error disconnecting from websocket uri %s.",
uri);
}
if (entry != NULL) {
free(entry->socketAddress);
@@ -361,6 +388,7 @@ void
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receive
free(entry);
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+ free(key);
}
static void pubsub_websocketTopicReceiver_addSubscriber(void *handle, void
*svc, const celix_properties_t *props, const celix_bundle_t *bnd) {
@@ -558,6 +586,39 @@ static void* psa_websocket_recvThread(void * data) {
return NULL;
}
+static void psa_websocketTopicReceiver_ready(struct mg_connection *connection,
void *handle) {
+ if (handle != NULL) {
+ pubsub_websocket_topic_receiver_t *receiver =
(pubsub_websocket_topic_receiver_t *) handle;
+
+ //Get request info with host, port and uri information
+ const struct mg_request_info *ri = mg_get_request_info(connection);
+ if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+ char *key = NULL;
+ asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_websocket_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, key);
+ if (entry == NULL) {
+ entry = calloc(1, sizeof(*entry));
+ entry->key = key;
+ entry->uri = strndup(ri->request_uri, 1024 * 1024);
+ entry->socketAddress = strndup(ri->remote_addr, 1024 * 1024);
+ entry->socketPort = ri->remote_port;
+ entry->connected = true;
+ entry->statically = false;
+ entry->passive = true;
+ hashMap_put(receiver->requestedConnections.map, (void *)
entry->key, entry);
+ receiver->requestedConnections.allConnected = false;
+ } else {
+ free(key);
+ }
+
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+ }
+ }
+}
+
+
static int psa_websocketTopicReceiver_data(struct mg_connection *connection
__attribute__((unused)),
int op_code
__attribute__((unused)),
char *data,
@@ -565,27 +626,48 @@ static int psa_websocketTopicReceiver_data(struct
mg_connection *connection __at
void *handle) {
//Received a websocket message, append this message to the buffer of the
receiver.
if (handle != NULL) {
- psa_websocket_requested_connection_entry_t *entry =
(psa_websocket_requested_connection_entry_t *) handle;
+ pubsub_websocket_topic_receiver_t *receiver =
(pubsub_websocket_topic_receiver_t *) handle;
- celixThreadMutex_lock(&entry->recvBuffer->mutex);
+ celixThreadMutex_lock(&receiver->recvBuffer.mutex);
pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
const char *rcvdMsgData = malloc(length);
memcpy((void *) rcvdMsgData, data, length);
msg->msgData = rcvdMsgData;
msg->msgSize = length;
- celix_arrayList_add(entry->recvBuffer->list, msg);
- celixThreadMutex_unlock(&entry->recvBuffer->mutex);
+ celix_arrayList_add(receiver->recvBuffer.list, msg);
+ celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
}
return 1; //keep open (non-zero), 0 to close the socket
}
-static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection __attribute__((unused)), void *handle) {
+static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection, void *handle) {
//Reset connection for this receiver entry
if (handle != NULL) {
- psa_websocket_requested_connection_entry_t *entry =
(psa_websocket_requested_connection_entry_t *) handle;
- entry->connected = false;
- entry->sockConnection = NULL;
+ pubsub_websocket_topic_receiver_t *receiver =
(pubsub_websocket_topic_receiver_t *) handle;
+
+ //Get request info with host, port and uri information
+ const struct mg_request_info *ri = mg_get_request_info(connection);
+ if (ri != NULL && strcmp(receiver->uri, ri->request_uri) == 0) {
+ char *key = NULL;
+ asprintf(&key, "%s:%i", ri->remote_addr, ri->remote_port);
+
+ celixThreadMutex_lock(&receiver->requestedConnections.mutex);
+ psa_websocket_requested_connection_entry_t *entry =
hashMap_get(receiver->requestedConnections.map, key);
+ if (entry != NULL) {
+ entry->connected = false;
+ entry->sockConnection = NULL;
+ if(entry->passive) {
+ hashMap_remove(receiver->requestedConnections.map, key);
+ free(entry->key);
+ free(entry->uri);
+ free(entry->socketAddress);
+ free(entry);
+ }
+ }
+ celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
+ free(key);
+ }
}
}
@@ -597,7 +679,7 @@ static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
hash_map_iterator_t iter =
hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_requested_connection_entry_t *entry =
hashMapIterator_nextValue(&iter);
- if (!entry->connected) {
+ if (!entry->connected && !entry->passive) {
char errBuf[100] = {0};
entry->sockConnection =
mg_connect_websocket_client(entry->socketAddress,
(int)
entry->socketPort,
@@ -608,7 +690,7 @@ static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topi
NULL,
psa_websocketTopicReceiver_data,
psa_websocketTopicReceiver_close,
- entry);
+ receiver);
if(entry->sockConnection != NULL) {
entry->connected = true;
entry->connectRetryCount = 0;
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
index be1cff7..4f100c4 100644
---
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
+++
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.h
@@ -36,12 +36,13 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
const char*
pubsub_websocketTopicReceiver_scope(pubsub_websocket_topic_receiver_t
*receiver);
const char*
pubsub_websocketTopicReceiver_topic(pubsub_websocket_topic_receiver_t
*receiver);
+const char*
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t *receiver);
long
pubsub_websocketTopicReceiver_serializerSvcId(pubsub_websocket_topic_receiver_t
*receiver);
void
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t
*unconnectedUrls);
-void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort, const char *uri);
-void
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t
*receiver, const char *uri);
+void pubsub_websocketTopicReceiver_connectTo(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort);
+void
pubsub_websocketTopicReceiver_disconnectFrom(pubsub_websocket_topic_receiver_t
*receiver, const char *socketAddress, long socketPort);
pubsub_admin_receiver_metrics_t*
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t
*receiver);