http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/framework/src/celix_launcher.c
----------------------------------------------------------------------
diff --cc framework/src/celix_launcher.c
index ba83f25,0000000..fe5d0c0
mode 100644,000000..100644
--- a/framework/src/celix_launcher.c
+++ b/framework/src/celix_launcher.c
@@@ -1,242 -1,0 +1,315 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * celix_launcher.c
 + *
 + *  \date       Mar 23, 2010
 + *  \author           <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +
 +#include "celix_launcher.h"
 +
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <libgen.h>
 +#include <signal.h>
 +
 +#ifndef CELIX_NO_CURLINIT
 +#include <curl/curl.h>
 +#endif
 +
 +#include <string.h>
 +#include <curl/curl.h>
 +#include <signal.h>
 +#include <libgen.h>
 +#include "celix_launcher.h"
 +#include "framework.h"
 +#include "linked_list_iterator.h"
 +
 +static void show_usage(char* prog_name);
 +static void shutdown_framework(int signal);
 +static void ignore(int signal);
 +
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, 
framework_pt *framework, properties_pt packedConfig);
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt 
*framework, properties_pt packedConfig);
++
 +#define DEFAULT_CONFIG_FILE "config.properties"
 +
 +static framework_pt framework = NULL;
 +
++/**
++ * Launches the framework from command-line arguments without a packed
++ * (pre-compiled) configuration.
++ * Retained because examples and unit tests still use it.
++ */
 +int celixLauncher_launchWithArgs(int argc, char *argv[]) {
++      properties_pt noPackedConfig = NULL;
++      return celixLauncher_launchWithArgsAndProps(argc, argv, noPackedConfig);
++}
++
++/**
++ * Command-line entry point of the launcher.
++ *
++ * argv[1], when present, is either "-h"/"-help" (the usage text is printed
++ * and 0 is returned) or the path of the configuration file; when absent,
++ * DEFAULT_CONFIG_FILE is used. Handlers are installed for SIGINT/SIGTERM
++ * (stop the framework) and SIGUSR1/SIGUSR2 (no-op) before launching.
++ * packedConfig is forwarded untouched to
++ * celixLauncher_launchWithConfigAndProps().
++ *
++ * After a successful launch this call blocks until the framework has
++ * stopped and then destroys it. Returns the launch status (0 on success).
++ */
++int celixLauncher_launchWithArgsAndProps(int argc, char *argv[], properties_pt packedConfig) {
 +      // The only recognised argument is argv[1]
 +      char *config_file = DEFAULT_CONFIG_FILE;
 +      if (argc > 1 && argv[1] != NULL) {
 +              char *opt = argv[1];
 +              if (strcmp("-h", opt) == 0 || strcmp("-help", opt) == 0) {
 +                      // The user only wants some help
 +                      show_usage(argv[0]);
 +                      return 0;
 +              }
 +              config_file = opt;
 +      }
 +
 +      // SIGINT / SIGTERM request a framework shutdown
 +      struct sigaction stopAction;
 +      memset(&stopAction, 0, sizeof(stopAction));
 +      stopAction.sa_handler = shutdown_framework;
 +      sigaction(SIGINT, &stopAction, NULL);
 +      sigaction(SIGTERM, &stopAction, NULL);
 +
 +      // SIGUSR1 / SIGUSR2 are swallowed by a no-op handler
 +      struct sigaction ignoreAction;
 +      memset(&ignoreAction, 0, sizeof(ignoreAction));
 +      ignoreAction.sa_handler = ignore;
 +      sigaction(SIGUSR1, &ignoreAction, NULL);
 +      sigaction(SIGUSR2, &ignoreAction, NULL);
 +
-       int rc = celixLauncher_launch(config_file, &framework);
++      int rc = celixLauncher_launchWithConfigAndProps(config_file, &framework, packedConfig);
 +      if (rc != 0) {
 +              return rc;
 +      }
 +
 +      celixLauncher_waitForShutdown(framework);
 +      celixLauncher_destroy(framework);
 +      return rc;
 +}
 +
 +/**
 + * Prints the usage text on stdout; only the base name of prog_name is shown.
 + */
 +static void show_usage(char* prog_name) {
 +      char *executable = basename(prog_name);
 +      printf("Usage:\n  %s [path/to/config.properties]\n\n", executable);
 +}
 +
 +/**
 + * Signal handler that asks the launched framework (if any) to stop.
 + * Destroying the framework is left to the main thread.
 + */
 +static void shutdown_framework(int signal) {
 +      if (framework == NULL) {
 +              return; // nothing has been launched (yet)
 +      }
 +      celixLauncher_stop(framework);
 +}
 +
 +/**
 + * No-op signal handler, meant for SIGUSR1 and SIGUSR2 so that those signals
 + * can be used on threads.
 + */
 +static void ignore(int signal) {
 +      (void) signal; // intentionally nothing to do
 +}
 +
 +/**
 + * Launches the framework from the configuration file `configFile` only,
 + * i.e. without a packed configuration. The created framework is returned
 + * through `framework`; the return value is that of
 + * celixLauncher_launchWithConfigAndProps().
 + */
 +int celixLauncher_launch(const char *configFile, framework_pt *framework) {
++      properties_pt noPackedConfig = NULL;
++      return celixLauncher_launchWithConfigAndProps(configFile, framework, noPackedConfig);
++}
++
++/**
++ * Opens `configFile` and dispatches on what is available:
++ *  - file and packedConfig: celixLauncher_launchWithStreamAndProps();
++ *  - file only:             celixLauncher_launchWithStream();
++ *  - packedConfig only:     celixLauncher_launchWithProperties();
++ *  - neither:               an error is written to stderr and 1 is returned.
++ * An opened stream is handed to the callee, which closes it.
++ */
++static int celixLauncher_launchWithConfigAndProps(const char *configFile, framework_pt *framework, properties_pt packedConfig){
 +      FILE *config = fopen(configFile, "r");
-       if (config != NULL) {
++
++      if (config != NULL) {
++              if (packedConfig != NULL) {
++                      return celixLauncher_launchWithStreamAndProps(config, framework, packedConfig);
++              }
 +              return celixLauncher_launchWithStream(config, framework);
++      }
++
++      if (packedConfig != NULL) {
++              return celixLauncher_launchWithProperties(packedConfig, framework);
++      }
++
 +      // Neither a readable file nor a packed configuration
 +      fprintf(stderr, "Error: invalid or non-existing configuration file: '%s'.", configFile);
 +      perror("");
 +      return 1;
 +}
 +
 +/**
 + * Loads the configuration from `stream`, closes the stream (in all cases)
 + * and launches the framework with the loaded properties.
 + *
 + * Returns 1 when no properties could be loaded, otherwise the result of
 + * celixLauncher_launchWithProperties().
 + */
 +int celixLauncher_launchWithStream(FILE *stream, framework_pt *framework) {
 +      properties_pt loaded = properties_loadWithStream(stream);
 +      fclose(stream);
 +
 +      // A NULL result means reading the stream went wrong
 +      if (loaded == NULL) {
 +              fprintf(stderr, "Error: invalid configuration file");
 +              perror(NULL);
 +              return 1;
 +      }
 +
 +      return celixLauncher_launchWithProperties(loaded, framework);
 +}
 +
++/**
++ * Launches the framework with the configuration read from `stream`,
++ * completed with the entries of the packed (pre-compiled) configuration.
++ *
++ * `stream` is always closed. Keys present in both configurations keep the
++ * value read from the stream; keys only present in packedConfig are added.
++ * After a merge, packedConfig is destroyed here. When nothing could be read
++ * from the stream, packedConfig itself becomes the launch configuration and
++ * is therefore NOT destroyed here.
++ *
++ * Returns 1 when no configuration is available at all, otherwise the result
++ * of celixLauncher_launchWithProperties().
++ */
++static int celixLauncher_launchWithStreamAndProps(FILE *stream, framework_pt *framework, properties_pt packedConfig){
++      properties_pt runtimeConfig = properties_loadWithStream(stream);
++      fclose(stream);
++
++      if (runtimeConfig == NULL) {
++              if (packedConfig == NULL) {
++                      fprintf(stderr, "Error: invalid configuration file");
++                      perror(NULL);
++                      return 1;
++              }
++
++              // No runtime configuration: launch with the packed one as-is. It must
++              // neither be merged into itself nor destroyed before it is used.
++              return celixLauncher_launchWithProperties(packedConfig, framework);
++      }
++
++      if (packedConfig != NULL) {
++              // Merge: only copy keys the runtime configuration does not define,
++              // so that runtime values take priority over packed ones.
++              // NOTE(review): destroying packedConfig below assumes properties_set()
++              // stores its own copies of key and value - confirm.
++              hash_map_iterator_t iter = hashMapIterator_construct(packedConfig);
++              hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
++              while (entry != NULL) {
++                      const char *key = (const char *) hashMapEntry_getKey(entry);
++                      const char *value = (const char *) hashMapEntry_getValue(entry);
++
++                      if (!hashMap_containsKey(runtimeConfig, key)) {
++                              properties_set(runtimeConfig, key, value);
++                      }
++
++                      entry = hashMapIterator_nextEntry(&iter);
++              }
++
++              // Only runtimeConfig is handed on to the launch, so the now redundant
++              // packedConfig has to be released here.
++              properties_destroy(packedConfig);
++      }
++
++      return celixLauncher_launchWithProperties(runtimeConfig, framework);
++}
 +
 +/**
 + * Creates, initialises and starts the framework described by `config`, then
 + * installs and starts the bundles listed (space separated) in the
 + * "cosgi.auto.start.1" property. The property is optional.
 + *
 + * The created framework is returned through `framework`. Bundles that fail
 + * to install are reported on stdout and skipped; that does not change the
 + * returned status.
 + *
 + * Returns the status of the last create / init / get-framework-bundle step
 + * that was attempted (CELIX_SUCCESS when all of them succeeded).
 + */
 +int celixLauncher_launchWithProperties(properties_pt config, framework_pt *framework) {
 +      celix_status_t status;
 +#ifndef CELIX_NO_CURLINIT
 +      // Before doing anything else, let's setup Curl
 +      curl_global_init(CURL_GLOBAL_NOTHING);
 +#endif
 +
 +      // Work on a private copy of the auto-start list: strtok_r() below
 +      // writes into the string it tokenizes.
 +      const char* autoStartProp = properties_get(config, "cosgi.auto.start.1");
 +      char* autoStart = NULL;
 +      if (autoStartProp != NULL) {
 +              autoStart = strndup(autoStartProp, 1024*10);
 +      }
 +
 +      status = framework_create(framework, config);
 +      bundle_pt fwBundle = NULL;
 +      if (status == CELIX_SUCCESS) {
 +              status = fw_init(*framework);
 +      }
 +      if (status == CELIX_SUCCESS) {
 +              status = framework_getFrameworkBundle(*framework, &fwBundle);
 +      }
 +
 +      if (status == CELIX_SUCCESS) {
 +              // Start the system bundle
 +              bundle_start(fwBundle);
 +
 +              char delims[] = " ";
 +              char *save_ptr = NULL;
 +              linked_list_pt bundles;
 +              array_list_pt installed = NULL;
 +              bundle_context_pt context = NULL;
 +              linked_list_iterator_pt iter = NULL;
 +              unsigned int i;
 +
 +              linkedList_create(&bundles);
 +              // Without the property there is nothing to tokenize. Calling
 +              // strtok_r(NULL, ...) with a NULL save pointer is undefined behaviour.
 +              char *result = (autoStart != NULL) ? strtok_r(autoStart, delims, &save_ptr) : NULL;
 +              while (result != NULL) {
 +                      char *location = strdup(result);
 +                      linkedList_addElement(bundles, location);
 +                      result = strtok_r(NULL, delims, &save_ptr);
 +              }
 +
 +              // First install all bundles
 +              // Afterwards start them
 +              arrayList_create(&installed);
 +              bundle_getContext(fwBundle, &context);
 +              iter = linkedListIterator_create(bundles, 0);
 +              while (linkedListIterator_hasNext(iter)) {
 +                      bundle_pt current = NULL;
 +                      char *location = (char *) linkedListIterator_next(iter);
 +                      if (bundleContext_installBundle(context, location, &current) == CELIX_SUCCESS) {
 +                              // Only add bundle if it is installed correctly
 +                              arrayList_add(installed, current);
 +                      } else {
 +                              printf("Could not install bundle from %s\n", location);
 +                      }
 +                      linkedListIterator_remove(iter);
 +                      free(location);
 +              }
 +              linkedListIterator_destroy(iter);
 +              linkedList_destroy(bundles);
 +
 +              for (i = 0; i < arrayList_size(installed); i++) {
 +                      bundle_pt installedBundle = (bundle_pt) arrayList_get(installed, i);
 +                      bundle_startWithOptions(installedBundle, 0);
 +              }
 +
 +              arrayList_destroy(installed);
 +      }
 +
 +      // Report exactly one outcome; the original printed "Framework Started"
 +      // even right after reporting a failure.
 +      if (status != CELIX_SUCCESS) {
 +              printf("Problem creating framework\n");
 +      } else {
 +              printf("Launcher: Framework Started\n");
 +      }
 +
 +      free(autoStart);
 +
 +      return status;
 +}
 +
 +/**
 + * Blocks the caller in framework_waitForStop() for the given framework.
 + */
 +void celixLauncher_waitForShutdown(framework_pt framework) {
 +      framework_waitForStop(framework);
 +}
 +
 +/**
 + * Destroys the framework and, unless CELIX_NO_CURLINIT is defined, calls
 + * curl_global_cleanup() afterwards. Prints "Launcher: Exit" when done.
 + */
 +void celixLauncher_destroy(framework_pt framework) {
 +      framework_destroy(framework);
 +#ifndef CELIX_NO_CURLINIT
 +      curl_global_cleanup(); // Cleanup Curl
 +#endif
 +      printf("Launcher: Exit\n");
 +}
 +
 +/**
 + * Stops the framework by stopping its framework bundle. Nothing happens
 + * when the framework bundle cannot be retrieved.
 + */
 +void celixLauncher_stop(framework_pt framework) {
 +      bundle_pt systemBundle = NULL;
 +      celix_status_t lookup = framework_getFrameworkBundle(framework, &systemBundle);
 +      if (lookup != CELIX_SUCCESS) {
 +              return;
 +      }
 +      bundle_stop(systemBundle);
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index e43ec29,0000000..44106df
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@@ -1,444 -1,0 +1,437 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
-  *  htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
++ *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
- /*
-  * topic_publication.c
-  *
-  *  \date       Sep 24, 2015
-  *  \author           <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
-  *  \copyright        Apache License, Version 2.0
-  */
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +#include <errno.h>
 +
 +#include <sys/types.h>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "topic_publication.h"
 +#include "pubsub_common.h"
 +#include "publisher.h"
 +#include "large_udp.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#define EP_ADDRESS_LEN                32
 +
 +#define FIRST_SEND_DELAY      2
 +
 +struct topic_publication { // State of one topic publication of the UDP multicast pubsub admin
 +      int sendSocket; // socket passed to largeUdp_sendmsg(); closed in pubsub_topicPublicationDestroy()
 +      char* endpoint; // heap-allocated "udp://<ip>:<port>" string built in Create, freed in Destroy
 +      service_registration_pt svcFactoryReg; // filled in by bundleContext_registerServiceFactory() in Start
 +      array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +      hash_map_pt boundServices; //<bundle_pt,bound_service>
 +      celix_thread_mutex_t tp_lock; // taken by the endpoint-list, get/unget-service, send and destroy functions
 +      pubsub_serializer_service_t *serializer; // may be NULL; checked before it is used
 +      struct sockaddr_in destAddr; // destination address passed to largeUdp_sendmsg()
 +};
 +
 +typedef struct publish_bundle_bound_service { // Publisher service instance handed out to one requesting bundle
 +      topic_publication_pt parent; // topic publication this instance was created for
 +      pubsub_publisher_t service; // its address is what GetService returns through *service
 +      bundle_pt bundle; // bundle the instance was created for (also the key in parent->boundServices)
 +      char *scope; // strdup() of the first publisher endpoint's scope; freed on destroy
 +      char *topic; // strdup() of the first publisher endpoint's topic; freed on destroy
 +      hash_map_pt msgTypes; // filled by serializer->createSerializerMap(); looked up by message type id when sending
 +      unsigned short getCount; // starts at 1, ++ per GetService, -- per UngetService; instance is destroyed at 0
 +      celix_thread_mutex_t mp_lock; // held while sending and while destroying this instance
 +      largeUdp_pt largeUdpHandle; // created with largeUdp_create(1); used by largeUdp_sendmsg()
 +}* publish_bundle_bound_service_pt;
 +
 +
 +typedef struct pubsub_msg{ // One outgoing message as assembled by pubsub_topicPublicationSend()
 +      pubsub_msg_header_pt header; // topic, type id and major/minor version of the message
 +      char* payload; // serialized message bytes produced by the serializer
 +      unsigned int payloadSize; // number of bytes in payload (sent as its own iovec entry)
 +} pubsub_msg_t;
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 +
 +static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
 +static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
 +
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 +
 +
 +static void delay_first_send_for_late_joiners(void);
 +
 +
 +/**
 + * Allocates and initialises a topic publication for the endpoint `pubEP`.
 + *
 + * A destination port is picked with rand_range(), the endpoint string
 + * "udp://<bindIP>:<port>" is stored in the publication and `pubEP` is added
 + * as its first publisher endpoint. The new publication is returned through
 + * `out`; the function always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
 +
 +      // Endpoint string, zero-filled before it is formatted
 +      char* url = malloc(EP_ADDRESS_LEN);
 +      memset(url,0,EP_ADDRESS_LEN);
 +      unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
 +      snprintf(url,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 +
 +      topic_publication_pt created = calloc(1,sizeof(*created));
 +
 +      // Containers and lock
 +      arrayList_create(&(created->pub_ep_list));
 +      created->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +      celixThreadMutex_create(&(created->tp_lock),NULL);
 +
 +      // Plain fields
 +      created->endpoint = url;
 +      created->sendSocket = sendSocket;
 +      created->serializer = best_serializer;
 +
 +      // Destination address used when sending
 +      created->destAddr.sin_family = AF_INET;
 +      created->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 +      created->destAddr.sin_port = htons(port);
 +
 +      pubsub_topicPublicationAddPublisherEP(created,pubEP);
 +
 +      *out = created;
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Releases everything owned by the publication: the endpoint string, the
 + * endpoint list, every bound service, the send socket, the lock and the
 + * publication itself.
 + *
 + * Returns CELIX_FILE_IO_EXCEPTION when closing the send socket fails,
 + * CELIX_SUCCESS otherwise.
 + */
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +      celix_status_t result = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&(pub->tp_lock));
 +
 +      free(pub->endpoint);
 +      arrayList_destroy(pub->pub_ep_list);
 +
 +      // Destroy the values first; the map itself does not free keys or values
 +      hash_map_iterator_pt it = hashMapIterator_create(pub->boundServices);
 +      while(hashMapIterator_hasNext(it)){
 +              pubsub_destroyPublishBundleBoundService((publish_bundle_bound_service_pt) hashMapIterator_nextValue(it));
 +      }
 +      hashMapIterator_destroy(it);
 +      hashMap_destroy(pub->boundServices,false,false);
 +
 +      pub->svcFactoryReg = NULL;
 +      pub->serializer = NULL;
 +
 +      int closed = close(pub->sendSocket);
 +      if(closed != 0){
 +              result = CELIX_FILE_IO_EXCEPTION;
 +      }
 +
 +      celixThreadMutex_unlock(&(pub->tp_lock));
 +      celixThreadMutex_destroy(&(pub->tp_lock));
 +
 +      free(pub);
 +
 +      return result;
 +}
 +
 +/**
 + * Registers a publisher service factory for this publication, using the
 + * scope and topic of its first publisher endpoint as service properties.
 + *
 + * On success the allocated factory is returned through `svcFactory` and the
 + * registration is stored in pub->svcFactoryReg. On failure the factory and
 + * the properties are released again and `svcFactory` is left untouched.
 + *
 + * Returns CELIX_SERVICE_EXCEPTION when the publication has no endpoint,
 + * otherwise the status of bundleContext_registerServiceFactory().
 + */
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      /* Let's register the new service */
 +
 +      pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +      if(pubEP!=NULL){
 +              service_factory_pt factory = calloc(1, sizeof(*factory));
 +              factory->handle = pub;
 +              factory->getService = pubsub_topicPublicationGetService;
 +              factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +              properties_pt props = properties_create();
 +              properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
 +              properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
 +
 +              status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +              if(status != CELIX_SUCCESS){
 +                      properties_destroy(props);
 +                      // The factory is not handed to the caller on this path, so it must be freed here
 +                      free(factory);
 +                      printf("PSA_UDP_MC_TP: Cannot register ServiceFactory for scope %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
 +              }
 +              else{
 +                      *svcFactory = factory;
 +              }
 +      }
 +      else{
 +              printf("PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 +              status = CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      return status;
 +}
 +
 +/**
 + * Unregisters the service factory registration stored in the publication
 + * and returns the status of that unregistration.
 + */
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +      celix_status_t unregistered = serviceRegistration_unregister(pub->svcFactoryReg);
 +      return unregistered;
 +}
 +
 +/**
 + * Adds `ep` to the publication's endpoint list, under tp_lock, after giving
 + * it its own copy of the publication's endpoint string.
 + * Always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +      celix_thread_mutex_t *lock = &(pub->tp_lock);
 +
 +      celixThreadMutex_lock(lock);
 +      ep->endpoint = strdup(pub->endpoint);
 +      arrayList_add(pub->pub_ep_list,ep);
 +      celixThreadMutex_unlock(lock);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Removes `ep` from the publication's endpoint list, under tp_lock.
 + * Always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +      celix_thread_mutex_t *lock = &(pub->tp_lock);
 +
 +      celixThreadMutex_lock(lock);
 +      arrayList_removeElement(pub->pub_ep_list,ep);
 +      celixThreadMutex_unlock(lock);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Returns a clone of the publication's endpoint list, taken under tp_lock.
 + */
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +      celixThreadMutex_lock(&(pub->tp_lock));
 +      array_list_pt snapshot = arrayList_clone(pub->pub_ep_list);
 +      celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +      return snapshot;
 +}
 +
 +
 +/**
 + * Service-factory "get" callback: hands the requesting bundle its own
 + * publisher service instance.
 + *
 + * `handle` is the topic publication. The first request of a bundle creates
 + * the bound service (getCount 1) and stores it in boundServices; later
 + * requests of the same bundle only increase getCount.
 + *
 + * Returns CELIX_SUCCESS with *service set. When the bound service cannot be
 + * created, *service is set to NULL and CELIX_SERVICE_EXCEPTION is returned
 + * instead of reporting success without a service.
 + */
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
 +      celix_status_t  status = CELIX_SUCCESS;
 +
 +      topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +      celixThreadMutex_lock(&(publish->tp_lock));
 +
 +      publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +      if(bound==NULL){
 +              bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +              if(bound!=NULL){
 +                      hashMap_put(publish->boundServices,bundle,bound);
 +              }
 +      }
 +      else{
 +              bound->getCount++;
 +      }
 +
 +      if (bound != NULL) {
 +              *service = &bound->service;
 +      }
 +      else {
 +              // Creation failed: make the failure visible to the caller
 +              *service = NULL;
 +              status = CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +      return status;
 +}
 +
 +/**
 + * Service-factory "unget" callback: drops one reference of the bundle's
 + * bound service and destroys it (and removes it from boundServices) when the
 + * count reaches zero. An unget for a bundle without a bound service is only
 + * logged. *service is always reset to NULL; always returns CELIX_SUCCESS.
 + */
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
 +
 +      topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +      celixThreadMutex_lock(&(publish->tp_lock));
 +
 +      publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +      if(bound==NULL){
 +              long bundleId = -1;
 +              bundle_getBundleId(bundle,&bundleId);
 +              printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +      }
 +      else{
 +              bound->getCount--;
 +              bool lastUser = (bound->getCount==0);
 +              if(lastUser){
 +                      pubsub_destroyPublishBundleBoundService(bound);
 +                      hashMap_remove(publish->boundServices,bundle);
 +              }
 +      }
 +
 +      /* service should be never used for unget, so let's set the pointer to NULL */
 +      *service = NULL;
 +
 +      celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Sends one message as three iovec parts - header, payload size, payload -
 + * through largeUdp_sendmsg() to the parent publication's destination
 + * address. `last` is not used here. When `releaseCallback` is given, its
 + * release() is called with the payload after the send attempt.
 + *
 + * Returns true when the send succeeded, false (after perror) otherwise.
 + */
 +static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
 +      // header + size + payload
 +      struct iovec parts[3];
 +      const int partCount = (int)(sizeof(parts)/sizeof(parts[0]));
 +
 +      parts[0].iov_base = msg->header;
 +      parts[0].iov_len = sizeof(*msg->header);
 +      parts[1].iov_base = &msg->payloadSize;
 +      parts[1].iov_len = sizeof(msg->payloadSize);
 +      parts[2].iov_base = msg->payload;
 +      parts[2].iov_len = msg->payloadSize;
 +
 +      delay_first_send_for_late_joiners();
 +
 +      topic_publication_pt owner = bound->parent;
 +      bool sent = true;
 +      if(largeUdp_sendmsg(bound->largeUdpHandle, owner->sendSocket, parts, partCount, 0, &owner->destAddr, sizeof(owner->destAddr)) == -1) {
 +              perror("send_pubsub_msg:sendSocket");
 +              sent = false;
 +      }
 +
 +      if(releaseCallback) {
 +              releaseCallback->release(msg->payload, bound);
 +      }
 +
 +      return sent;
 +}
 +
 +
 +/**
 + * Publisher "send" callback: serializes `inMsg` with the serializer
 + * registered for `msgTypeId` and sends it via send_pubsub_msg().
 + *
 + * `handle` is the bound service. The parent's tp_lock and then the bound
 + * service's mp_lock are held for the whole call.
 + *
 + * Returns 0 on success, -1 when no serializer is known for `msgTypeId` or
 + * when sending fails.
 + */
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
 +      int status = 0;
 +      publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +      celixThreadMutex_lock(&(bound->parent->tp_lock));
 +      celixThreadMutex_lock(&(bound->mp_lock));
 +
