http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
new file mode 100644
index 0000000..0e7a794
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -0,0 +1,732 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.c
+ *
+ *  \date       Oct 2, 2015
+ *  \author            <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include "topic_subscription.h"
+#include <czmq.h>
+/* The following undefs prevent the collision between:
+ * - sys/syslog.h (which is included within czmq)
+ * - celix/dfi/dfi_log_util.h
+ */
+#undef LOG_DEBUG
+#undef LOG_WARNING
+#undef LOG_INFO
+#undef LOG_WARNING
+
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "utils.h"
+#include "celix_errno.h"
+#include "constants.h"
+#include "version.h"
+
+#include "subscriber.h"
+#include "publisher.h"
+#include "pubsub_utils.h"
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+#include "zmq_crypto.h"
+
+#define MAX_CERT_PATH_LENGTH 512
+#endif
+
+#define POLL_TIMEOUT   250
+#define ZMQ_POLL_TIMEOUT_MS_ENV        "ZMQ_POLL_TIMEOUT_MS"
+
+/**
+ * State of one ZMQ topic subscription: the SUB socket, the thread receiving
+ * from it, the tracked subscriber services and the publisher URLs waiting to
+ * be connected or disconnected.
+ */
+struct topic_subscription{
+
+       /* ZMQ_SUB socket the receive thread reads from */
+       zsock_t* zmq_socket;
+       /* Subscriber and publisher certificates; only assigned when
+        * BUILD_WITH_ZMQ_SECURITY is defined, otherwise left NULL by calloc */
+       zcert_t * zmq_cert;
+       zcert_t * zmq_pub_cert;
+       /* Held around every use of zmq_socket (recv, connect, disconnect, destroy).
+        * Declared with the Celix mutex type because it is only ever used through
+        * the celixThreadMutex_* API, like the other locks of this struct. */
+       celix_thread_mutex_t socket_lock;
+       /* Tracks the subscriber services matching the topic/scope filter */
+       service_tracker_pt tracker;
+       /* Subscriber endpoints registered through pubsub_topicSubscriptionAddSubscriber */
+       array_list_pt sub_ep_list;
+       /* Thread running zmq_recv_thread_func; 'running' is its loop condition */
+       celix_thread_t recv_thread;
+       bool running;
+       /* Held around accesses to sub_ep_list, servicesMap and nrSubscribers */
+       celix_thread_mutex_t ts_lock;
+       bundle_context_pt context;
+
+       /* Serializer used to create/destroy the message type map of each service */
+       pubsub_serializer_service_t *serializer;
+
+       hash_map_pt servicesMap; // key = service, value = msg types map
+
+       /* strdup'd publisher URLs queued by other threads; the receive thread
+        * connects them after each receive iteration */
+       celix_thread_mutex_t pendingConnections_lock;
+       array_list_pt pendingConnections;
+
+       /* Same as above, for URLs that have to be disconnected */
+       array_list_pt pendingDisconnections;
+       celix_thread_mutex_t pendingDisconnections_lock;
+
+       unsigned int nrSubscribers;
+};
+
+/* One pubsub message as read from the socket by the receive thread:
+ * a header frame followed by its payload frame. Both frames are owned by
+ * this struct and destroyed in process_msg. */
+typedef struct complete_zmq_msg{
+       zframe_t* header;
+       zframe_t* payload;
+}* complete_zmq_msg_pt;
+
+/* Handle given to the multipart callbacks of a subscriber.
+ * svc_msg_db is the message type map of the receiving service (looked up by
+ * message type id to get a serializer); it is borrowed, not owned.
+ * rcv_msg_map maps the message type id of each additional part to a
+ * msg_map_entry holding its deserialized instance. */
+typedef struct mp_handle{
+       hash_map_pt svc_msg_db;
+       hash_map_pt rcv_msg_map;
+}* mp_handle_pt;
+
+/* Deserialized additional part of a multipart message.
+ * retain is set by pubsub_getMultipart from the caller's request; when it is
+ * true destroy_mp_handle does not free msgInst. */
+typedef struct msg_map_entry{
+       bool retain;
+       void* msgInst;
+}* msg_map_entry_pt;
+
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service);
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service);
+static void* zmq_recv_thread_func(void* arg);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static void sigusr1_sighandler(int signo);
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool 
retain, void **part);
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt 
rcv_msg_list);
+static void destroy_mp_handle(mp_handle_pt mp_handle);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
+
+/**
+ * Creates a topic subscription: a ZMQ SUB socket subscribed to 'topic' (or to
+ * everything when topic equals PUBSUB_ANY_SUB_TOPIC) plus a service tracker for
+ * the subscriber services of that topic and, unless 'scope' starts with
+ * PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, of that scope.
+ *
+ * When BUILD_WITH_ZMQ_SECURITY is defined the subscriber key and the publisher
+ * public key are loaded from the keys bundle directory and applied to the socket.
+ *
+ * As a side effect a process-wide SIGUSR1 handler is installed; the signal is
+ * used by pubsub_topicSubscriptionStop to interrupt the receive thread.
+ *
+ * @param bundle_context  context used for properties and for the service tracker
+ * @param scope           subscriber scope
+ * @param topic           topic name
+ * @param best_serializer serializer stored in the subscription (not copied)
+ * @param out             receives the new subscription
+ * @return CELIX_SUCCESS, CELIX_SERVICE_EXCEPTION when a key or the socket cannot
+ *         be created, CELIX_ENOMEM when the subscription cannot be allocated, or
+ *         the error of the tracker creation. In the last case *out is still set,
+ *         as it was before.
+ */
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+       celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+       if (keys_bundle_dir == NULL){
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       const char* keys_file_path = NULL;
+       const char* keys_file_name = NULL;
+       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+
+       char sub_cert_path[MAX_CERT_PATH_LENGTH];
+       char pub_cert_path[MAX_CERT_PATH_LENGTH];
+
+       //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
+       snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
+       snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
+       free(keys_bundle_dir);
+
+       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path);
+       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path);
+
+       zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
+       if (sub_cert == NULL){
+               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       zcert_t* pub_cert = zcert_load(pub_cert_path);
+       if (pub_cert == NULL){
+               zcert_destroy(&sub_cert);
+               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       const char* pub_key = zcert_public_txt(pub_cert);
+#endif
+
+       zsock_t* zmq_s = zsock_new (ZMQ_SUB);
+       if(zmq_s==NULL){
+#ifdef BUILD_WITH_ZMQ_SECURITY
+               zcert_destroy(&sub_cert);
+               zcert_destroy(&pub_cert);
+#endif
+
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       zcert_apply (sub_cert, zmq_s);
+       zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
+#endif
+
+       if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
+               zsock_set_subscribe (zmq_s, "");
+       }
+       else{
+               zsock_set_subscribe (zmq_s, topic);
+       }
+
+       topic_subscription_pt ts = calloc(1,sizeof(*ts));
+       if(ts==NULL){
+               /* Nothing references the socket and certificates yet: release them here */
+               zsock_destroy(&zmq_s);
+#ifdef BUILD_WITH_ZMQ_SECURITY
+               zcert_destroy(&sub_cert);
+               zcert_destroy(&pub_cert);
+#endif
+               return CELIX_ENOMEM;
+       }
+       ts->context = bundle_context;
+       ts->zmq_socket = zmq_s;
+       ts->running = false;
+       ts->nrSubscribers = 0;
+       ts->serializer = best_serializer;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       ts->zmq_cert = sub_cert;
+       ts->zmq_pub_cert = pub_cert;
+#endif
+
+       celixThreadMutex_create(&ts->socket_lock, NULL);
+       celixThreadMutex_create(&ts->ts_lock,NULL);
+       arrayList_create(&ts->sub_ep_list);
+       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+
+       arrayList_create(&ts->pendingConnections);
+       arrayList_create(&ts->pendingDisconnections);
+       celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+       celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+
+       char filter[128];
+       memset(filter,0,128);
+       if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
+               // default scope, means that subscriber has not defined a scope property
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+       } else {
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic,
+                               PUBSUB_SUBSCRIBER_SCOPE,scope);
+       }
+
+       /* Report the first failure instead of the sum of two error codes */
+       service_tracker_customizer_pt customizer = NULL;
+       status = serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
+       if(status == CELIX_SUCCESS){
+               status = serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
+       }
+
+       struct sigaction actions;
+       memset(&actions, 0, sizeof(actions));
+       sigemptyset(&actions.sa_mask);
+       actions.sa_flags = 0;
+       actions.sa_handler = sigusr1_sighandler;
+
+       sigaction(SIGUSR1,&actions,NULL);
+
+       *out=ts;
+
+       return status;
+}
+
+/**
+ * Releases everything owned by the subscription: tracker, endpoint list,
+ * services map, pending URL lists (including the URLs still queued in them),
+ * socket, certificates, locks and the subscription itself.
+ *
+ * The receive thread is not joined here.
+ * NOTE(review): presumably pubsub_topicSubscriptionStop is called first - confirm with the callers.
+ *
+ * @param ts subscription to destroy; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+
+       ts->running = false;
+       serviceTracker_destroy(ts->tracker);
+       arrayList_clear(ts->sub_ep_list);
+       arrayList_destroy(ts->sub_ep_list);
+       /* TODO: Destroy all the serializer maps? */
+       hashMap_destroy(ts->servicesMap,false,false);
+
+       /* The queued URLs were strdup'd when added and nobody will consume them anymore */
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       while(!arrayList_isEmpty(ts->pendingConnections)){
+               free(arrayList_remove(ts->pendingConnections, 0));
+       }
+       arrayList_destroy(ts->pendingConnections);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       while(!arrayList_isEmpty(ts->pendingDisconnections)){
+               free(arrayList_remove(ts->pendingDisconnections, 0));
+       }
+       arrayList_destroy(ts->pendingDisconnections);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       celixThreadMutex_lock(&ts->socket_lock);
+       zsock_destroy(&(ts->zmq_socket));
+#ifdef BUILD_WITH_ZMQ_SECURITY
+       zcert_destroy(&(ts->zmq_cert));
+       zcert_destroy(&(ts->zmq_pub_cert));
+#endif
+       celixThreadMutex_unlock(&ts->socket_lock);
+       celixThreadMutex_destroy(&ts->socket_lock);
+
+       celixThreadMutex_destroy(&ts->ts_lock);
+
+       free(ts);
+
+       return status;
+}
+
+/**
+ * Opens the subscriber service tracker and starts the receive thread.
+ *
+ * 'running' is only left true when the thread has actually been created, so
+ * it reflects whether there is a thread to stop.
+ *
+ * @param ts subscription to start
+ * @return CELIX_SUCCESS, or the error of serviceTracker_open / celixThread_create
+ */
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
+       celix_status_t status = serviceTracker_open(ts->tracker);
+
+       if(status==CELIX_SUCCESS){
+               /* Must be set before the thread starts: it is the thread's loop condition */
+               ts->running = true;
+               status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
+               if(status!=CELIX_SUCCESS){
+                       ts->running = false;
+               }
+       }
+
+       return status;
+}
+
+/**
+ * Stops the receive thread and closes the subscriber service tracker.
+ *
+ * The thread is interrupted with SIGUSR1 so that a blocking receive returns,
+ * then joined. Signal and join are skipped when the subscription was not
+ * marked as running, e.g. on a second call to this function.
+ *
+ * NOTE(review): the signal only helps if the thread is blocked in the receive
+ * call when it arrives; a signal delivered between the 'running' check and the
+ * receive call looks like it could leave the join waiting - confirm.
+ *
+ * @param ts subscription to stop
+ * @return the result of serviceTracker_close
+ */
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
+       bool wasRunning = ts->running;
+
+       ts->running = false;
+
+       if(wasRunning){
+               pthread_kill(ts->recv_thread.thread,SIGUSR1);
+
+               celixThread_join(ts->recv_thread,NULL);
+       }
+
+       return serviceTracker_close(ts->tracker);
+}
+
+/**
+ * Connects the subscription socket to a publisher URL, under socket_lock.
+ *
+ * @param ts     subscription owning the socket
+ * @param pubURL publisher endpoint URL
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the socket is not a
+ *         valid zsock or the connect fails
+ */
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
+       bool connected = false;
+
+       celixThreadMutex_lock(&ts->socket_lock);
+       if(zsock_is(ts->zmq_socket)){
+               connected = (zsock_connect(ts->zmq_socket,"%s",pubURL) == 0);
+       }
+       celixThreadMutex_unlock(&ts->socket_lock);
+
+       return connected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
+}
+
+/**
+ * Queues a publisher URL to be connected by the receive thread.
+ * The URL is copied; the copy is freed once the receive thread has handled it.
+ *
+ * @param ts     subscription owning the pending list
+ * @param pubURL publisher endpoint URL
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT when pubURL is NULL, or
+ *         CELIX_ENOMEM when the URL cannot be copied
+ */
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+       if(pubURL == NULL){
+               return CELIX_ILLEGAL_ARGUMENT;
+       }
+
+       /* Never queue NULL: the consumer passes the entry to a "%s" format */
+       char *url = strdup(pubURL);
+       if(url == NULL){
+               return CELIX_ENOMEM;
+       }
+
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_add(ts->pendingConnections, url);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Queues a publisher URL to be disconnected by the receive thread.
+ * The URL is copied; the copy is freed once the receive thread has handled it.
+ *
+ * @param ts     subscription owning the pending list
+ * @param pubURL publisher endpoint URL
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT when pubURL is NULL, or
+ *         CELIX_ENOMEM when the URL cannot be copied
+ */
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+       if(pubURL == NULL){
+               return CELIX_ILLEGAL_ARGUMENT;
+       }
+
+       /* Never queue NULL: the consumer passes the entry to a "%s" format */
+       char *url = strdup(pubURL);
+       if(url == NULL){
+               return CELIX_ENOMEM;
+       }
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_add(ts->pendingDisconnections, url);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Disconnects the subscription socket from a publisher URL, under socket_lock.
+ *
+ * @param ts     subscription owning the socket
+ * @param pubURL publisher endpoint URL
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the socket is not a
+ *         valid zsock or the disconnect fails
+ */
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
+       bool disconnected = false;
+
+       celixThreadMutex_lock(&ts->socket_lock);
+       if(zsock_is(ts->zmq_socket)){
+               disconnected = (zsock_disconnect(ts->zmq_socket,"%s",pubURL) == 0);
+       }
+       celixThreadMutex_unlock(&ts->socket_lock);
+
+       return disconnected ? CELIX_SUCCESS : CELIX_SERVICE_EXCEPTION;
+}
+
+/**
+ * Appends a subscriber endpoint to the subscription's endpoint list, under ts_lock.
+ * The endpoint pointer is stored as is.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_add(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Increments the subscriber counter of the subscription, under ts_lock.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->nrSubscribers += 1;
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Removes a subscriber endpoint from the subscription's endpoint list, under ts_lock.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_removeElement(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Decrements the subscriber counter of the subscription, under ts_lock.
+ * The counter is unsigned, so it is left at 0 instead of wrapping around when
+ * there is nothing to decrement.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if(ts->nrSubscribers > 0){
+               ts->nrSubscribers--;
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+/**
+ * Returns the current subscriber counter.
+ * The counter is read under ts_lock, the lock its writers hold.
+ */
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
+       unsigned int nrSubscribers = 0;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       nrSubscribers = ts->nrSubscribers;
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return nrSubscribers;
+}
+
+/**
+ * Returns the subscription's own list of subscriber endpoints (not a copy).
+ * No lock is taken here.
+ */
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+       array_list_pt subscribers = sub->sub_ep_list;
+
+       return subscribers;
+}
+
+/**
+ * Service tracker callback for a new subscriber service.
+ *
+ * Unless the service is already known, asks the serializer for the message
+ * type map of the bundle providing the service and stores it in servicesMap
+ * with the service as key.
+ *
+ * @param handle    the topic subscription
+ * @param reference reference of the tracked service, used to find its bundle
+ * @param service   the subscriber service
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when there is no
+ *         serializer, no bundle, or no message type map could be created
+ */
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+       celix_status_t status = CELIX_SUCCESS;
+       topic_subscription_pt ts = handle;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (!hashMap_containsKey(ts->servicesMap, service)) {
+               bundle_pt bundle = NULL;
+               hash_map_pt msgTypes = NULL;
+
+               serviceReference_getBundle(reference, &bundle);
+
+               if(ts->serializer != NULL && bundle!=NULL){
+                       ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+               }
+
+               if(msgTypes != NULL){
+                       hashMap_put(ts->servicesMap, service, msgTypes);
+                       printf("PSA_ZMQ_TS: New subscriber registered.\n");
+               }
+               else{
+                       /* Also reached when the serializer produced no map: without it
+                        * the service would never receive anything, so do not report success */
+                       printf("PSA_ZMQ_TS: Cannot register new subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+/**
+ * Service tracker callback for a removed subscriber service.
+ *
+ * Removes the service from servicesMap and hands its message type map back to
+ * the serializer for destruction. Unknown services are ignored.
+ *
+ * @param handle    the topic subscription
+ * @param reference reference of the untracked service (unused)
+ * @param service   the subscriber service
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the removed map is
+ *         NULL or there is no serializer to destroy it
+ */
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+       topic_subscription_pt ts = handle;
+       celix_status_t result = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (hashMap_containsKey(ts->servicesMap, service)) {
+               hash_map_pt removedTypes = hashMap_remove(ts->servicesMap, service);
+
+               if(removedTypes==NULL || ts->serializer==NULL){
+                       printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+                       result = CELIX_SERVICE_EXCEPTION;
+               }
+               else{
+                       ts->serializer->destroySerializerMap(ts->serializer->handle,removedTypes);
+                       printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return result;
+}
+
+
+/**
+ * Delivers one received message (first part plus optional additional parts)
+ * to every tracked subscriber service, then destroys all frames and the list.
+ *
+ * For each service the first part is deserialized with the serializer found
+ * for its message type, after a version check, and passed to the service's
+ * receive callback together with multipart callbacks giving access to the
+ * other parts. The instance is freed afterwards unless the service cleared
+ * the 'release' flag.
+ *
+ * The header frame comes from the network, so its size is checked before it
+ * is read as a message header; a message with a short header is dropped.
+ *
+ * @param sub      the topic subscription; the caller holds ts_lock
+ * @param msg_list list of complete_zmq_msg_pt with at least one entry;
+ *                 ownership is taken
+ */
+static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+
+       complete_zmq_msg_pt first_msg = (complete_zmq_msg_pt)arrayList_get(msg_list,0);
+       pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(first_msg->header);
+
+       bool validHeader = (zframe_size(first_msg->header) >= sizeof(*first_msg_hdr));
+       if(!validHeader){
+               printf("PSA_ZMQ_TS: Received a header of %zu bytes, too short to be a message header. Dropping the message.\n", zframe_size(first_msg->header));
+       }
+
+       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+       while (validHeader && hashMapIterator_hasNext(iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+               pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
+               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+
+               pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type);
+               if (msgSer == NULL) {
+                       printf("PSA_ZMQ_TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
+               }
+               else{
+                       void *msgInst = NULL;
+                       bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr);
+
+                       if(validVersion){
+
+                               celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(first_msg->payload), 0, &msgInst);
+
+                               if (status == CELIX_SUCCESS) {
+                                       bool release = true;
+                                       /* NULL for a single part message; the callbacks cope with that */
+                                       mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
+                                       pubsub_multipart_callbacks_t mp_callbacks;
+                                       mp_callbacks.handle = mp_handle;
+                                       mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
+                                       mp_callbacks.getMultipart = pubsub_getMultipart;
+                                       subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+
+                                       if(release){
+                                               msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst);
+                                       }
+                                       if(mp_handle!=NULL){
+                                               destroy_mp_handle(mp_handle);
+                                       }
+                               }
+                               else{
+                                       printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName);
+                               }
+
+                       }
+                       else{
+                               int major=0,minor=0;
+                               version_getMajor(msgSer->msgVersion,&major);
+                               version_getMinor(msgSer->msgVersion,&minor);
+                               printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                                               msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+                       }
+
+               }
+       }
+       hashMapIterator_destroy(iter);
+
+       /* The frames and their wrappers are released whether or not anything was delivered */
+       unsigned int i = 0;
+       for(;i<arrayList_size(msg_list);i++){
+               complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i);
+               zframe_destroy(&(c_msg->header));
+               zframe_destroy(&(c_msg->payload));
+               free(c_msg);
+       }
+
+       arrayList_destroy(msg_list);
+
+}
+
+/**
+ * Body of the receive thread.
+ *
+ * While the subscription is running it reads header/payload frame pairs from
+ * the socket (holding socket_lock), collects all the pairs of a multipart
+ * message in a list and hands the list to process_msg (holding ts_lock).
+ * After every iteration, with socket_lock released, the queued publisher
+ * connections and disconnections are applied.
+ *
+ * A failed receive is only logged; the loop ends when 'running' is cleared.
+ *
+ * @param arg the topic subscription
+ * @return always NULL
+ */
+static void* zmq_recv_thread_func(void * arg) {
+       topic_subscription_pt sub = (topic_subscription_pt) arg;
+
+       while (sub->running) {
+
+               celixThreadMutex_lock(&sub->socket_lock);
+
+               zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
+               if (headerMsg == NULL) {
+                       if (errno == EINTR) {
+                               //It means we got a signal and we have to exit...
+                               printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n");
+                       } else {
+                               perror("PSA_ZMQ_TS: header_recv thread");
+                       }
+               }
+               else {
+
+                       pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
+
+                       if (zframe_more(headerMsg)) {
+
+                               zframe_t* payloadMsg = zframe_recv(sub->zmq_socket);
+                               if (payloadMsg == NULL) {
+                                       if (errno == EINTR) {
+                                               //It means we got a signal and we have to exit...
+                                               printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n");
+                                       } else {
+                                               perror("PSA_ZMQ_TS: payload_recv");
+                                       }
+                                       zframe_destroy(&headerMsg);
+                               } else {
+
+                                       //Let's fetch all the messages from the socket
+                                       array_list_pt msg_list = NULL;
+                                       arrayList_create(&msg_list);
+                                       complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg));
+                                       firstMsg->header = headerMsg;
+                                       firstMsg->payload = payloadMsg;
+                                       arrayList_add(msg_list, firstMsg);
+
+                                       bool more = zframe_more(payloadMsg);
+                                       while (more) {
+
+                                               zframe_t* h_msg = zframe_recv(sub->zmq_socket);
+                                               if (h_msg == NULL) {
+                                                       if (errno == EINTR) {
+                                                               //It means we got a signal and we have to exit...
+                                                               printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n");
+                                                       } else {
+                                                               perror("PSA_ZMQ_TS: h_recv");
+                                                       }
+                                                       break;
+                                               }
+
+                                               zframe_t* p_msg = zframe_recv(sub->zmq_socket);
+                                               if (p_msg == NULL) {
+                                                       if (errno == EINTR) {
+                                                               //It means we got a signal and we have to exit...
+                                                               printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n");
+                                                       } else {
+                                                               perror("PSA_ZMQ_TS: p_recv");
+                                                       }
+                                                       zframe_destroy(&h_msg);
+                                                       break;
+                                               }
+
+                                               complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg));
+                                               c_msg->header = h_msg;
+                                               c_msg->payload = p_msg;
+                                               arrayList_add(msg_list, c_msg);
+
+                                               if (!zframe_more(p_msg)) {
+                                                       more = false;
+                                               }
+                                       }
+
+                                       /* process_msg takes ownership of the list and of its frames */
+                                       celixThreadMutex_lock(&sub->ts_lock);
+                                       process_msg(sub, msg_list);
+                                       celixThreadMutex_unlock(&sub->ts_lock);
+
+                               }
+
+                       } //zframe_more(headerMsg)
+                       else {
+                               /* hdr points into the frame: log first, then release the frame
+                                * with zframe_destroy (a zframe_t must not be released with free) */
+                               printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
+                               zframe_destroy(&headerMsg);
+                       }
+
+               } // headerMsg != NULL
+               celixThreadMutex_unlock(&sub->socket_lock);
+               connectPendingPublishers(sub);
+               disconnectPendingPublishers(sub);
+       } // while
+
+       return NULL;
+}
+
+/**
+ * Connects the socket to every queued publisher URL, oldest first, and frees
+ * each URL afterwards. The pending list lock is held for the whole drain; the
+ * result of the individual connects is ignored.
+ */
+static void connectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingConnections_lock);
+       for(;;) {
+               if(arrayList_isEmpty(sub->pendingConnections)) {
+                       break;
+               }
+               char * queuedUrl = arrayList_remove(sub->pendingConnections, 0);
+               pubsub_topicSubscriptionConnectPublisher(sub, queuedUrl);
+               free(queuedUrl);
+       }
+       celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+/**
+ * Disconnects the socket from every queued publisher URL, oldest first, and
+ * frees each URL afterwards. The pending list lock is held for the whole
+ * drain; the result of the individual disconnects is ignored.
+ */
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+       for(;;) {
+               if(arrayList_isEmpty(sub->pendingDisconnections)) {
+                       break;
+               }
+               char * queuedUrl = arrayList_remove(sub->pendingDisconnections, 0);
+               pubsub_topicSubscriptionDisconnectPublisher(sub, queuedUrl);
+               free(queuedUrl);
+       }
+       celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+/**
+ * SIGUSR1 handler installed by pubsub_topicSubscriptionCreate.
+ *
+ * Its purpose is to exist: delivering the signal makes a blocking receive in
+ * the receive thread return with EINTR. It only logs a message, using write()
+ * because printf() is not async-signal-safe.
+ *
+ * @param signo signal number (unused)
+ */
+static void sigusr1_sighandler(int signo){
+       static const char msg[] = "PSA_ZMQ_TS: Topic subscription being shut down...\n";
+       ssize_t written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
+
+       /* Best effort log: nothing sensible can be done on failure inside a handler */
+       (void) written;
+       (void) signo;
+}
+
+/**
+ * Tells whether a received message can be handled by the local message version.
+ *
+ * @param msgVersion version of the locally known message type, may be NULL
+ * @param hdr        header of the received message
+ * @return true when msgVersion is not NULL, the major versions are equal and
+ *         the received minor is at least the local minor; false otherwise.
+ *         The local values are compared after truncation to unsigned char.
+ */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+       int localMajor=0;
+       int localMinor=0;
+
+       if(msgVersion==NULL){
+               return false;
+       }
+
+       version_getMajor(msgVersion,&localMajor);
+       version_getMinor(msgVersion,&localMinor);
+
+       /* Different major means incompatible */
+       if(hdr->major!=((unsigned char)localMajor)){
+               return false;
+       }
+
+       /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+       return (hdr->minor>=((unsigned char)localMinor));
+}
+
+/**
+ * Multipart callback: gives the message type id of a message type name, which
+ * is the string hash of the name.
+ *
+ * @param handle    unused
+ * @param msgType   message type name
+ * @param msgTypeId receives the id
+ * @return always 0
+ */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+       unsigned int typeHash = utils_stringHash(msgType);
+
+       *msgTypeId = typeHash;
+
+       return 0;
+}
+
+/**
+ * Multipart callback: gives access to an additional part of the message being
+ * delivered.
+ *
+ * @param handle    mp_handle of the message, NULL when it has a single part
+ * @param msgTypeId message type id of the wanted part
+ * @param retain    stored with the part; when true the part is not freed when
+ *                  the handle is destroyed
+ * @param part      receives the deserialized part, or NULL on failure
+ * @return 0 on success, -1 when handle is NULL, -2 when there is no part with
+ *         that id
+ */
+static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){
+       mp_handle_pt mp = (mp_handle_pt)handle;
+       msg_map_entry_pt found = NULL;
+
+       if(mp==NULL){
+               *part = NULL;
+               return -1;
+       }
+
+       found = hashMap_get(mp->rcv_msg_map, (void*)(uintptr_t) msgTypeId);
+       if(found==NULL){
+               printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId);
+               *part=NULL;
+               return -2;
+       }
+
+       found->retain = retain;
+       *part = found->msgInst;
+
+       return 0;
+}
+
+/**
+ * Builds the multipart handle of a received message for one subscriber service.
+ *
+ * Every part after the first one whose message type is known to the service
+ * and whose version is compatible is deserialized and stored by message type
+ * id. Parts with a header frame too short to be a message header, unknown
+ * parts and parts that cannot be deserialized are skipped.
+ *
+ * @param svc_msg_db   message type map of the service (borrowed)
+ * @param rcv_msg_list list of complete_zmq_msg_pt of the received message
+ * @return the handle, or NULL when the message has a single part or the
+ *         handle cannot be allocated
+ */
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+
+       if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
+               return NULL;
+       }
+
+       mp_handle_pt mp_handle = calloc(1,sizeof(*mp_handle));
+       if(mp_handle==NULL){
+               /* Callers treat NULL as "no additional parts available" */
+               return NULL;
+       }
+       mp_handle->svc_msg_db = svc_msg_db;
+       mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
+
+       unsigned int i=1; //We skip the first message, it will be handle differently
+       for(;i<arrayList_size(rcv_msg_list);i++){
+               complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+               pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
+
+               /* The frame comes from the network: do not read a header that is not there */
+               if(zframe_size(c_msg->header) < sizeof(*header)){
+                       continue;
+               }
+
+               pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type));
+               if(msgSer==NULL || !checkVersion(msgSer->msgVersion,header)){
+                       continue;
+               }
+
+               /* Allocate the entry first, so that a deserialized instance never
+                * ends up without an owner */
+               msg_map_entry_pt entry = calloc(1,sizeof(*entry));
+               if(entry==NULL){
+                       continue;
+               }
+
+               void *msgInst = NULL;
+               celix_status_t status = msgSer->deserialize(msgSer, (const void*)zframe_data(c_msg->payload), 0, &msgInst);
+
+               if(status == CELIX_SUCCESS){
+                       entry->msgInst = msgInst;
+                       hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry);
+               }
+               else{
+                       free(entry);
+               }
+       }
+
+       return mp_handle;
+
+}
+
+/**
+ * Destroys a multipart handle: frees every stored part that was not retained
+ * by the subscriber, the entries, the map and the handle itself.
+ *
+ * @param mp_handle handle to destroy; must not be NULL
+ */
+static void destroy_mp_handle(mp_handle_pt mp_handle){
+
+       hash_map_iterator_pt it = hashMapIterator_create(mp_handle->rcv_msg_map);
+       while(hashMapIterator_hasNext(it)){
+               hash_map_entry_pt mapEntry = hashMapIterator_nextEntry(it);
+               msg_map_entry_pt part = hashMapEntry_getValue(mapEntry);
+               unsigned int partTypeId = (unsigned int)(uintptr_t)hashMapEntry_getKey(mapEntry);
+               pubsub_msg_serializer_t* partSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)partTypeId);
+
+               if(partSer==NULL){
+                       printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",partTypeId);
+               }
+               else if(!part->retain){
+                       /* NOTE(review): freeMsg gets partSer->handle here, while process_msg
+                        * passes the serializer itself - confirm which one freeMsg expects */
+                       partSer->freeMsg(partSer->handle,part->msgInst);
+               }
+
+               free(part);
+       }
+       hashMapIterator_destroy(it);
+
+       hashMap_destroy(mp_handle->rcv_msg_map,false,false);
+       free(mp_handle);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.h 
b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
new file mode 100644
index 0000000..7267103
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
@@ -0,0 +1,60 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+#include "pubsub_serializer.h"
+
+typedef struct topic_subscription* topic_subscription_pt;
+
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context,char* scope, char* topic, pubsub_serializer_service_t 
*best_serializer, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
+
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL);
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
+
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/zmq_crypto.c 
b/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
new file mode 100644
index 0000000..fe444bd
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/zmq_crypto.c
@@ -0,0 +1,281 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * zmq_crypto.c
+ *
+ *  \date       Dec 2, 2016
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include "zmq_crypto.h"
+
+#include <zmq.h>
+#include <openssl/conf.h>
+#include <openssl/evp.h>
+#include <openssl/err.h>
+
+#include <string.h>
+
+#define MAX_FILE_PATH_LENGTH 512
+#define ZMQ_KEY_LENGTH 40
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+static char* read_file_content(char* filePath, char* fileName);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char 
**publicKey, char **secretKey);
+
+/**
+ * Return a valid zcert_t from an encoded file
+ *
+ * The AES key and iv strings are read from keysFilePath/keysFileName (the
+ * DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME values are used for NULL
+ * arguments). Their SHA-256 digests are used to decrypt the content of
+ * file_path, and the decrypted text is searched for the curve public and
+ * secret key.
+ *
+ * Returns NULL when the keys cannot be loaded, the file cannot be read or
+ * decrypted, or the contained keys cannot be decoded.
+ * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
+{
+
+       if (keysFilePath == NULL){
+               keysFilePath = DEFAULT_KEYS_FILE_PATH;
+       }
+
+       if (keysFileName == NULL){
+               keysFileName = DEFAULT_KEYS_FILE_NAME;
+       }
+
+       char* keys_data = read_file_content(keysFilePath, keysFileName);
+       if (keys_data == NULL){
+               return NULL;
+       }
+
+       char *key = NULL;
+       char *iv = NULL;
+       parse_key_lines(keys_data, &key, &iv);
+       free(keys_data);
+
+       if (key == NULL || iv == NULL){
+               free(key);
+               free(iv);
+
+               printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+               return NULL;
+       }
+
+       //At this point, we know an aes key and iv are stored and loaded
+
+       // generate sha256 hashes
+       unsigned char key_digest[EVP_MAX_MD_SIZE] = { 0 };
+       unsigned char iv_digest[EVP_MAX_MD_SIZE] = { 0 };
+       int key_digest_len = generate_sha256_hash(key, key_digest);
+       int iv_digest_len = generate_sha256_hash(iv, iv_digest);
+
+       // The plain strings are not needed anymore once they are hashed
+       free(key);
+       free(iv);
+
+       if (key_digest_len <= 0 || iv_digest_len <= 0){
+               printf("CRYPTO: Hashing AES key and/or AES iv failed!\n");
+               return NULL;
+       }
+
+       zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
+       if (encoded_secret == NULL){
+               return NULL;
+       }
+
+       int encoded_secret_size = (int) zchunk_size (encoded_secret);
+       char* encoded_secret_data = zchunk_strdup(encoded_secret);
+       zchunk_destroy (&encoded_secret);
+
+       if (encoded_secret_size <= 0 || encoded_secret_data == NULL){
+               free(encoded_secret_data);
+               printf("CRYPTO: Encoded file '%s' is empty or could not be loaded!\n", file_path);
+               return NULL;
+       }
+
+       // Decryption of data
+       // EVP_DecryptUpdate may write up to (input length + cipher block size)
+       // bytes; one extra byte is reserved for the terminating '\0'.
+       size_t decryptedtext_capacity = (size_t) encoded_secret_size + EVP_MAX_BLOCK_LENGTH + 1;
+       unsigned char* decryptedtext = calloc(1, decryptedtext_capacity);
+       if (decryptedtext == NULL){
+               free(encoded_secret_data);
+               return NULL;
+       }
+
+       int decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
+
+       EVP_cleanup();
+
+       free(encoded_secret_data);
+
+       if (decryptedtext_len <= 0 || (size_t) decryptedtext_len >= decryptedtext_capacity){
+               free(decryptedtext);
+               printf("CRYPTO: Decrypting '%s' failed!\n", file_path);
+               return NULL;
+       }
+       decryptedtext[decryptedtext_len] = '\0';
+
+       // The public and private keys are retrieved
+       char* public_text = NULL;
+       char* secret_text = NULL;
+
+       extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+       free(decryptedtext);
+
+       zcert_t* cert_loaded = NULL;
+
+       // extract_keys_from_buffer leaves both pointers NULL when it fails
+       if (public_text != NULL && secret_text != NULL){
+               byte public_key [32] = { 0 };
+               byte secret_key [32] = { 0 };
+
+               // zmq_z85_decode returns NULL when the text is not valid Z85
+               if (zmq_z85_decode (public_key, public_text) != NULL && zmq_z85_decode (secret_key, secret_text) != NULL){
+                       cert_loaded = zcert_new_from(public_key, secret_key);
+               } else {
+                       printf("CRYPTO: Decoding public / secret key failed!\n");
+               }
+       }
+
+       free(public_text);
+       free(secret_text);
+
+       return cert_loaded;
+}
+
+/**
+ * Compute the SHA-256 digest of a NUL-terminated string.
+ *
+ * text:   NUL-terminated input; its strlen() bytes are hashed.
+ * digest: output buffer that receives the digest (the caller in this file
+ *         passes a buffer of EVP_MAX_MD_SIZE bytes).
+ *
+ * Returns the number of digest bytes written, or 0 when an argument is NULL
+ * or one of the OpenSSL calls fails.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+       unsigned int digest_len = 0;
+
+       if (text == NULL || digest == NULL){
+               return 0;
+       }
+
+       EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+       if (mdctx == NULL){
+               return 0;
+       }
+
+       // Each EVP digest call returns 1 on success
+       if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+                       || EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+                       || EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+               digest_len = 0;
+       }
+
+       EVP_MD_CTX_free(mdctx);
+
+       return (int) digest_len;
+}
+
+/**
+ * Decrypt an AES-256-CBC encrypted buffer.
+ *
+ * ciphertext/ciphertext_len: encrypted input.
+ * key, iv:                   key and initialisation vector handed to
+ *                            EVP_DecryptInit_ex.
+ * plaintext:                 output buffer; OpenSSL documents that it needs
+ *                            room for ciphertext_len plus one cipher block.
+ *
+ * Returns the number of plaintext bytes written, or 0 when an argument is
+ * invalid or any decryption step fails (e.g. wrong key or bad padding).
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+       int len = 0;
+       int plaintext_len = 0;
+
+       if (ciphertext == NULL || ciphertext_len < 0 || key == NULL || iv == NULL || plaintext == NULL){
+               return 0;
+       }
+
+       EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+       if (ctx == NULL){
+               return 0;
+       }
+
+       // Each EVP decrypt call returns 1 on success
+       if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+                       && EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+               plaintext_len = len;
+               if (EVP_DecryptFinal_ex(ctx, plaintext + plaintext_len, &len) == 1){
+                       plaintext_len += len;
+               } else {
+                       // Final block did not verify: do not report partial output
+                       plaintext_len = 0;
+               }
+       }
+
+       EVP_CIPHER_CTX_free(ctx);
+
+       return plaintext_len;
+}
+
+/**
+ * Read the complete content of the file filePath/fileName into a newly
+ * allocated string.
+ *
+ * Returns NULL, after printing a diagnostic, when the combined path does not
+ * fit in MAX_FILE_PATH_LENGTH, or the file does not exist or cannot be
+ * opened or read.
+ * Caller is responsible for freeing the returned value
+ */
+static char* read_file_content(char* filePath, char* fileName){
+
+       char fileNameWithPath[MAX_FILE_PATH_LENGTH];
+       int written = snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+       if (written < 0 || written >= MAX_FILE_PATH_LENGTH){
+               // A truncated path would silently point at a different file
+               printf("CRYPTO: Keys file path is too long!\n");
+               return NULL;
+       }
+
+       if (!zsys_file_exists(fileNameWithPath)){
+               printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
+               return NULL;
+       }
+
+       zfile_t* keys_file = zfile_new (filePath, fileName);
+       if (keys_file == NULL){
+               printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+               return NULL;
+       }
+
+       int rc = zfile_input (keys_file);
+       if (rc != 0){
+               zfile_destroy(&keys_file);
+               printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+               return NULL;
+       }
+
+       // A negative size signals an error and must not be used as read length
+       ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
+       zchunk_t* keys_chunk = NULL;
+       if (keys_file_size >= 0){
+               keys_chunk = zfile_read (keys_file, (size_t) keys_file_size, 0);
+       }
+       if (keys_chunk == NULL){
+               zfile_close(keys_file);
+               zfile_destroy(&keys_file);
+               printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
+               return NULL;
+       }
+
+       char* keys_data = zchunk_strdup(keys_chunk);
+       zchunk_destroy(&keys_chunk);
+       zfile_close(keys_file);
+       zfile_destroy (&keys_file);
+
+       return keys_data;
+}
+
+/**
+ * Split keysBuffer into lines (the buffer is modified by strtok_r) and hand
+ * each line to parse_key_line until both *key and *iv are set or no lines
+ * are left.
+ */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+       char *cursor = NULL;
+       char *current = strtok_r(keysBuffer, "\n", &cursor);
+
+       while (current != NULL){
+               parse_key_line(current, key, iv);
+
+               if (*key != NULL && *iv != NULL){
+                       break;
+               }
+
+               current = strtok_r(NULL, "\n", &cursor);
+       }
+}
+
+/**
+ * Parse a single "name:value" line (the line is modified: the first ':' is
+ * overwritten with '\0').
+ *
+ * When the name equals KEY_TO_GET and *key is still NULL, *key receives a
+ * copy of at most AES_KEY_LENGTH characters of the value; likewise for
+ * IV_TO_GET, *iv and AES_IV_LENGTH. Lines without ':' or with an empty name
+ * or value are ignored. The copies are allocated with strndup.
+ */
+static void parse_key_line(char *line, char **key, char **iv){
+       char* colon = strchr(line, ':');
+       if (colon == NULL){
+               return;
+       }
+
+       // Terminate the name at the first separator; the value follows it.
+       *colon = '\0';
+       const char* name = line;
+       const char* value = colon + 1;
+
+       if (name[0] == '\0' || value[0] == '\0'){
+               return;
+       }
+
+       if (*key == NULL && strcmp(name, KEY_TO_GET) == 0){
+               *key = strndup(value, AES_KEY_LENGTH);
+               return;
+       }
+
+       if (*iv == NULL && strcmp(name, IV_TO_GET) == 0){
+               *iv = strndup(value, AES_IV_LENGTH);
+       }
+}
+
+/**
+ * Load the decrypted text as a zconfig and copy the values found at
+ * /curve/public-key and /curve/secret-key into *publicKey and *secretKey
+ * (allocated with strndup).
+ *
+ * On any failure a diagnostic is printed and the output pointers are left
+ * untouched.
+ */
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
+       // Load decrypted text buffer
+       zchunk_t* chunk = zchunk_new(input, inputlen);
+       if (chunk == NULL){
+               printf("CRYPTO: Failed to create zchunk\n");
+               return;
+       }
+
+       zconfig_t* config = zconfig_chunk_load (chunk);
+       zchunk_destroy (&chunk);
+       if (config == NULL){
+               printf("CRYPTO: Failed to create zconfig\n");
+               return;
+       }
+
+       // Extract public and secret key from text buffer
+       char* pub = zconfig_get (config, "/curve/public-key", NULL);
+       char* sec = zconfig_get (config, "/curve/secret-key", NULL);
+       bool bothFound = (pub != NULL) && (sec != NULL);
+
+       if (bothFound){
+               // Copy before the config (which owns pub/sec) is destroyed
+               *publicKey = strndup(pub, ZMQ_KEY_LENGTH + 1);
+               *secretKey = strndup(sec, ZMQ_KEY_LENGTH + 1);
+       }
+
+       zconfig_destroy(&config);
+
+       if (!bothFound){
+               printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+       }
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/zmq_crypto.h 
b/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
new file mode 100644
index 0000000..f1a990f
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/zmq_crypto.h
@@ -0,0 +1,41 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * zmq_crypto.h
+ *
+ *  \date       Dec 2, 2016
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef ZMQ_CRYPTO_H_
+#define ZMQ_CRYPTO_H_
+
+#include <czmq.h>
+
+#define PROPERTY_KEYS_FILE_PATH "keys.file.path"
+#define PROPERTY_KEYS_FILE_NAME "keys.file.name"
+#define DEFAULT_KEYS_FILE_PATH "/etc/"
+#define DEFAULT_KEYS_FILE_NAME "pubsub.keys"
+
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, 
char* file_path);
+int generate_sha256_hash(char* text, unsigned char* digest);
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, 
unsigned char *iv, unsigned char *plaintext);
+
+#endif

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/CMakeLists.txt b/pubsub/pubsub_api/CMakeLists.txt
new file mode 100644
index 0000000..5ceb291
--- /dev/null
+++ b/pubsub/pubsub_api/CMakeLists.txt
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#api target
+add_library(pubsub_api INTERFACE)
+
+#The include directories belong to the pubsub_api interface target itself,
+#so that targets linking against pubsub_api pick up the api headers.
+target_include_directories(pubsub_api INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+    $<INSTALL_INTERFACE:include/celix/pubsub>
+)
+
+#install api
+install(TARGETS pubsub_api EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
+install(DIRECTORY include/ DESTINATION include/celix/pubsub COMPONENT pubsub)
+
+#Setup target aliases to match external usage
+add_library(Celix::pubsub_api ALIAS pubsub_api)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/include/pubsub/publisher.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/publisher.h 
b/pubsub/pubsub_api/include/pubsub/publisher.h
new file mode 100644
index 0000000..3eec149
--- /dev/null
+++ b/pubsub/pubsub_api/include/pubsub/publisher.h
@@ -0,0 +1,88 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * publisher.h
+ *
+ *  \date       Jan 7, 2016
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef __PUBSUB_PUBLISHER_H_
+#define __PUBSUB_PUBLISHER_H_
+
+#include <stdlib.h>
+
+#define PUBSUB_PUBLISHER_SERVICE_NAME           "pubsub.publisher"
+#define PUBSUB_PUBLISHER_SERVICE_VERSION             "2.0.0"
+ 
+//properties
+#define PUBSUB_PUBLISHER_TOPIC                  "pubsub.topic"
+#define PUBSUB_PUBLISHER_SCOPE                  "pubsub.scope"
+#define PUBSUB_PUBLISHER_STRATEGY               "pubsub.strategy"
+#define PUBSUB_PUBLISHER_CONFIG                 "pubsub.config"
+ 
+#define PUBSUB_PUBLISHER_SCOPE_DEFAULT                 "default"
+//flags
+#define PUBSUB_PUBLISHER_FIRST_MSG  01
+#define PUBSUB_PUBLISHER_PART_MSG   02
+#define PUBSUB_PUBLISHER_LAST_MSG   04
+
+struct pubsub_release_callback_struct {
+    void *handle;
+    void (*release)(char *buf, void *handle);
+};
+typedef struct pubsub_release_callback_struct pubsub_release_callback_t;
+typedef struct pubsub_release_callback_struct* pubsub_release_callback_pt;
+ 
+ 
+struct pubsub_publisher {
+    void *handle;
+ 
+    /**
+     * Every msg is identifiable by a msg type string. Because msg type strings
+     * are not preferable performance wise (string compares), a "local"
+     * (int / platform dependent) unique id will be generated at runtime
+     * with use of a distributed key/value store or communication between
+     * participating parties.
+     * This is called the local message type id. The local message type id
+     * can be requested with the localMsgTypeIdForMsgType method.
+     * When the return is successful the msgTypeId is always greater than 0.
+     * (Note this can be used to specify/detect uninitialized msg type ids in
+     * the consumer code).
+     *
+     * Returns 0 on success.
+     */
+    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, 
unsigned int *msgTypeId);
+  
+    /**
+     * send is an async function, but the msg can be safely deleted after send
+     * returns.
+     * Returns 0 on success.
+     */
+    int (*send)(void *handle, unsigned int msgTypeId, const void *msg);
+ 
+  
+    /**
+     * sendMultipart is an async function, but the msg can be safely deleted
+     * after send returns.
+     * The first (primary) message of a multipart message must have the flag
+     * PUBSUB_PUBLISHER_FIRST_MSG.
+     * The last message of a multipart message must have the flag
+     * PUBSUB_PUBLISHER_LAST_MSG.
+     * Returns 0 on success.
+     */
+    int (*sendMultipart)(void *handle, unsigned int msgTypeId, const void 
*msg, int flags);
+ 
+};
+typedef struct pubsub_publisher pubsub_publisher_t;
+typedef struct pubsub_publisher* pubsub_publisher_pt;
+
+#endif // __PUBSUB_PUBLISHER_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_api/include/pubsub/subscriber.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_api/include/pubsub/subscriber.h 
b/pubsub/pubsub_api/include/pubsub/subscriber.h
new file mode 100644
index 0000000..5d87b8a
--- /dev/null
+++ b/pubsub/pubsub_api/include/pubsub/subscriber.h
@@ -0,0 +1,75 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * subscriber.h
+ *
+ *  \date       Jan 7, 2016
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef __PUBSUB_SUBSCRIBER_H_
+#define __PUBSUB_SUBSCRIBER_H_
+
+#include <stdbool.h>
+
+#define PUBSUB_SUBSCRIBER_SERVICE_NAME          "pubsub.subscriber"
+#define PUBSUB_SUBSCRIBER_SERVICE_VERSION       "2.0.0"
+ 
+//properties
+#define PUBSUB_SUBSCRIBER_TOPIC                "pubsub.topic"
+#define PUBSUB_SUBSCRIBER_SCOPE                "pubsub.scope"
+#define PUBSUB_SUBSCRIBER_STRATEGY             "pubsub.strategy"
+#define PUBSUB_SUBSCRIBER_CONFIG               "pubsub.config"
+
+#define PUBSUB_SUBSCRIBER_SCOPE_DEFAULT        "default"
+ 
+struct pubsub_multipart_callbacks_struct {
+    void *handle;
+    int (*localMsgTypeIdForMsgType)(void *handle, const char *msgType, 
unsigned int *msgId);
+    int (*getMultipart)(void *handle, unsigned int msgTypeId, bool retain, 
void **part);
+};
+typedef struct pubsub_multipart_callbacks_struct pubsub_multipart_callbacks_t;
+typedef struct pubsub_multipart_callbacks_struct* 
pubsub_multipart_callbacks_pt;
+ 
+struct pubsub_subscriber_struct {
+    void *handle;
+     
+    /**
+     * When a new message for a topic is available, receive will be called.
+     *
+     * msgType contains the fully qualified name of the type and msgTypeId is
+     * a local id which represents the type for performance reasons.
+     * release can be used to instruct the pubsubadmin to release (free) the
+     * message when the receive function returns. Set it to false to take
+     * over ownership of the msg (e.g. take the responsibility to free it).
+     *
+     * The callbacks argument is only valid inside the receive function; use
+     * the getMultipart callback, with retain=true, to keep multipart messages
+     * in memory.
+     * Results of the localMsgTypeIdForMsgType callback are valid during the
+     * complete lifecycle of the component, not just a single receive call.
+     *
+     * Return 0 implies a successful handling. If the return is not 0, the msg
+     * will always be released by the pubsubadmin.
+     *
+     * This method can be NULL.
+     */
+    int (*receive)(void *handle, const char *msgType, unsigned int msgTypeId, 
void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
+
+};
+typedef struct pubsub_subscriber_struct pubsub_subscriber_t;
+typedef struct pubsub_subscriber_struct* pubsub_subscriber_pt;
+
+
+#endif //  __PUBSUB_SUBSCRIBER_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h 
b/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
deleted file mode 100644
index bd39fc0..0000000
--- a/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
-#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
-
-#include "pubsub_endpoint.h"
-
-struct publisher_endpoint_announce {
-       void *handle;
-       celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt 
pubEP);
-       celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt 
pubEP);
-       celix_status_t (*interestedInTopic)(void* handle, const char *scope, 
const char *topic);
-       celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, 
const char *topic);
-};
-
-typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
-
-
-#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h 
b/pubsub/pubsub_common/public/include/pubsub_admin.h
deleted file mode 100644
index f24d825..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin.h
- *
- *  \date       Sep 30, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_H_
-#define PUBSUB_ADMIN_H_
-
-#include "service_reference.h"
-
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
-
-#define PSA_IP         "PSA_IP"
-#define PSA_ITF        "PSA_INTERFACE"
-#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
-
-#define PUBSUB_ADMIN_TYPE_KEY  "pubsub_admin.type"
-
-typedef struct pubsub_admin *pubsub_admin_pt;
-
-struct pubsub_admin_service {
-       pubsub_admin_pt admin;
-
-       celix_status_t (*addSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-       celix_status_t (*removeSubscription)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-
-       celix_status_t (*addPublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-       celix_status_t (*removePublication)(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-
-       celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* 
scope, char* topic);
-       celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* 
scope, char* topic);
-
-       /* Match principle:
-        * - A full matching pubsub_admin gives 200 points
-        * - A full matching serializer gives 100 points
-        * - If QoS = sample
-        *              - fallback pubsub_admin order of selection is: udp_mc, 
zmq. Points allocation is 100,75.
-        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
-        * - If QoS = control
-        *              - fallback pubsub_admin order of selection is: 
zmq,udp_mc. Points allocation is 100,75.
-        *              - fallback serializers order of selection is: json, 
void. Points allocation is 30,20.
-        * - If nothing is specified, QoS = sample is assumed, so the same 
score applies, just divided by two.
-        *
-        */
-       celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
-};
-
-typedef struct pubsub_admin_service *pubsub_admin_service_pt;
-
-#endif /* PUBSUB_ADMIN_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin_match.h 
b/pubsub/pubsub_common/public/include/pubsub_admin_match.h
deleted file mode 100644
index e95ca7d..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_admin_match.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-
-#ifndef PUBSUB_ADMIN_MATCH_H_
-#define PUBSUB_ADMIN_MATCH_H_
-
-#include "celix_errno.h"
-#include "properties.h"
-#include "array_list.h"
-
-#include "pubsub_serializer.h"
-
-#define QOS_ATTRIBUTE_KEY      "attribute.qos"
-#define QOS_TYPE_SAMPLE                "sample"        /* A.k.a. unreliable 
connection */
-#define QOS_TYPE_CONTROL       "control"       /* A.k.a. reliable connection */
-
-#define PUBSUB_ADMIN_FULL_MATCH_SCORE  200.0F
-#define SERIALIZER_FULL_MATCH_SCORE            100.0F
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score);
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
-
-#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h 
b/pubsub/pubsub_common/public/include/pubsub_common.h
deleted file mode 100644
index 5dfd8fd..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_common.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_common.h
- *
- *  \date       Sep 17, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_COMMON_H_
-#define PUBSUB_COMMON_H_
-
-#define PUBSUB_SERIALIZER_SERVICE              "pubsub_serializer"
-#define PUBSUB_ADMIN_SERVICE                   "pubsub_admin"
-#define PUBSUB_DISCOVERY_SERVICE               "pubsub_discovery"
-#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
-
-#define PUBSUB_ANY_SUB_TOPIC                   "any"
-
-#define        PUBSUB_BUNDLE_ID                        "bundle.id"
-
-#define MAX_SCOPE_LEN                           1024
-#define MAX_TOPIC_LEN                          1024
-
-struct pubsub_msg_header{
-       char topic[MAX_TOPIC_LEN];
-       unsigned int type;
-       unsigned char major;
-       unsigned char minor;
-};
-
-typedef struct pubsub_msg_header* pubsub_msg_header_pt;
-
-
-#endif /* PUBSUB_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h 
b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
deleted file mode 100644
index 8a979eb..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_endpoint.h
- *
- *  \date       Sep 21, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ENDPOINT_H_
-#define PUBSUB_ENDPOINT_H_
-
-#include "service_reference.h"
-#include "listener_hook_service.h"
-#include "properties.h"
-
-#include "publisher.h"
-#include "subscriber.h"
-
-struct pubsub_endpoint {
-    char *frameworkUUID;
-    char *scope;
-    char *topic;
-    long serviceID;
-    char* endpoint;
-    bool is_secure;
-    properties_pt topic_props;
-};
-
-typedef struct pubsub_endpoint *pubsub_endpoint_pt;
-
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp, bool isPublisher);
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher);
-celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
-bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
-
-char *createScopeTopicKey(const char* scope, const char* topic);
-
-#endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h 
b/pubsub/pubsub_common/public/include/pubsub_serializer.h
deleted file mode 100644
index 4489fa4..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_serializer.h
- *
- *  \date       Mar 24, 2017
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_SERIALIZER_SERVICE_H_
-#define PUBSUB_SERIALIZER_SERVICE_H_
-
-#include "service_reference.h"
-#include "hash_map.h"
-
-#include "pubsub_common.h"
-
-#define PUBSUB_SERIALIZER_TYPE_KEY     "pubsub_serializer.type"
-
-/**
- * There should be a pubsub_serializer_t
- * per msg type (msg id) per bundle
- *
- * The pubsub_serializer_service can create
- * a serializer_map per bundle. Potentially using
- * the extender pattern.
- */
-
-typedef struct pubsub_msg_serializer {
-       void* handle;
-       unsigned int msgId;
-       const char* msgName;
-       version_pt msgVersion;
-
-       celix_status_t (*serialize)(void* handle, const void* input, void** 
out, size_t* outLen);
-       celix_status_t (*deserialize)(void* handle, const void* input, size_t 
inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
-       void (*freeMsg)(void* handle, void* msg);
-
-} pubsub_msg_serializer_t;
-
-typedef struct pubsub_serializer_service {
-       void* handle;
-
-       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
hash_map_pt* serializerMap);
-       celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt 
serializerMap);
-
-} pubsub_serializer_service_t;
-
-#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor 
b/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
deleted file mode 100644
index c01a2fd..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
+++ /dev/null
@@ -1,10 +0,0 @@
-:header
-type=interface
-name=pubsub_topic_info
-version=1.0.0
-:annotations
-:types
-:methods
-getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
-getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
-getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_utils.h 
b/pubsub/pubsub_common/public/include/pubsub_utils.h
deleted file mode 100644
index aff5c72..0000000
--- a/pubsub/pubsub_common/public/include/pubsub_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_utils.h
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_UTILS_H_
-#define PUBSUB_UTILS_H_
-
-#include "bundle_context.h"
-#include "array_list.h"
-
-char* pubsub_getScopeFromFilter(char* bundle_filter);
-char* pubsub_getTopicFromFilter(char* bundle_filter);
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
-array_list_pt pubsub_getTopicsFromString(char* string);
-
-
-#endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_admin_match.c 
b/pubsub/pubsub_common/public/src/pubsub_admin_match.c
deleted file mode 100644
index 2a695c1..0000000
--- a/pubsub/pubsub_common/public/src/pubsub_admin_match.c
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-
-#include <string.h>
-#include "service_reference.h"
-
-#include "pubsub_admin.h"
-
-#include "pubsub_admin_match.h"
-
-#define KNOWN_PUBSUB_ADMIN_NUM 2
-#define KNOWN_SERIALIZER_NUM   2
-
-static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"udp_mc","zmq"};
-static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
-
-static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = 
{"zmq","udp_mc"};
-static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = 
{"json","void"};
-
-static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
-static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
-
-static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType);
-static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService);
-
-celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char 
*pubsub_admin_type, array_list_pt serializerList, double *score){
-
-       celix_status_t status = CELIX_SUCCESS;
-       double final_score = 0;
-       int i = 0, j = 0;
-
-       const char *requested_admin_type                = NULL;
-       const char *requested_serializer_type   = NULL;
-       const char *requested_qos_type                  = NULL;
-
-       if(endpoint_props!=NULL){
-               requested_admin_type            = 
properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
-               requested_serializer_type       = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
-               requested_qos_type                      = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
-       }
-
-       /* Analyze the pubsub_admin */
-       if(requested_admin_type != NULL){ /* We got precise specification on 
the pubsub_admin we want */
-               
if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){
 //Full match
-                       final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected PSA */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                               
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                                       final_score += 
qos_pubsub_admin_score[i];
-                                       break;
-                               }
-                       }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                               
if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                                       final_score += 
qos_pubsub_admin_score[i];
-                                       break;
-                               }
-                       }
-               }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
-               }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
-                       
if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
-                               final_score += (qos_pubsub_admin_score[i]/2);
-                               break;
-                       }
-               }
-       }
-
-       char *serializer_type = NULL;
-       /* Analyze the serializers */
-       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
-               for(i=0;i<arrayList_size(serializerList);i++){
-                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
-                       get_serializer_type(svcRef, &serializer_type);
-                       if(serializer_type != NULL){
-                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
-                                       final_score += 
SERIALIZER_FULL_MATCH_SCORE;
-                                       break;
-                               }
-                       }
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       ser_found = true;
-                                               }
-                                       }
-                               }
-                               if(ser_found){
-                                       final_score += qos_serializer_score[i];
-                               }
-                       }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       ser_found = true;
-                                               }
-                                       }
-                               }
-                               if(ser_found){
-                                       final_score += qos_serializer_score[i];
-                               }
-                       }
-               }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
-               }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               bool ser_found = false;
-               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                               service_reference_pt svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                               get_serializer_type(svcRef, &serializer_type);
-                               if(serializer_type != NULL){
-                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                               ser_found = true;
-                                       }
-                               }
-                       }
-                       if(ser_found){
-                               final_score += (qos_serializer_score[i]/2);
-                       }
-               }
-       }
-
-       *score = final_score;
-
-       printf("Score for pair <%s,%s> = 
%f\n",pubsub_admin_type,serializer_type,final_score);
-
-       return status;
-}
-
-celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, 
array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
-       celix_status_t status = CELIX_SUCCESS;
-
-       int i = 0, j = 0;
-
-       const char *requested_serializer_type = NULL;
-       const char *requested_qos_type = NULL;
-
-       if (endpoint_props != NULL){
-               requested_serializer_type = 
properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
-               requested_qos_type = 
properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
-       }
-
-       service_reference_pt svcRef = NULL;
-       void *svc = NULL;
-
-       /* Analyze the serializers */
-       if(requested_serializer_type != NULL){ /* We got precise specification 
on the serializer we want */
-               for(i=0;i<arrayList_size(serializerList);i++){
-                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,i);
-                       char *serializer_type = NULL;
-                       get_serializer_type(svcRef, &serializer_type);
-                       if(serializer_type != NULL){
-                               
if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
-                                       manage_service_from_reference(svcRef, 
&svc,true);
-                                       if(svc==NULL){
-                                               printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                       }
-                                       *serSvc = svc;
-                                       break;
-                               }
-                       }
-               }
-       }
-       else if(requested_qos_type != NULL){ /* We got QoS specification that 
will determine the selected serializer */
-               
if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       char *serializer_type = NULL;
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       
manage_service_from_reference(svcRef, &svc,true);
-                                                       if(svc==NULL){
-                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                                       }
-                                                       else{
-                                                               *serSvc = svc;
-                                                               ser_found = 
true;
-                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               else 
if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
-                       bool ser_found = false;
-                       for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                               for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                                       svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                                       char *serializer_type = NULL;
-                                       get_serializer_type(svcRef, 
&serializer_type);
-                                       if(serializer_type != NULL){
-                                               
if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                                       
manage_service_from_reference(svcRef, &svc,true);
-                                                       if(svc==NULL){
-                                                               printf("Cannot 
get pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                                       }
-                                                       else{
-                                                               *serSvc = svc;
-                                                               ser_found = 
true;
-                                                               
printf("Selected %s serializer as best for 
QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-               else{
-                       printf("Unknown QoS type '%s'\n",requested_qos_type);
-                       status = CELIX_ILLEGAL_ARGUMENT;
-               }
-       }
-       else{ /* We got no specification: fallback to Qos=Sample, but count 
half the score */
-               bool ser_found = false;
-               for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
-                       for(j=0;j<arrayList_size(serializerList) && 
!ser_found;j++){
-                               svcRef = 
(service_reference_pt)arrayList_get(serializerList,j);
-                               char *serializer_type = NULL;
-                               get_serializer_type(svcRef, &serializer_type);
-                               if(serializer_type != NULL){
-                                       
if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
-                                               
manage_service_from_reference(svcRef, &svc,true);
-                                               if(svc==NULL){
-                                                       printf("Cannot get 
pubsub_serializer_service from serviceReference %p\n",svcRef);
-                                                       status = 
CELIX_SERVICE_EXCEPTION;
-                                               }
-                                               else{
-                                                       *serSvc = svc;
-                                                       ser_found = true;
-                                                       printf("Selected %s 
serializer as best without any 
specification\n",qos_sample_serializer_prio_list[i]);
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-
-       if(svc!=NULL && svcRef!=NULL){
-               manage_service_from_reference(svcRef, svc, false);
-       }
-
-       return status;
-}
-
-static void get_serializer_type(service_reference_pt svcRef, char 
**serializerType){
-
-       const char *serType = NULL;
-       serviceReference_getProperty(svcRef, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType != NULL){
-               *serializerType = (char*)serType;
-       }
-       else{
-               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",svcRef);
-               *serializerType = NULL;
-       }
-}
-
-static void manage_service_from_reference(service_reference_pt svcRef, void 
**svc, bool getService){
-       bundle_context_pt context = NULL;
-       bundle_pt bundle = NULL;
-       serviceReference_getBundle(svcRef, &bundle);
-       bundle_getContext(bundle, &context);
-       if(getService){
-               bundleContext_getService(context, svcRef, svc);
-       }
-       else{
-               bundleContext_ungetService(context, svcRef, NULL);
-       }
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c 
b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
deleted file mode 100644
index c3fd293..0000000
--- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * endpoint_description.c
- *
- *  \date       25 Jul 2014
- *  \author     <a href="mailto:[email protected]";>Apache Celix Project 
Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-#include <string.h>
-#include <stdlib.h>
-
-#include "celix_errno.h"
-#include "celix_log.h"
-
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
-#include "constants.h"
-
-#include "pubsub_utils.h"
-
-
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps);
-static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher);
-
-static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps){
-
-       if (fwUUID != NULL) {
-               psEp->frameworkUUID = strdup(fwUUID);
-       }
-
-       if (scope != NULL) {
-               psEp->scope = strdup(scope);
-       }
-
-       if (topic != NULL) {
-               psEp->topic = strdup(topic);
-       }
-
-       psEp->serviceID = serviceId;
-
-       if(endpoint != NULL) {
-               psEp->endpoint = strdup(endpoint);
-       }
-
-       if(topic_props != NULL){
-               if(cloneProps){
-                       properties_copy(topic_props, &(psEp->topic_props));
-               }
-               else{
-                       psEp->topic_props = topic_props;
-               }
-       }
-}
-
-static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const 
char *topic, bool isPublisher){
-
-       properties_pt topic_props = NULL;
-
-       bool isSystemBundle = false;
-       bundle_isSystemBundle(bundle, &isSystemBundle);
-       long bundleId = -1;
-       bundle_isSystemBundle(bundle, &isSystemBundle);
-       bundle_getBundleId(bundle,&bundleId);
-
-       if(isSystemBundle == false) {
-
-               char *bundleRoot = NULL;
-               char* topicPropertiesPath = NULL;
-               bundle_getEntry(bundle, ".", &bundleRoot);
-
-               if(bundleRoot != NULL){
-
-                       asprintf(&topicPropertiesPath, 
"%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", 
topic);
-                       topic_props = properties_load(topicPropertiesPath);
-                       if(topic_props==NULL){
-                               printf("PSEP: Could not load properties for %s 
on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", 
topic,bundleId);
-                       }
-
-                       free(topicPropertiesPath);
-                       free(bundleRoot);
-               }
-       }
-
-       return topic_props;
-}
-
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp){
-       celix_status_t status = CELIX_SUCCESS;
-
-       *psEp = calloc(1, sizeof(**psEp));
-
-       pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, 
endpoint, topic_props, true);
-
-       return status;
-
-}
-
-celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out){
-       celix_status_t status = CELIX_SUCCESS;
-
-       *out = calloc(1,sizeof(**out));
-
-       pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, 
in->serviceID, in->endpoint, in->topic_props, true);
-
-       return status;
-
-}
-
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference, pubsub_endpoint_pt* psEp, bool isPublisher){
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
-
-       bundle_pt bundle = NULL;
-       bundle_context_pt ctxt = NULL;
-       const char* fwUUID = NULL;
-       serviceReference_getBundle(reference,&bundle);
-       bundle_getContext(bundle,&ctxt);
-       bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-
-       const char* scope = NULL;
-       serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
-
-       const char* topic = NULL;
-       serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
-
-       const char* serviceId = NULL;
-       
serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
-
-       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
-       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
-
-       pubsubEndpoint_setFields(ep, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, 
strtol(serviceId,NULL,10), NULL, topic_props, false);
-
-       if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
-               fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: 
incomplete description!.");
-               status = CELIX_BUNDLE_EXCEPTION;
-               pubsubEndpoint_destroy(ep);
-               *psEp = NULL;
-       }
-       else{
-               *psEp = ep;
-       }
-
-       return status;
-
-}
-
-celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher){
-       celix_status_t status = CELIX_SUCCESS;
-
-       const char* fwUUID=NULL;
-       
bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-
-       if(fwUUID==NULL){
-               return CELIX_BUNDLE_EXCEPTION;
-       }
-
-       char* topic = pubsub_getTopicFromFilter(info->filter);
-       if(topic==NULL){
-               return CELIX_BUNDLE_EXCEPTION;
-       }
-
-       *psEp = calloc(1, sizeof(**psEp));
-
-       char* scope = pubsub_getScopeFromFilter(info->filter);
-       if(scope == NULL) {
-               scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
-       }
-
-       bundle_pt bundle = NULL;
-       long bundleId = -1;
-       bundleContext_getBundle(info->context,&bundle);
-
-       bundle_getBundleId(bundle,&bundleId);
-
-       properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, 
topic, isPublisher);
-
-       /* TODO: is topic_props==NULL a fatal error such that EP cannot be 
created? */
-       pubsubEndpoint_setFields(*psEp, fwUUID, 
scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, 
topic_props, false);
-
-       free(topic);
-       free(scope);
-
-
-       return status;
-}
-
-celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
-
-       if(psEp->frameworkUUID!=NULL){
-               free(psEp->frameworkUUID);
-               psEp->frameworkUUID = NULL;
-       }
-
-       if(psEp->scope!=NULL){
-               free(psEp->scope);
-               psEp->scope = NULL;
-       }
-
-       if(psEp->topic!=NULL){
-               free(psEp->topic);
-               psEp->topic = NULL;
-       }
-
-       if(psEp->endpoint!=NULL){
-               free(psEp->endpoint);
-               psEp->endpoint = NULL;
-       }
-
-       if(psEp->topic_props != NULL){
-               properties_destroy(psEp->topic_props);
-       }
-
-       free(psEp);
-
-       return CELIX_SUCCESS;
-
-}
-
-bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
-
-       return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
-                       (strcmp(psEp1->scope,psEp2->scope)==0) &&
-                       (strcmp(psEp1->topic,psEp2->topic)==0) &&
-                       (psEp1->serviceID == psEp2->serviceID) /*&&
-                       ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
-       );
-}
-
-char *createScopeTopicKey(const char* scope, const char* topic) {
-       char *result = NULL;
-       asprintf(&result, "%s:%s", scope, topic);
-
-       return result;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_common/public/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_utils.c b/pubsub/pubsub_common/public/src/pubsub_utils.c
deleted file mode 100644
index abc5ae6..0000000
--- a/pubsub/pubsub_common/public/src/pubsub_utils.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_utils.c
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:[email protected]">Apache Celix Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include <string.h>
-#include <stdlib.h>
-
-#include "constants.h"
-
-#include "pubsub_common.h"
-#include "publisher.h"
-#include "pubsub_utils.h"
-
-#include "array_list.h"
-#include "bundle.h"
-
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-
-#define MAX_KEYBUNDLE_LENGTH 256
-
-char* pubsub_getScopeFromFilter(char* bundle_filter){
-
-       char* scope = NULL;
-
-       char* filter = strdup(bundle_filter);
-
-       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
-       if(oc!=NULL){
-               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
-               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
-                       char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
-                       if(scopes!=NULL){
-
-                               scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
-                               char* bottom=strchr(scopes,')');
-                               *bottom='\0';
-
-                               scope=strdup(scopes);
-                       } else {
-                           scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
-                       }
-               }
-       }
-
-       free(filter);
-
-       return scope;
-}
-
-char* pubsub_getTopicFromFilter(char* bundle_filter){
-
-       char* topic = NULL;
-
-       char* filter = strdup(bundle_filter);
-
-       char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
-       if(oc!=NULL){
-               oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
-               
if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
-
-                       char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
-                       if(topics!=NULL){
-
-                               topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
-                               char* bottom=strchr(topics,')');
-                               *bottom='\0';
-
-                               topic=strdup(topics);
-
-                       }
-               }
-       }
-
-       free(filter);
-
-       return topic;
-
-}
-
-array_list_pt pubsub_getTopicsFromString(char* string){
-
-       array_list_pt topic_list = NULL;
-       arrayList_create(&topic_list);
-
-       char* topics = strdup(string);
-
-       char* topic = strtok(topics,",;|# ");
-       arrayList_add(topic_list,strdup(topic));
-
-       while( (topic = strtok(NULL,",;|# ")) !=NULL){
-               arrayList_add(topic_list,strdup(topic));
-       }
-
-       free(topics);
-
-       return topic_list;
-
-}
-
-/**
- * Loop through all bundles and look for the bundle with the keys inside.
- * If no key bundle found, return NULL
- *
- * Caller is responsible for freeing the object
- */
-char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
-{
-       array_list_pt bundles = NULL;
-       bundleContext_getBundles(ctx, &bundles);
-       int nrOfBundles = arrayList_size(bundles);
-       long bundle_id = -1;
-       char* result = NULL;
-
-       for (int i = 0; i < nrOfBundles; i++){
-               bundle_pt b = arrayList_get(bundles, i);
-
-               /* Skip bundle 0 (framework bundle) since it has no path nor 
revisions */
-               bundle_getBundleId(b, &bundle_id);
-               if(bundle_id==0){
-                       continue;
-               }
-
-               char* dir = NULL;
-               bundle_getEntry(b, ".", &dir);
-
-               char cert_dir[MAX_KEYBUNDLE_LENGTH];
-               snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", 
dir);
-
-               struct stat s;
-               int err = stat(cert_dir, &s);
-               if (err != -1){
-                       if (S_ISDIR(s.st_mode)){
-                               result = dir;
-                               break;
-                       }
-               }
-
-               free(dir);
-       }
-
-       arrayList_destroy(bundles);
-
-       return result;
-}
-

Reply via email to