http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_discovery/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_discovery/CMakeLists.txt b/pubsub/pubsub_discovery/CMakeLists.txt index 0e7d6c5..f92f81c 100644 --- a/pubsub/pubsub_discovery/CMakeLists.txt +++ b/pubsub/pubsub_discovery/CMakeLists.txt @@ -18,7 +18,7 @@ find_package(CURL REQUIRED) find_package(Jansson REQUIRED) -add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery +add_bundle(celix_pubsub_discovery_etcd BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd" VERSION "1.0.0" SOURCES @@ -27,16 +27,15 @@ add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery src/etcd_common.c src/etcd_watcher.c src/etcd_writer.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c ) -target_include_directories(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE +target_include_directories(celix_pubsub_discovery_etcd PRIVATE src - include ${CURL_INCLUDE_DIR} ${JANSSON_INCLUDE_DIR} - ) +) + +target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES}) +install_bundle(celix_pubsub_discovery_etcd) -target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES}) -install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery) +add_library(Celix::pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_serializer_json/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt index b86f30e..1269cad 100644 --- a/pubsub/pubsub_serializer_json/CMakeLists.txt +++ b/pubsub/pubsub_serializer_json/CMakeLists.txt @@ -18,22 +18,23 @@ find_package(Jansson REQUIRED) -add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson +add_bundle(celix_pubsub_serializer_json BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json" VERSION "1.0.0" SOURCES src/ps_activator.c src/pubsub_serializer_impl.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c ) -target_include_directories(org.apache.celix.pubsub_serializer.PubSubSerializerJson PRIVATE +target_include_directories(celix_pubsub_serializer_json PRIVATE src ${JANSSON_INCLUDE_DIR} ) -set_target_properties(org.apache.celix.pubsub_serializer.PubSubSerializerJson PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(org.apache.celix.pubsub_serializer.PubSubSerializerJson PRIVATE Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper) +set_target_properties(celix_pubsub_serializer_json PROPERTIES INSTALL_RPATH "$ORIGIN") +target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper) -install_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson) +install_bundle(celix_pubsub_serializer_json) + +add_library(Celix::pubsub_serializer_json ALIAS celix_pubsub_serializer_json) http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/CMakeLists.txt b/pubsub/pubsub_spi/CMakeLists.txt new file mode 100644 index 0000000..118dd3a --- /dev/null +++ b/pubsub/pubsub_spi/CMakeLists.txt @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_library(celix_pubsub_spi STATIC + src/pubsub_admin_match.c + src/pubsub_endpoint.c + src/pubsub_utils.c +) +target_include_directories(celix_pubsub_spi PUBLIC + $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include> + $<INSTALL_INTERFACE:include/celix/pubsub_spi> +) +target_link_libraries(celix_pubsub_spi PUBLIC Celix::framework Celix::pubsub_api) + +set_target_properties(celix_pubsub_spi PROPERTIES TOPIC_INFO_DESCRIPTOR ${CMAKE_CURRENT_LIST_DIR}/include/pubsub_topic_info.descriptor) +#TODO how to make this descriptor available for imported targets? $<INSTALL_INTERFACE:include/celix/pubsub_spi/pubsub_topic_info.descriptor> + +add_library(Celix::pubsub_spi ALIAS celix_pubsub_spi) + +install(TARGETS celix_pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub) +install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/publisher_endpoint_announce.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h new file mode 100644 index 0000000..bd39fc0 --- /dev/null +++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h @@ -0,0 +1,36 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_ +#define PUBLISHER_ENDPOINT_ANNOUNCE_H_ + +#include "pubsub_endpoint.h" + +struct publisher_endpoint_announce { + void *handle; + celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP); + celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP); + celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic); + celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic); +}; + +typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt; + + +#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h b/pubsub/pubsub_spi/include/pubsub_admin.h new file mode 100644 index 0000000..f24d825 --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_admin.h @@ -0,0 +1,72 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_admin.h + * + * \date Sep 30, 2011 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_ADMIN_H_ +#define PUBSUB_ADMIN_H_ + +#include "service_reference.h" + +#include "pubsub_common.h" +#include "pubsub_endpoint.h" + +#define PSA_IP "PSA_IP" +#define PSA_ITF "PSA_INTERFACE" +#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" + +#define PUBSUB_ADMIN_TYPE_KEY "pubsub_admin.type" + +typedef struct pubsub_admin *pubsub_admin_pt; + +struct pubsub_admin_service { + pubsub_admin_pt admin; + + celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + + celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + + celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); + celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); + + /* Match principle: + * - A full matching pubsub_admin gives 200 points + * - A full matching serializer gives 100 points + * - If QoS = sample + * - fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75. + * - fallback serializers order of selection is: json, void. Points allocation is 30,20. + * - If QoS = control + * - fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75. + * - fallback serializers order of selection is: json, void. Points allocation is 30,20. + * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two. + * + */ + celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); +}; + +typedef struct pubsub_admin_service *pubsub_admin_service_pt; + +#endif /* PUBSUB_ADMIN_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin_match.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h b/pubsub/pubsub_spi/include/pubsub_admin_match.h new file mode 100644 index 0000000..e95ca7d --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h @@ -0,0 +1,40 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + + +#ifndef PUBSUB_ADMIN_MATCH_H_ +#define PUBSUB_ADMIN_MATCH_H_ + +#include "celix_errno.h" +#include "properties.h" +#include "array_list.h" + +#include "pubsub_serializer.h" + +#define QOS_ATTRIBUTE_KEY "attribute.qos" +#define QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */ +#define QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */ + +#define PUBSUB_ADMIN_FULL_MATCH_SCORE 200.0F +#define SERIALIZER_FULL_MATCH_SCORE 100.0F + +celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score); +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc); + +#endif /* PUBSUB_ADMIN_MATCH_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_common.h b/pubsub/pubsub_spi/include/pubsub_common.h new file mode 100644 index 0000000..5dfd8fd --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_common.h @@ -0,0 +1,52 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_common.h + * + * \date Sep 17, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_COMMON_H_ +#define PUBSUB_COMMON_H_ + +#define PUBSUB_SERIALIZER_SERVICE "pubsub_serializer" +#define PUBSUB_ADMIN_SERVICE "pubsub_admin" +#define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery" +#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher" + +#define PUBSUB_ANY_SUB_TOPIC "any" + +#define PUBSUB_BUNDLE_ID "bundle.id" + +#define MAX_SCOPE_LEN 1024 +#define MAX_TOPIC_LEN 1024 + +struct pubsub_msg_header{ + char topic[MAX_TOPIC_LEN]; + unsigned int type; + unsigned char major; + unsigned char minor; +}; + +typedef struct pubsub_msg_header* pubsub_msg_header_pt; + + +#endif /* PUBSUB_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h b/pubsub/pubsub_spi/include/pubsub_endpoint.h new file mode 100644 index 0000000..4c39d2f --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h @@ -0,0 +1,58 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_endpoint.h + * + * \date Sep 21, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_ENDPOINT_H_ +#define PUBSUB_ENDPOINT_H_ + +#include "service_reference.h" +#include "listener_hook_service.h" +#include "properties.h" + +#include "pubsub/publisher.h" +#include "pubsub/subscriber.h" + +struct pubsub_endpoint { + char *frameworkUUID; + char *scope; + char *topic; + long serviceID; + char* endpoint; + bool is_secure; + properties_pt topic_props; +}; + +typedef struct pubsub_endpoint *pubsub_endpoint_pt; + +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); + +char *createScopeTopicKey(const char* scope, const char* topic); + +#endif /* PUBSUB_ENDPOINT_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_serializer.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h b/pubsub/pubsub_spi/include/pubsub_serializer.h new file mode 100644 index 0000000..4489fa4 --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_serializer.h @@ -0,0 +1,66 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_serializer.h + * + * \date Mar 24, 2017 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_SERIALIZER_SERVICE_H_ +#define PUBSUB_SERIALIZER_SERVICE_H_ + +#include "service_reference.h" +#include "hash_map.h" + +#include "pubsub_common.h" + +#define PUBSUB_SERIALIZER_TYPE_KEY "pubsub_serializer.type" + +/** + * There should be a pubsub_serializer_t + * per msg type (msg id) per bundle + * + * The pubsub_serializer_service can create + * a serializer_map per bundle. Potentially using + * the extender pattern. + */ + +typedef struct pubsub_msg_serializer { + void* handle; + unsigned int msgId; + const char* msgName; + version_pt msgVersion; + + celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen); + celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed + void (*freeMsg)(void* handle, void* msg); + +} pubsub_msg_serializer_t; + +typedef struct pubsub_serializer_service { + void* handle; + + celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap); + celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap); + +} pubsub_serializer_service_t; + +#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor new file mode 100644 index 0000000..c01a2fd --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor @@ -0,0 +1,10 @@ +:header +type=interface +name=pubsub_topic_info +version=1.0.0 +:annotations +:types +:methods +getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N +getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N +getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_utils.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h b/pubsub/pubsub_spi/include/pubsub_utils.h new file mode 100644 index 0000000..aff5c72 --- /dev/null +++ b/pubsub/pubsub_spi/include/pubsub_utils.h @@ -0,0 +1,39 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_utils.h + * + * \date Sep 24, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_UTILS_H_ +#define PUBSUB_UTILS_H_ + +#include "bundle_context.h" +#include "array_list.h" + +char* pubsub_getScopeFromFilter(char* bundle_filter); +char* pubsub_getTopicFromFilter(char* bundle_filter); +char* pubsub_getKeysBundleDir(bundle_context_pt ctx); +array_list_pt pubsub_getTopicsFromString(char* string); + + +#endif /* PUBSUB_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_admin_match.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c b/pubsub/pubsub_spi/src/pubsub_admin_match.c new file mode 100644 index 0000000..2a695c1 --- /dev/null +++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c @@ -0,0 +1,320 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + + +#include <string.h> +#include "service_reference.h" + +#include "pubsub_admin.h" + +#include "pubsub_admin_match.h" + +#define KNOWN_PUBSUB_ADMIN_NUM 2 +#define KNOWN_SERIALIZER_NUM 2 + +static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"}; +static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; + +static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"}; +static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"}; + +static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F}; +static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F}; + +static void get_serializer_type(service_reference_pt svcRef, char **serializerType); +static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService); + +celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){ + + celix_status_t status = CELIX_SUCCESS; + double final_score = 0; + int i = 0, j = 0; + + const char *requested_admin_type = NULL; + const char *requested_serializer_type = NULL; + const char *requested_qos_type = NULL; + + if(endpoint_props!=NULL){ + requested_admin_type = properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY); + requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); + requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); + } + + /* Analyze the pubsub_admin */ + if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */ + if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match + final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE; + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += qos_pubsub_admin_score[i]; + break; + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += qos_pubsub_admin_score[i]; + break; + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){ + if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){ + final_score += (qos_pubsub_admin_score[i]/2); + break; + } + } + } + + char *serializer_type = NULL; + /* Analyze the serializers */ + if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ + for(i=0;i<arrayList_size(serializerList);i++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ + final_score += SERIALIZER_FULL_MATCH_SCORE; + break; + } + } + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += qos_serializer_score[i]; + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += qos_serializer_score[i]; + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j); + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + ser_found = true; + } + } + } + if(ser_found){ + final_score += (qos_serializer_score[i]/2); + } + } + } + + *score = final_score; + + printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score); + + return status; +} + +celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){ + celix_status_t status = CELIX_SUCCESS; + + int i = 0, j = 0; + + const char *requested_serializer_type = NULL; + const char *requested_qos_type = NULL; + + if (endpoint_props != NULL){ + requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY); + requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY); + } + + service_reference_pt svcRef = NULL; + void *svc = NULL; + + /* Analyze the serializers */ + if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */ + for(i=0;i<arrayList_size(serializerList);i++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,i); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + *serSvc = svc; + break; + } + } + } + } + else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */ + if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE); + } + } + } + } + } + } + else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL); + } + } + } + } + } + } + else{ + printf("Unknown QoS type '%s'\n",requested_qos_type); + status = CELIX_ILLEGAL_ARGUMENT; + } + } + else{ /* We got no specification: fallback to Qos=Sample, but count half the score */ + bool ser_found = false; + for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){ + for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){ + svcRef = (service_reference_pt)arrayList_get(serializerList,j); + char *serializer_type = NULL; + get_serializer_type(svcRef, &serializer_type); + if(serializer_type != NULL){ + if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){ + manage_service_from_reference(svcRef, &svc,true); + if(svc==NULL){ + printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef); + status = CELIX_SERVICE_EXCEPTION; + } + else{ + *serSvc = svc; + ser_found = true; + printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]); + } + } + } + } + } + } + + if(svc!=NULL && svcRef!=NULL){ + manage_service_from_reference(svcRef, svc, false); + } + + return status; +} + +static void get_serializer_type(service_reference_pt svcRef, char **serializerType){ + + const char *serType = NULL; + serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType != NULL){ + *serializerType = (char*)serType; + } + else{ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef); + *serializerType = NULL; + } +} + +static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){ + bundle_context_pt context = NULL; + bundle_pt bundle = NULL; + serviceReference_getBundle(svcRef, &bundle); + bundle_getContext(bundle, &context); + if(getService){ + bundleContext_getService(context, svcRef, svc); + } + else{ + bundleContext_ungetService(context, svcRef, NULL); + } +} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c b/pubsub/pubsub_spi/src/pubsub_endpoint.c new file mode 100644 index 0000000..c3fd293 --- /dev/null +++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c @@ -0,0 +1,254 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * endpoint_description.c + * + * \date 25 Jul 2014 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include <string.h> +#include <stdlib.h> + +#include "celix_errno.h" +#include "celix_log.h" + +#include "pubsub_common.h" +#include "pubsub_endpoint.h" +#include "constants.h" + +#include "pubsub_utils.h" + + +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); + +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ + + if (fwUUID != NULL) { + psEp->frameworkUUID = strdup(fwUUID); + } + + if (scope != NULL) { + psEp->scope = strdup(scope); + } + + if (topic != NULL) { + psEp->topic = strdup(topic); + } + + psEp->serviceID = serviceId; + + if(endpoint != NULL) { + psEp->endpoint = strdup(endpoint); + } + + if(topic_props != NULL){ + if(cloneProps){ + properties_copy(topic_props, &(psEp->topic_props)); + } + else{ + psEp->topic_props = topic_props; + } + } +} + +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){ + + properties_pt topic_props = NULL; + + bool isSystemBundle = false; + bundle_isSystemBundle(bundle, &isSystemBundle); + long bundleId = -1; + bundle_isSystemBundle(bundle, &isSystemBundle); + bundle_getBundleId(bundle,&bundleId); + + if(isSystemBundle == false) { + + char *bundleRoot = NULL; + char* topicPropertiesPath = NULL; + bundle_getEntry(bundle, ".", &bundleRoot); + + if(bundleRoot != NULL){ + + asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic); + topic_props = properties_load(topicPropertiesPath); + if(topic_props==NULL){ + printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId); + } + + free(topicPropertiesPath); + free(bundleRoot); + } + } + + return topic_props; +} + +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ + celix_status_t status = CELIX_SUCCESS; + + *psEp = calloc(1, sizeof(**psEp)); + + pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); + + return status; + +} + +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){ + celix_status_t status = CELIX_SUCCESS; + + *out = calloc(1,sizeof(**out)); + + pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true); + + return status; + +} + +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); + + bundle_pt bundle = NULL; + bundle_context_pt ctxt = NULL; + const char* fwUUID = NULL; + serviceReference_getBundle(reference,&bundle); + bundle_getContext(bundle,&ctxt); + bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + + const char* scope = NULL; + serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope); + + const char* topic = NULL; + serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic); + + const char* serviceId = NULL; + serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); + + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); + + pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); + + if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) { + fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); + status = CELIX_BUNDLE_EXCEPTION; + pubsubEndpoint_destroy(ep); + *psEp = NULL; + } + else{ + *psEp = ep; + } + + return status; + +} + +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + const char* fwUUID=NULL; + bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + + if(fwUUID==NULL){ + return CELIX_BUNDLE_EXCEPTION; + } + + char* topic = pubsub_getTopicFromFilter(info->filter); + if(topic==NULL){ + return CELIX_BUNDLE_EXCEPTION; + } + + *psEp = calloc(1, sizeof(**psEp)); + + char* scope = pubsub_getScopeFromFilter(info->filter); + if(scope == NULL) { + scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); + } + + bundle_pt bundle = NULL; + long bundleId = -1; + bundleContext_getBundle(info->context,&bundle); + + bundle_getBundleId(bundle,&bundleId); + + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); + + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); + + free(topic); + free(scope); + + + return status; +} + +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ + + if(psEp->frameworkUUID!=NULL){ + free(psEp->frameworkUUID); + psEp->frameworkUUID = NULL; + } + + if(psEp->scope!=NULL){ + free(psEp->scope); + psEp->scope = NULL; + } + + if(psEp->topic!=NULL){ + free(psEp->topic); + psEp->topic = NULL; + } + + if(psEp->endpoint!=NULL){ + free(psEp->endpoint); + psEp->endpoint = NULL; + } + + if(psEp->topic_props != NULL){ + properties_destroy(psEp->topic_props); + } + + free(psEp); + + return CELIX_SUCCESS; + +} + +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ + + return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) && + (strcmp(psEp1->scope,psEp2->scope)==0) && + (strcmp(psEp1->topic,psEp2->topic)==0) && + (psEp1->serviceID == psEp2->serviceID) /*&& + ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ + ); +} + +char *createScopeTopicKey(const char* scope, const char* topic) { + char *result = NULL; + asprintf(&result, "%s:%s", scope, topic); + + return result; +} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_utils.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c b/pubsub/pubsub_spi/src/pubsub_utils.c new file mode 100644 index 0000000..19b2271 --- /dev/null +++ b/pubsub/pubsub_spi/src/pubsub_utils.c @@ -0,0 +1,170 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_utils.c + * + * \date Sep 24, 2015 + * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include <string.h> +#include <stdlib.h> + +#include "constants.h" + +#include "pubsub_common.h" +#include "pubsub/publisher.h" +#include "pubsub_utils.h" + +#include "array_list.h" +#include "bundle.h" + +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> + +#define MAX_KEYBUNDLE_LENGTH 256 + +char* pubsub_getScopeFromFilter(char* bundle_filter){ + + char* scope = NULL; + + char* filter = strdup(bundle_filter); + + char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); + if(oc!=NULL){ + oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; + if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ + + char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE); + if(scopes!=NULL){ + + scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1; + char* bottom=strchr(scopes,')'); + *bottom='\0'; + + scope=strdup(scopes); + } else { + scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); + } + } + } + + free(filter); + + return scope; +} + +char* pubsub_getTopicFromFilter(char* bundle_filter){ + + char* topic = NULL; + + char* filter = strdup(bundle_filter); + + char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS); + if(oc!=NULL){ + oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1; + if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){ + + char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC); + if(topics!=NULL){ + + topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1; + char* bottom=strchr(topics,')'); + *bottom='\0'; + + topic=strdup(topics); + + } + } + } + + free(filter); + + return topic; + +} + +array_list_pt pubsub_getTopicsFromString(char* string){ + + array_list_pt topic_list = NULL; + arrayList_create(&topic_list); + + char* topics = strdup(string); + + char* topic = strtok(topics,",;|# "); + arrayList_add(topic_list,strdup(topic)); + + while( (topic = strtok(NULL,",;|# ")) !=NULL){ + arrayList_add(topic_list,strdup(topic)); + } + + free(topics); + + return topic_list; + +} + +/** + * Loop through all bundles and look for the bundle with the keys inside. + * If no key bundle found, return NULL + * + * Caller is responsible for freeing the object + */ +char* pubsub_getKeysBundleDir(bundle_context_pt ctx) +{ + array_list_pt bundles = NULL; + bundleContext_getBundles(ctx, &bundles); + int nrOfBundles = arrayList_size(bundles); + long bundle_id = -1; + char* result = NULL; + + for (int i = 0; i < nrOfBundles; i++){ + bundle_pt b = arrayList_get(bundles, i); + + /* Skip bundle 0 (framework bundle) since it has no path nor revisions */ + bundle_getBundleId(b, &bundle_id); + if(bundle_id==0){ + continue; + } + + char* dir = NULL; + bundle_getEntry(b, ".", &dir); + + char cert_dir[MAX_KEYBUNDLE_LENGTH]; + snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir); + + struct stat s; + int err = stat(cert_dir, &s); + if (err != -1){ + if (S_ISDIR(s.st_mode)){ + result = dir; + break; + } + } + + free(dir); + } + + arrayList_destroy(bundles); + + return result; +} + http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt index 784ca21..73b9ecb 100644 --- a/pubsub/pubsub_topology_manager/CMakeLists.txt +++ b/pubsub/pubsub_topology_manager/CMakeLists.txt @@ -15,22 +15,23 @@ # specific language governing permissions and limitations # under the License. -add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager +add_bundle(celix_pubsub_topology_manager BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager" VERSION "1.0.0" SOURCES - private/src/pstm_activator.c - private/src/pubsub_topology_manager.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c + src/pstm_activator.c + src/pubsub_topology_manager.c + src/pubsub_topology_manager.h ) +target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi) -bundle_files(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor +get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR) +bundle_files(celix_pubsub_topology_manager + ${DESC} DESTINATION "META-INF/descriptors/services" ) -target_link_libraries(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager PRIVATE Celix::framework Celix::log_helper) +install_bundle(celix_pubsub_topology_manager) -install_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager) +add_library(Celix::pubsub_topology_manager ALIAS celix_pubsub_topology_manager) http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h deleted file mode 100644 index 7614e0c..0000000 --- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h +++ /dev/null @@ -1,83 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_topology_manager.h - * - * \date Sep 29, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_TOPOLOGY_MANAGER_H_ -#define PUBSUB_TOPOLOGY_MANAGER_H_ - -#include "endpoint_listener.h" -#include "service_reference.h" -#include "bundle_context.h" -#include "log_helper.h" - -#include "pubsub_common.h" -#include "pubsub_endpoint.h" -#include "publisher.h" -#include "subscriber.h" - - -struct pubsub_topology_manager { - bundle_context_pt context; - - celix_thread_mutex_t psaListLock; - array_list_pt psaList; - - celix_thread_mutex_t discoveryListLock; - hash_map_pt discoveryList; //<serviceReference,NULL> - - celix_thread_mutex_t publicationsLock; - hash_map_pt publications; //<topic(string),list<pubsub_ep>> - - celix_thread_mutex_t subscriptionsLock; - hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>> - - log_helper_pt loghelper; -}; - -typedef struct pubsub_topology_manager *pubsub_topology_manager_pt; - -celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager); -celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager); -celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager); - -celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service); -celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service); -celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service); - -celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service); -celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service); -celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service); - -celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service); - -celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners); -celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners); - -celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP); -celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP); - -#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pstm_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c deleted file mode 100644 index 4d6dd27..0000000 --- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c +++ /dev/null @@ -1,246 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pstm_activator.c - * - * \date Sep 29, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include "constants.h" -#include "bundle_activator.h" -#include "service_tracker.h" -#include "service_registration.h" - -#include "endpoint_listener.h" -#include "remote_constants.h" -#include "listener_hook_service.h" -#include "log_service.h" -#include "log_helper.h" - - -#include "pubsub_topology_manager.h" -#include "publisher_endpoint_announce.h" - -struct activator { - bundle_context_pt context; - - pubsub_topology_manager_pt manager; - - service_tracker_pt pubsubDiscoveryTracker; - service_tracker_pt pubsubAdminTracker; - service_tracker_pt pubsubSubscribersTracker; - - listener_hook_service_pt hookService; - service_registration_pt hook; - - publisher_endpoint_announce_pt publisherEPDiscover; - service_registration_pt publisherEPDiscoverService; - - log_helper_pt loghelper; -}; - - -static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker); -static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker); -static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker); - - -static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) { - celix_status_t status; - - service_tracker_customizer_pt customizer = NULL; - - status = serviceTrackerCustomizer_create(activator->manager, - NULL, - pubsub_topologyManager_pubsubDiscoveryAdded, - pubsub_topologyManager_pubsubDiscoveryModified, - pubsub_topologyManager_pubsubDiscoveryRemoved, - &customizer); - - if (status == CELIX_SUCCESS) { - status = serviceTracker_create(activator->context, (char *) PUBSUB_DISCOVERY_SERVICE, customizer, tracker); - } - - return status; -} - -static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker) { - celix_status_t status = CELIX_SUCCESS; - - service_tracker_customizer_pt customizer = NULL; - - status = serviceTrackerCustomizer_create(activator->manager, - NULL, - pubsub_topologyManager_psaAdded, - pubsub_topologyManager_psaModified, - pubsub_topologyManager_psaRemoved, - &customizer); - - if (status == CELIX_SUCCESS) { - status = serviceTracker_create(activator->context, PUBSUB_ADMIN_SERVICE, customizer, tracker); - } - - return status; -} - -static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker) { - celix_status_t status = CELIX_SUCCESS; - - service_tracker_customizer_pt customizer = NULL; - - status = serviceTrackerCustomizer_create(activator->manager, - NULL, - pubsub_topologyManager_subscriberAdded, - pubsub_topologyManager_subscriberModified, - pubsub_topologyManager_subscriberRemoved, - &customizer); - - if (status == CELIX_SUCCESS) { - status = serviceTracker_create(activator->context, PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker); - } - - return status; -} - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = NULL; - - activator = calloc(1,sizeof(struct activator)); - - if (!activator) { - return CELIX_ENOMEM; - } - - activator->context = context; - - logHelper_create(context, &activator->loghelper); - logHelper_start(activator->loghelper); - - status = pubsub_topologyManager_create(context, activator->loghelper, &activator->manager); - if (status == CELIX_SUCCESS) { - status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker); - if (status == CELIX_SUCCESS) { - status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker); - if (status == CELIX_SUCCESS) { - status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker); - if (status == CELIX_SUCCESS) { - *userData = activator; - } - } - } - } - - if(status != CELIX_SUCCESS){ - bundleActivator_destroy(activator, context); - } - - return status; -} - - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - publisher_endpoint_announce_pt pubEPDiscover = calloc(1, sizeof(*pubEPDiscover)); - pubEPDiscover->handle = activator->manager; - pubEPDiscover->announcePublisher = pubsub_topologyManager_announcePublisher; - pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher; - activator->publisherEPDiscover = pubEPDiscover; - - status += bundleContext_registerService(context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, &activator->publisherEPDiscoverService); - - - listener_hook_service_pt hookService = calloc(1,sizeof(*hookService)); - hookService->handle = activator->manager; - hookService->added = pubsub_topologyManager_publisherTrackerAdded; - hookService->removed = pubsub_topologyManager_publisherTrackerRemoved; - activator->hookService = hookService; - - status += bundleContext_registerService(context, (char *) OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook); - - /* NOTE: Enable those line in order to remotely expose the topic_info service - properties_pt props = properties_create(); - properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE); - status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService); - */ - status += serviceTracker_open(activator->pubsubAdminTracker); - - status += serviceTracker_open(activator->pubsubDiscoveryTracker); - - status += serviceTracker_open(activator->pubsubSubscribersTracker); - - - return status; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - serviceTracker_close(activator->pubsubSubscribersTracker); - serviceTracker_close(activator->pubsubDiscoveryTracker); - serviceTracker_close(activator->pubsubAdminTracker); - - serviceRegistration_unregister(activator->publisherEPDiscoverService); - free(activator->publisherEPDiscover); - - serviceRegistration_unregister(activator->hook); - free(activator->hookService); - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - - struct activator *activator = userData; - if (activator == NULL) { - status = CELIX_BUNDLE_EXCEPTION; - } else { - - if(activator->pubsubSubscribersTracker!=NULL){ - serviceTracker_destroy(activator->pubsubSubscribersTracker); - } - if(activator->pubsubDiscoveryTracker!=NULL){ - serviceTracker_destroy(activator->pubsubDiscoveryTracker); - } - if(activator->pubsubAdminTracker!=NULL){ - serviceTracker_destroy(activator->pubsubAdminTracker); - } - - if(activator->manager!=NULL){ - status = pubsub_topologyManager_destroy(activator->manager); - } - - logHelper_stop(activator->loghelper); - logHelper_destroy(&activator->loghelper); - - free(activator); - } - - return status; -} http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c deleted file mode 100644 index 987d864..0000000 --- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c +++ /dev/null @@ -1,723 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_topology_manager.c - * - * \date Sep 29, 2011 - * \author <a href="mailto:[email protected]">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <stdbool.h> - -#include "hash_map.h" -#include "array_list.h" -#include "bundle_context.h" -#include "constants.h" -#include "module.h" -#include "bundle.h" -#include "remote_service_admin.h" -#include "remote_constants.h" -#include "filter.h" -#include "listener_hook_service.h" -#include "utils.h" -#include "service_reference.h" -#include "service_registration.h" -#include "log_service.h" -#include "log_helper.h" - -#include "publisher_endpoint_announce.h" -#include "pubsub_topology_manager.h" -#include "pubsub_endpoint.h" -#include "pubsub_admin.h" -#include "pubsub_utils.h" - - -celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) { - celix_status_t status = CELIX_SUCCESS; - - *manager = calloc(1, sizeof(**manager)); - if (!*manager) { - return CELIX_ENOMEM; - } - - (*manager)->context = context; - - celix_thread_mutexattr_t psaAttr; - celixThreadMutexAttr_create(&psaAttr); - celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); - status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); - celixThreadMutexAttr_destroy(&psaAttr); - - status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL); - status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); - status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); - - arrayList_create(&(*manager)->psaList); - - (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); - (*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - - (*manager)->loghelper = logHelper; - - return status; -} - -celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&manager->discoveryListLock); - hashMap_destroy(manager->discoveryList, false, false); - celixThreadMutex_unlock(&manager->discoveryListLock); - celixThreadMutex_destroy(&manager->discoveryListLock); - - celixThreadMutex_lock(&manager->psaListLock); - arrayList_destroy(manager->psaList); - celixThreadMutex_unlock(&manager->psaListLock); - celixThreadMutex_destroy(&manager->psaListLock); - - celixThreadMutex_lock(&manager->publicationsLock); - hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(pubit)){ - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit); - int i; - for(i=0;i<arrayList_size(l);i++){ - pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); - } - arrayList_destroy(l); - } - hashMapIterator_destroy(pubit); - hashMap_destroy(manager->publications, true, false); - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_destroy(&manager->publicationsLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); - while(hashMapIterator_hasNext(subit)){ - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit); - int i; - for(i=0;i<arrayList_size(l);i++){ - pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); - } - arrayList_destroy(l); - } - hashMapIterator_destroy(subit); - hashMap_destroy(manager->subscriptions, true, false); - celixThreadMutex_unlock(&manager->subscriptionsLock); - celixThreadMutex_destroy(&manager->subscriptionsLock); - - free(manager); - - return status; -} - -celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - int i; - - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA"); - - celixThreadMutex_lock(&manager->psaListLock); - arrayList_add(manager->psaList, psa); - celixThreadMutex_unlock(&manager->psaListLock); - - // Add already detected subscriptions to new PSA - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions); - - while (hashMapIterator_hasNext(subscriptionsIterator)) { - array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator); - for(i=0;i<arrayList_size(sub_ep_list);i++){ - status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i)); - } - } - - hashMapIterator_destroy(subscriptionsIterator); - - celixThreadMutex_unlock(&manager->subscriptionsLock); - - // Add already detected publications to new PSA - status = celixThreadMutex_lock(&manager->publicationsLock); - hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications); - - while (hashMapIterator_hasNext(publicationsIterator)) { - array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator); - for(i=0;i<arrayList_size(pub_ep_list);i++){ - status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i)); - } - } - - hashMapIterator_destroy(publicationsIterator); - - celixThreadMutex_unlock(&manager->publicationsLock); - - return status; -} - -celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - // Nop... - - return status; -} - -celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; - - /* Deactivate all publications */ - celixThreadMutex_lock(&manager->publicationsLock); - - hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(pubit)){ - hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit); - char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry); - // Extract scope/topic name from key - char scope[MAX_SCOPE_LEN]; - char topic[MAX_TOPIC_LEN]; - sscanf(scope_topic_key, "%[^:]:%s", scope, topic ); - array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry); - - status = psa->closeAllPublications(psa->admin,scope,topic); - - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - const char* fwUUID = NULL; - bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - int i; - for(i=0;i<arrayList_size(pubEP_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ - disc->removePublisher(disc->handle,pubEP); - } - } - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } - } - hashMapIterator_destroy(pubit); - - celixThreadMutex_unlock(&manager->publicationsLock); - - /* Deactivate all subscriptions */ - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); - while(hashMapIterator_hasNext(subit)){ - // TODO do some error checking - char* scope_topic = (char*)hashMapIterator_nextKey(subit); - char scope[MAX_TOPIC_LEN]; - char topic[MAX_TOPIC_LEN]; - memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char)); - memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char)); - sscanf(scope_topic, "%[^:]:%s", scope, topic ); - status += psa->closeAllSubscriptions(psa->admin,scope, topic); - } - hashMapIterator_destroy(subit); - celixThreadMutex_unlock(&manager->subscriptionsLock); - - celixThreadMutex_lock(&manager->psaListLock); - arrayList_removeElement(manager->psaList, psa); - celixThreadMutex_unlock(&manager->psaListLock); - - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA"); - - return status; -} - -celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - //subscriber_service_pt subscriber = (subscriber_service_pt)service; - - pubsub_endpoint_pt sub = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->subscriptionsLock); - char *sub_key = createScopeTopicKey(sub->scope, sub->topic); - - array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); - if(sub_list_by_topic==NULL){ - arrayList_create(&sub_list_by_topic); - hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic); - } - free(sub_key); - arrayList_add(sub_list_by_topic,sub); - - celixThreadMutex_unlock(&manager->subscriptionsLock); - - int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - celixThreadMutex_lock(&manager->psaListLock); - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,sub,&score); - if(score>best_score){ /* We have a new winner! */ - best_score = score; - best_psa = psa; - } - } - - if(best_psa != NULL && best_score>0){ - best_psa->addSubscription(best_psa->admin,sub); - } - - // Inform discoveries for interest in the topic - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->interestedInTopic(disc->handle, sub->scope, sub->topic); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - - celixThreadMutex_unlock(&manager->psaListLock); - } - else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; - } - - return status; -} - -celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - // Nop... - - return status; -} - -celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - pubsub_endpoint_pt subcmp = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){ - - int j,k; - - // Inform discoveries that we not interested in the topic any more - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - celixThreadMutex_lock(&manager->psaListLock); - - char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic); - array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); - free(sub_key); - if(sub_list_by_topic!=NULL){ - for(j=0;j<arrayList_size(sub_list_by_topic);j++){ - pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j); - if(pubsubEndpoint_equals(sub,subcmp)){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->removeSubscription(psa->admin,sub); - } - - } - arrayList_remove(sub_list_by_topic,j); - - /* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */ - if(arrayList_size(sub_list_by_topic)==0){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic); - } - } - - pubsubEndpoint_destroy(sub); - - } - } - - celixThreadMutex_unlock(&manager->psaListLock); - celixThreadMutex_unlock(&manager->subscriptionsLock); - - pubsubEndpoint_destroy(subcmp); - - } - else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; - } - - return status; - -} - -celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle; - publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service; - - const char* fwUUID = NULL; - - bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSD: ERRROR: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - celixThreadMutex_lock(&manager->publicationsLock); - - celixThreadMutex_lock(&manager->discoveryListLock); - hashMap_put(manager->discoveryList, reference, NULL); - celixThreadMutex_unlock(&manager->discoveryListLock); - - hash_map_iterator_pt iter = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(iter)){ - array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); - for(int i = 0; i < arrayList_size(pubEP_list); i++) { - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){ - status += disc->announcePublisher(disc->handle,pubEP); - } - } - } - hashMapIterator_destroy(iter); - - celixThreadMutex_unlock(&manager->publicationsLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - iter = hashMapIterator_create(manager->subscriptions); - - while(hashMapIterator_hasNext(iter)) { - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter); - int i; - for(i=0;i<arrayList_size(l);i++){ - pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); - - disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic); - } - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->subscriptionsLock); - - return status; -} - -celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service); - if (status == CELIX_SUCCESS) { - status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service); - } - - return status; -} - -celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_topology_manager_pt manager = handle; - - celixThreadMutex_lock(&manager->discoveryListLock); - - - if (hashMap_remove(manager->discoveryList, reference)) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed"); - } - - celixThreadMutex_unlock(&manager->discoveryListLock); - - return status; -} - - -celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) { - - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - int l_index; - - for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { - - listener_hook_info_pt info = arrayList_get(listeners, l_index); - - pubsub_endpoint_pt pub = NULL; - if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){ - - celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(pub->scope, pub->topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); - if(pub_list_by_topic==NULL){ - arrayList_create(&pub_list_by_topic); - hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); - } - free(pub_key); - arrayList_add(pub_list_by_topic,pub); - - celixThreadMutex_unlock(&manager->publicationsLock); - - int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - celixThreadMutex_lock(&manager->psaListLock); - - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,pub,&score); - if(score>best_score){ /* We have a new winner! */ - best_score = score; - best_psa = psa; - } - } - - if(best_psa != NULL && best_score>0){ - status = best_psa->addPublication(best_psa->admin,pub); - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->announcePublisher(disc->handle,pub); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } - } - - celixThreadMutex_unlock(&manager->psaListLock); - - } - - } - - return status; - -} - - -celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - int l_index; - - for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { - - listener_hook_info_pt info = arrayList_get(listeners, l_index); - - pubsub_endpoint_pt pubcmp = NULL; - if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){ - - - int j,k; - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - - char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic!=NULL){ - for(j=0;j<arrayList_size(pub_list_by_topic);j++){ - pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j); - if(pubsubEndpoint_equals(pub,pubcmp)){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - status = psa->removePublication(psa->admin,pub); - if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->removePublisher(disc->handle,pub); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } - else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */ - status = CELIX_SUCCESS; - } - } - //} - arrayList_remove(pub_list_by_topic,j); - - /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */ - if(arrayList_size(pub_list_by_topic)==0){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllPublications(psa->admin,pub->scope, pub->topic); - } - } - - pubsubEndpoint_destroy(pub); - } - - } - } - - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); - - free(pub_key); - - pubsubEndpoint_destroy(pubcmp); - - } - - } - - return status; -} - -celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint); - - pubsub_topology_manager_pt manager = handle; - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - - char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic); - - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic==NULL){ - arrayList_create(&pub_list_by_topic); - hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); - } - free(pub_key); - - /* Shouldn't be any other duplicate, since it's filtered out by the discovery */ - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_clone(pubEP, &p); - arrayList_add(pub_list_by_topic,p); - - int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,p,&score); - if(score>best_score){ /* We have a new winner! */ - best_score = score; - best_psa = psa; - } - } - - if(best_psa != NULL && best_score>0){ - best_psa->addPublication(best_psa->admin,p); - } - else{ - status = CELIX_ILLEGAL_STATE; - } - - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); - - return status; -} - -celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint); - - pubsub_topology_manager_pt manager = handle; - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - int i; - - char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic==NULL){ - printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint); - status = CELIX_ILLEGAL_STATE; - } - else{ - - pubsub_endpoint_pt p = NULL; - bool found = false; - - for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){ - p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i); - found = pubsubEndpoint_equals(p,pubEP); - } - - if(found && p !=NULL){ - - for(i=0;i<arrayList_size(manager->psaList);i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ - psa->removePublication(psa->admin,p); - } - - arrayList_removeElement(pub_list_by_topic,p); - - /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */ - if(arrayList_size(pub_list_by_topic)==0){ - - for(i=0;i<arrayList_size(manager->psaList);i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - psa->closeAllPublications(psa->admin,p->scope, p->topic); - } - } - - pubsubEndpoint_destroy(p); - } - - - } - free(pub_key); - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); - - - return status; -} -