-       pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
++      pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(intptr_t)msgTypeId);
 +
 +      if (msgSer != NULL) {
 +              // Header and envelope are only needed until send_pubsub_msg() returns
 +              // (the original freed both right after that call), so they live on the
 +              // stack: no allocation that can fail and be dereferenced unchecked.
 +              struct pubsub_msg_header msg_hdr;
 +              memset(&msg_hdr, 0, sizeof(msg_hdr));
 +              // The header is zero-filled and at most MAX_TOPIC_LEN-1 bytes are copied
 +              strncpy(msg_hdr.topic,bound->topic,MAX_TOPIC_LEN-1);
 +              msg_hdr.type = msgTypeId;
 +
 +              if (msgSer->msgVersion != NULL){
 +                      int major=0, minor=0;
 +                      version_getMajor(msgSer->msgVersion, &major);
 +                      version_getMinor(msgSer->msgVersion, &minor);
 +                      msg_hdr.major = major;
 +                      msg_hdr.minor = minor;
 +              }
 +
 +              void* serializedOutput = NULL;
 +              size_t serializedOutputLen = 0;
 +              msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +              pubsub_msg_t msg;
 +              memset(&msg, 0, sizeof(msg));
 +              msg.header = &msg_hdr;
 +              msg.payload = (char*)serializedOutput;
 +              msg.payloadSize = serializedOutputLen;
 +
 +              if(send_pubsub_msg(bound, &msg,true, NULL) == false) {
 +                      status = -1;
 +              }
 +              free(serializedOutput);
 +
 +      } else {
 +              // msgTypeId is unsigned, hence %u
 +              printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %u\n", msgTypeId);
 +              status=-1;
 +      }
 +
 +      celixThreadMutex_unlock(&(bound->mp_lock));
 +      celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +      return status;
 +}
 +
 +/**
 + * Publisher callback that maps a message type name to its local type id:
 + * the id is utils_stringHash(msgType). `handle` is not used.
 + * Always returns 0.
 + */
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
 +      unsigned int hashed = utils_stringHash(msgType);
 +      *msgTypeId = hashed;
 +      return 0;
 +}
 +
 +
 +/**
 + * Returns a pseudo-random value in the closed range [min, max], drawn from
 + * random(). When max <= min, min is returned.
 + */
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +      if (max <= min) {
 +              // Empty or inverted range: (max-min+1) would wrap around as unsigned
 +              return min;
 +      }
 +
 +      double scaled = ((double)random())/((double)RAND_MAX);
 +      // Computed in double so that (max - min + 1) cannot overflow
 +      double value = ((double)max - (double)min + 1.0) * scaled + (double)min;
 +
 +      // scaled reaches 1.0 when random() returns RAND_MAX, which would give max+1
 +      if (value > (double)max) {
 +              return max;
 +      }
 +      return (unsigned int)value;
 +}
 +
 +/**
 + * Allocates the publisher service instance that is bound to `bundle` for
 + * the publication `tp`: getCount starts at 1, the serializer map is created
 + * when a serializer is set, scope and topic are copied from the first
 + * publisher endpoint and the publisher callbacks are wired up.
 + *
 + * Returns the new instance, or NULL when the allocation fails.
 + */
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 +
 +      publish_bundle_bound_service_pt created = calloc(1, sizeof(*created));
 +      if (created == NULL) {
 +              return created;
 +      }
 +
 +      created->parent = tp;
 +      created->bundle = bundle;
 +      created->getCount = 1;
 +      celixThreadMutex_create(&created->mp_lock,NULL);
 +
 +      if(tp->serializer != NULL){
 +              tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&created->msgTypes);
 +      }
 +
 +      // Scope and topic come from the publication's first endpoint
 +      pubsub_endpoint_pt firstEP = (pubsub_endpoint_pt)arrayList_get(created->parent->pub_ep_list,0);
 +      created->scope=strdup(firstEP->scope);
 +      created->topic=strdup(firstEP->topic);
 +      created->largeUdpHandle = largeUdp_create(1);
 +
 +      // Publisher service callbacks
 +      created->service.handle = created;
 +      created->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
 +      created->service.send = pubsub_topicPublicationSend;
 +      created->service.sendMultipart = NULL;  //Multipart not supported for UDP
 +
 +      return created;
 +}
 +
 +/**
 + * Releases a bound publisher service: its serializer map (when both the
 + * serializer and the map exist), the scope and topic copies, the large-UDP
 + * handle, its lock and finally the instance itself.
 + */
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +      celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +      pubsub_serializer_service_t *serializer = boundSvc->parent->serializer;
 +      if(serializer != NULL && boundSvc->msgTypes != NULL){
 +              serializer->destroySerializerMap(serializer->handle, boundSvc->msgTypes);
 +      }
 +
 +      // free(NULL) is a no-op, so no NULL checks are needed here
 +      free(boundSvc->scope);
 +      free(boundSvc->topic);
 +
 +      largeUdp_destroy(boundSvc->largeUdpHandle);
 +
 +      celixThreadMutex_unlock(&boundSvc->mp_lock);
 +      celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +      free(boundSvc);
 +
 +}
 +
 +/**
 + * Sleeps FIRST_SEND_DELAY seconds the first time it is called (and logs
 + * that it does so); every later call returns immediately.
 + */
 +static void delay_first_send_for_late_joiners(){
 +
 +      static bool firstSend = true;
 +
 +      if(!firstSend){
 +              return;
 +      }
 +
 +      printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
 +      sleep(FIRST_SEND_DELAY);
 +      firstSend = false;
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/shell/CMakeLists.txt
----------------------------------------------------------------------
diff --cc shell/CMakeLists.txt
index ae8cf3f,11a16c1..b8aaac3
--- a/shell/CMakeLists.txt
+++ b/shell/CMakeLists.txt
@@@ -18,35 -18,38 +18,35 @@@ celix_subproject(SHELL "Option to enabl
  if (SHELL)
        find_package(CURL REQUIRED)
  
 +      add_library(shell_api INTERFACE)
 +      target_include_directories(shell_api INTERFACE include)
 +
      add_bundle(shell
          SYMBOLIC_NAME "apache_celix_shell"
-         VERSION "2.0.0"
+         VERSION "2.1.0"
          NAME "Apache Celix Shell"
 -
          SOURCES
 +          src/activator
 +          src/shell
 +          src/lb_command
 +          src/start_command
 +          src/stop_command
 +          src/install_command
 +          src/update_command
 +          src/uninstall_command
 +          src/log_command
 +          src/inspect_command
 +          src/help_command
 +      )
 +      target_include_directories(shell PRIVATE src ${CURL_INCLUDE_DIRS})
 +      target_link_libraries(shell PRIVATE Celix::shell_api ${CURL_LIBRARIES} 
Celix::log_service_api Celix::log_helper)
  
 -          private/src/activator
 -          private/src/shell
 -          private/src/lb_command
 -          private/src/start_command
 -          private/src/stop_command
 -          private/src/install_command
 -          private/src/update_command
 -          private/src/uninstall_command
 -          private/src/log_command
 -          private/src/inspect_command
 -          private/src/help_command
 -
 -          ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 -
 -    )
 -    
 -    install_bundle(shell
 +      install_bundle(shell
        HEADERS
 -              public/include/shell.h public/include/command.h 
public/include/shell_constants.h
 -      )
 +              include/shell.h include/command.h include/shell_constants.h
 +    )
  
 -      include_directories("public/include")
 -      include_directories("private/include")
 -    include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
 -    include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
 -      include_directories(${CURL_INCLUDE_DIRS})
 -    target_link_libraries(shell celix_framework ${CURL_LIBRARIES})
 +      #Setup target aliases to match external usage
 +      add_library(Celix::shell_api ALIAS shell_api)
 +      add_library(Celix::shell ALIAS shell)
  endif (SHELL)

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/include/properties.h
----------------------------------------------------------------------
diff --cc utils/include/properties.h
index cf93ca0,0000000..5c6dc4d
mode 100644,000000..100644
--- a/utils/include/properties.h
+++ b/utils/include/properties.h
@@@ -1,66 -1,0 +1,68 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.h
 + *
 + *  \date       Apr 27, 2010
 + *  \author           <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +
 +#ifndef PROPERTIES_H_
 +#define PROPERTIES_H_
 +
 +#include <stdio.h>
 +
 +#include "hash_map.h"
 +#include "exports.h"
 +#include "celix_errno.h"
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +typedef hash_map_pt properties_pt; /* a property set handle is an alias for hash_map_pt */
 +typedef hash_map_t properties_t;
 +/**
 + * Creates a new property set.
 + *
 + * @return The new set; release it with properties_destroy.
 + */
 +UTILS_EXPORT properties_pt properties_create(void);
 +/**
 + * Frees every key and value string held by the set and then the set itself.
 + */
 +UTILS_EXPORT void properties_destroy(properties_pt properties);
 +/**
 + * Opens the named file for reading and loads its content with properties_loadWithStream.
 + *
 + * @return The loaded set, or NULL if the file cannot be opened.
 + */
 +UTILS_EXPORT properties_pt properties_load(const char *filename);
 +/**
 + * Reads the complete stream and parses it line by line ('\n' separated) into a new set.
 + * The size of the stream is determined with fseek/ftell; the stream is not closed.
 + *
 + * @return A new set, or NULL if stream is NULL.
 + */
 +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream);
 +/**
 + * Parses '\n' separated property lines from a string into a new set.
 + * The input is copied before it is tokenised, so the caller's string is not modified.
 + */
++UTILS_EXPORT properties_pt properties_loadFromString(const char *input);
++/**
++ * Writes every entry to the given file as a "key=value" line. '#', '!', '=' and ':'
++ * inside keys and values are written with a preceding backslash. The header argument
++ * is currently ignored.
++ */
 +UTILS_EXPORT void properties_store(properties_pt properties, const char 
*file, const char *header);
 +/**
 + * Looks up the value stored for key.
 + *
 + * @return The stored string, which stays owned by the set, or NULL when the lookup yields nothing.
 + */
 +UTILS_EXPORT const char *properties_get(properties_pt properties, const char 
*key);
 +/**
 + * Like properties_get, but returns defaultValue when the lookup yields NULL.
 + */
 +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, 
const char *key, const char *defaultValue);
 +/**
 + * Stores a copy of value under key. Copies are limited to 10240 characters each.
 + * If the key is already present its stored key string is kept and the previous
 + * value is freed.
 + */
 +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, 
const char *value);
 +/**
 + * Creates a new set containing copies of all entries of properties and stores it in *copy.
 + *
 + * @return CELIX_SUCCESS, or CELIX_ENOMEM when the new set cannot be created
 + *         (in which case *copy is not written).
 + */
 +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, 
properties_pt *copy);
 +/**
 + * Loop header that visits the keys of props, assigning each one to key.
 + * It declares a hash_map_iterator_t named iter in the scope of the for statement.
 + * The comma operator discards the hashMapIterator_hasNext result, so the loop
 + * condition is the key just assigned: iteration ends when hashMapIterator_nextKey
 + * yields NULL.
 + */
 +#define PROPERTIES_FOR_EACH(props, key) \
 +    for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
 +        hashMapIterator_hasNext(&iter), (key) = (const 
char*)hashMapIterator_nextKey(&iter);)
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +#endif /* PROPERTIES_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/ac0d0d77/utils/src/properties.c
----------------------------------------------------------------------
diff --cc utils/src/properties.c
index 0bd6dc3,0000000..1e097a0
mode 100644,000000..100644
--- a/utils/src/properties.c
+++ b/utils/src/properties.c
@@@ -1,302 -1,0 +1,330 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.c
 + *
 + *  \date       Apr 27, 2010
 + *  \author           <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <ctype.h>
 +#include "celixbool.h"
 +#include "properties.h"
 +#include "utils.h"
 +
 +#define MALLOC_BLOCK_SIZE             5
 +
 +static void parseLine(const char* line, properties_pt props);
 +
 +/**
 + * Allocates a new property set.
 + *
 + * The set is a hash map created with utils_stringHash for both hash callbacks
 + * and utils_stringEquals for both equality callbacks.
 + *
 + * @return Whatever hashMap_create returns for that configuration.
 + */
 +properties_pt properties_create(void) {
 +      properties_pt created = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
 +      return created;
 +}
 +
 +/**
 + * Releases a property set together with all key and value strings stored in it.
 + *
 + * @param properties The set to destroy. NULL is accepted and ignored.
 + */
 +void properties_destroy(properties_pt properties) {
 +      if (properties == NULL) {
 +              return;
 +      }
 +
 +      //keys and values are released by hand before the map itself goes away
 +      hash_map_iterator_pt iter = hashMapIterator_create(properties);
 +      while (hashMapIterator_hasNext(iter)) {
 +              hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +              free(hashMapEntry_getKey(entry));
 +              free(hashMapEntry_getValue(entry));
 +      }
 +      hashMapIterator_destroy(iter);
 +      //NOTE(review): the two false arguments presumably stop hashMap_destroy from freeing keys/values a second time - confirm
 +      hashMap_destroy(properties, false, false);
 +}
 +
 +/**
 + * Loads a property set from the file with the given name.
 + *
 + * @param filename Path that is opened in read mode.
 + * @return The result of properties_loadWithStream for the opened file, or NULL
 + *         when the file cannot be opened. The file is closed before returning.
 + */
 +properties_pt properties_load(const char* filename) {
 +      properties_pt loaded = NULL;
 +      FILE *stream = fopen(filename, "r");
 +
 +      if (stream != NULL) {
 +              loaded = properties_loadWithStream(stream);
 +              fclose(stream);
 +      }
 +      return loaded;
 +}
 +
 +/**
 + * Reads a whole stream into memory and parses it line by line into a new property set.
 + *
 + * The size of the stream is determined with fseek/ftell, so the stream has to be
 + * seekable. It is rewound to its start before reading and is left open; closing it
 + * remains the caller's job.
 + *
 + * @param file The stream to read; may be NULL.
 + * @return NULL if file is NULL or the property set cannot be created, otherwise a new
 + *         property set. The set is empty when the stream is empty, when its size cannot
 + *         be determined or when the read buffer cannot be allocated.
 + */
 +properties_pt properties_loadWithStream(FILE *file) {
 +      if (file == NULL) {
 +              return NULL;
 +      }
 +
 +      properties_pt props = properties_create();
 +      if (props == NULL) {
 +              return NULL;
 +      }
 +
 +      //ftell reports failure as -1; converting that straight to size_t would wrap
 +      //around and end in a zero sized allocation followed by an unbounded fread
 +      long file_size = -1L;
 +      if (fseek(file, 0, SEEK_END) == 0) {
 +              file_size = ftell(file);
 +      }
 +      if (file_size < 0 || fseek(file, 0, SEEK_SET) != 0) {
 +              fprintf(stderr, "cannot determine size of properties stream\n");
 +              return props;
 +      }
 +      if (file_size == 0) {
 +              return props;
 +      }
 +
 +      size_t size = (size_t) file_size;
 +      char *filebuffer = calloc(size + 1, sizeof(char));
 +      if (filebuffer == NULL) {
 +              return props;
 +      }
 +
 +      size_t rs = fread(filebuffer, sizeof(char), size, file);
 +      if (rs != size) {
 +              fprintf(stderr, "fread read only %zu bytes out of %zu\n", rs, size);
 +      }
 +      //terminate after the bytes actually read, which may be fewer than requested
 +      filebuffer[rs] = '\0';
 +
 +      //strtok_r never returns empty tokens, so empty lines are skipped here already
 +      char *saveptr = NULL;
 +      for (char *line = strtok_r(filebuffer, "\n", &saveptr); line != NULL; line = strtok_r(NULL, "\n", &saveptr)) {
 +              parseLine(line, props);
 +      }
 +      free(filebuffer);
 +
 +      return props;
 +}
 +
++/**
++ * Parses properties from an in-memory string whose lines are separated by '\n'.
++ *
++ * @param input NUL-terminated text. It is copied before it is tokenised, so the
++ *              caller's string is never modified. May be NULL.
++ * @return A new property set, or NULL if input is NULL, the working copy cannot be
++ *         allocated or the property set cannot be created.
++ */
++properties_pt properties_loadFromString(const char *input){
++      if (input == NULL) {
++              return NULL;
++      }
++
++      //strtok_r writes terminators into its argument, so tokenise a private copy
++      char *in = strdup(input);
++      if (in == NULL) {
++              return NULL;
++      }
++
++      properties_pt props = properties_create();
++      if (props != NULL) {
++              char *saveLinePointer = NULL;
++              for (char *line = strtok_r(in, "\n", &saveLinePointer); line != NULL; line = strtok_r(NULL, "\n", &saveLinePointer)) {
++                      parseLine(line, props);
++              }
++      }
++      free(in);
++
++      return props;
++}
++
 +
 +/**
 + * Writes one string to the stream, putting a backslash in front of every
 + * '#', '!', '=' and ':'.
 + */
 +static void properties_writeEscaped(FILE *file, const char *str) {
 +      for (const char *c = str; *c != '\0'; c++) {
 +              if (*c == '#' || *c == '!' || *c == '=' || *c == ':') {
 +                      fputc('\\', file);
 +              }
 +              fputc(*c, file);
 +      }
 +}
 +
 +/**
 + * Writes all properties to a file as "key=value" lines, one entry per line.
 + *
 + * The file is opened with mode "w+". Header is ignored for now, cannot handle
 + * comments yet.
 + * NOTE(review): a backslash inside a key or value is written as is, although
 + * parseLine treats a backslash as an escape character - confirm whether values
 + * containing backslashes are expected to survive a store/load round trip.
 + *
 + * @param properties The property set to write.
 + * @param filename   Path of the file to write.
 + * @param header     Unused.
 + */
 +void properties_store(properties_pt properties, const char* filename, const char* header) {
 +      FILE *file = fopen(filename, "w+");
 +      if (file == NULL) {
 +              perror("File is null");
 +              return;
 +      }
 +
 +      hash_map_iterator_pt iterator = hashMapIterator_create(properties);
 +      while (hashMapIterator_hasNext(iterator)) {
 +              hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
 +              properties_writeEscaped(file, hashMapEntry_getKey(entry));
 +              fputc('=', file);
 +              properties_writeEscaped(file, hashMapEntry_getValue(entry));
 +              fputc('\n', file);
 +      }
 +      hashMapIterator_destroy(iterator);
 +
 +      //buffered data is flushed by fclose, so a failure here means lost output
 +      if (fclose(file) != 0) {
 +              perror("Failed to close properties file");
 +      }
 +}
 +
 +/**
 + * Duplicates a property set.
 + *
 + * Every key/value pair of the source is added to a newly created set with
 + * properties_set.
 + *
 + * @param properties The set to copy from.
 + * @param out        Receives the new set on success; not written on failure.
 + * @return CELIX_SUCCESS, or CELIX_ENOMEM when the new set cannot be created.
 + */
 +celix_status_t properties_copy(properties_pt properties, properties_pt *out) {
 +      properties_pt duplicate = properties_create();
 +      if (duplicate == NULL) {
 +              return CELIX_ENOMEM;
 +      }
 +
 +      hash_map_iterator_pt it = hashMapIterator_create(properties);
 +      while (hashMapIterator_hasNext(it)) {
 +              hash_map_entry_pt pair = hashMapIterator_nextEntry(it);
 +              char *name = hashMapEntry_getKey(pair);
 +              char *content = hashMapEntry_getValue(pair);
 +              properties_set(duplicate, name, content);
 +      }
 +      hashMapIterator_destroy(it);
 +
 +      *out = duplicate;
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Looks up key in the property set.
 + *
 + * @return The result of hashMap_get for key; the string is not copied.
 + */
 +const char* properties_get(properties_pt properties, const char* key) {
 +      const char *found = hashMap_get(properties, (void*)key);
 +      return found;
 +}
 +
 +/**
 + * Looks up key and falls back to defaultValue when properties_get returns NULL.
 + */
 +const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) {
 +      const char* stored = properties_get(properties, key);
 +      if (stored != NULL) {
 +              return stored;
 +      }
 +      return defaultValue;
 +}
 +
 +/**
 + * Stores a copy of value under key, replacing any value stored for that key.
 + *
 + * The set owns the copies. When the key is already present its stored key string
 + * is reused and the previous value is freed after the new one has been put.
 + * Nothing is changed when an argument is NULL or when memory for a copy cannot
 + * be allocated.
 + *
 + * @param properties The set to modify.
 + * @param key        Name of the property; at most 10240 characters are copied.
 + * @param value      Value of the property; at most 10240 characters are copied.
 + */
 +void properties_set(properties_pt properties, const char* key, const char* value) {
 +      //longest key or value that is stored; strndup truncates anything longer
 +      const size_t maxLength = 1024 * 10;
 +
 +      if (properties == NULL || key == NULL || value == NULL) {
 +              return;
 +      }
 +
 +      //copy first: value may point at the string that is about to be replaced
 +      char *newValue = strndup(value, maxLength);
 +      if (newValue == NULL) {
 +              return;
 +      }
 +
 +      hash_map_entry_pt entry = hashMap_getEntry(properties, key);
 +      if (entry != NULL) {
 +              char* oldKey = hashMapEntry_getKey(entry);
 +              char* oldValue = hashMapEntry_getValue(entry);
 +              hashMap_put(properties, oldKey, newValue);
 +              free(oldValue);
 +      } else {
 +              char *newKey = strndup(key, maxLength);
 +              if (newKey == NULL) {
 +                      free(newValue);
 +                      return;
 +              }
 +              hashMap_put(properties, newKey, newValue);
 +      }
 +}
 +
 +/**
 + * Makes sure the buffer that is currently written to can take one more character.
 + *
 + * The active buffer is the key buffer when *output equals *key, otherwise the value
 + * buffer. It is enlarged by MALLOC_BLOCK_SIZE bytes once outputPos reaches its last
 + * slot, which keeps that slot available for the string terminator.
 + *
 + * @param key       Address of the key buffer pointer; updated when the key buffer moves.
 + * @param value     Address of the value buffer pointer; updated when the value buffer moves.
 + * @param output    Address of the active buffer pointer; follows the buffer when it moves.
 + * @param outputPos Index at which the next character will be written.
 + * @param key_len   Allocated size of the key buffer; updated on growth.
 + * @param value_len Allocated size of the value buffer; updated on growth.
 + * @return true when the active buffer is large enough, false when it could not be
 + *         enlarged. On failure nothing is modified and the caller still owns both buffers.
 + */
 +static bool updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) {
 +      const bool writingKey = (*output == *key);
 +      char **buffer = writingKey ? key : value;
 +      int *length = writingKey ? key_len : value_len;
 +
 +      if (outputPos != (*length) - 1) {
 +              return true;
 +      }
 +
 +      //keep the old pointer until realloc has succeeded, otherwise the buffer leaks
 +      char *grown = realloc(*buffer, (size_t)(*length) + MALLOC_BLOCK_SIZE);
 +      if (grown == NULL) {
 +              return false;
 +      }
 +      *buffer = grown;
 +      *length += MALLOC_BLOCK_SIZE;
 +      *output = grown;
 +      return true;
 +}
 +
 +/**
 + * Parses one line of property text and stores the resulting entry in props.
 + *
 + * Rules implemented here:
 + *  - spaces and tabs in front of the key are skipped;
 + *  - the first unescaped '=' or ':' ends the key, later ones are part of the value;
 + *  - an unescaped '#' or '!' met while nothing has been written to the active buffer
 + *    (start of the key, or directly after the separator) turns the whole line into a
 + *    comment and nothing is stored; elsewhere these characters are copied;
 + *  - a backslash is not copied unless it directly follows another backslash, and it
 + *    makes a directly following '=', ':', '#' or '!' literal;
 + *  - key and value are passed through utils_stringTrim before they are stored.
 + * Lines made up of spaces and tabs only store nothing, and neither does a line for
 + * which the buffers cannot be enlarged.
 + *
 + * NOTE(review): after a doubled backslash the escape flag stays set, so a special
 + * character right behind "\\" is still taken literally - confirm this is intended.
 + *
 + * @param line  NUL-terminated line without its line break.
 + * @param props Property set receiving the entry.
 + */
 +static void parseLine(const char* line, properties_pt props) {
 +      int linePos = 0;
 +      bool precedingCharIsBackslash = false;
 +      bool isComment = false;
 +      bool outOfMemory = false;
 +      int outputPos = 0;
 +      char *output = NULL; //buffer being filled: NULL before the key starts, then key, then value
 +      int key_len = MALLOC_BLOCK_SIZE;
 +      int value_len = MALLOC_BLOCK_SIZE;
 +
 +      //Ignore empty lines
 +      if (line[0] == '\n' && line[1] == '\0') {
 +              return;
 +      }
 +
 +      char *key = calloc(1, key_len);
 +      char *value = calloc(1, value_len);
 +      if (key == NULL || value == NULL) {
 +              free(key);
 +              free(value);
 +              return;
 +      }
 +
 +      while (line[linePos] != '\0') {
 +              const char c = line[linePos++];
 +              bool append = false; //whether c is copied to the active buffer
 +
 +              if (output == NULL) {
 +                      if (c == ' ' || c == '\t') {
 +                              continue; //leading whitespace
 +                      }
 +                      output = key;
 +              }
 +
 +              if (c == '=' || c == ':' || c == '#' || c == '!') {
 +                      if (precedingCharIsBackslash) {
 +                              //escaped special character
 +                              append = true;
 +                              precedingCharIsBackslash = false;
 +                      } else if (c == '#' || c == '!') {
 +                              if (outputPos == 0) {
 +                                      isComment = true;
 +                                      break;
 +                              }
 +                              append = true;
 +                      } else if (output == value) {
 +                              //already have a separator
 +                              append = true;
 +                      } else {
 +                              //first separator: finish the key and continue with the value
 +                              key[outputPos] = '\0';
 +                              output = value;
 +                              outputPos = 0;
 +                      }
 +              } else if (c == '\\') {
 +                      //double backslash -> backslash
 +                      append = precedingCharIsBackslash;
 +                      precedingCharIsBackslash = true;
 +              } else {
 +                      //normal character
 +                      precedingCharIsBackslash = false;
 +                      append = true;
 +              }
 +
 +              if (append) {
 +                      output[outputPos++] = c;
 +                      if (!updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len)) {
 +                              outOfMemory = true;
 +                              break;
 +                      }
 +              }
 +      }
 +
 +      //output is still NULL when the line held nothing but spaces and tabs
 +      if (output != NULL && !isComment && !outOfMemory) {
 +              output[outputPos] = '\0';
 +              properties_set(props, utils_stringTrim(key), utils_stringTrim(value));
 +      }
 +
 +      free(key);
 +      free(value);
 +}

Reply via email to