This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch feature/pubsub_custom_serializers
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to
refs/heads/feature/pubsub_custom_serializers by this push:
new eb6107e Add missing pubsub serialization provider file
eb6107e is described below
commit eb6107e06021492e503c82b708d80649277a2eda
Author: Pepijn Noltes <[email protected]>
AuthorDate: Thu Apr 16 19:43:10 2020 +0200
Add missing pubsub serialization provider file
---
.../src/pubsub_serialization_provider.c | 666 +++++++++++++++++++++
1 file changed, 666 insertions(+)
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
new file mode 100644
index 0000000..2b686c2
--- /dev/null
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
@@ -0,0 +1,666 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include "pubsub_serialization_provider.h"
+
+#include <stdlib.h>
+#include <stdarg.h>
+#include <dirent.h>
+#include <string.h>
+
+#include "dyn_function.h"
+#include "celix_version.h"
+#include "celix_utils.h"
+#include "dyn_message.h"
+#include "pubsub_utils.h"
+#include "log_helper.h"
+#include "pubsub_message_serialization_service.h"
+#include "celix_shell_command.h"
+
+#define MAX_PATH_LEN 1024
+
+/** Kind of message descriptor file, as derived from the file name by getDescriptorType(). */
+typedef enum {
+    FIT_INVALID = 0,    /* file name matches no known descriptor kind */
+    FIT_DESCRIPTOR = 1, /* file name contains ".descriptor" */
+    FIT_AVPR = 2        /* file name contains ".properties" */
+} descriptor_type_e;
+
+/* Logging shortcuts; each one expects a variable named `provider` to be in scope at the call site. */
+#define L_DEBUG(...) logHelper_log(provider->log, OSGI_LOGSERVICE_DEBUG, __VA_ARGS__)
+#define L_INFO(...) logHelper_log(provider->log, OSGI_LOGSERVICE_INFO, __VA_ARGS__)
+#define L_WARN(...) logHelper_log(provider->log, OSGI_LOGSERVICE_WARNING, __VA_ARGS__)
+#define L_ERROR(...) logHelper_log(provider->log, OSGI_LOGSERVICE_ERROR, __VA_ARGS__)
+
+
+/**
+ * State of a serialization provider: the serialization callbacks it hands out, the ids of the
+ * tracker/services it registered and the list of message serialization entries it found.
+ */
+struct pubsub_serialization_provider {
+    celix_bundle_context_t *ctx;
+    log_helper_t *log;
+    char *serializationType;
+
+    //serialization callbacks, copied into the service struct of every serialization entry
+    celix_status_t (*serialize)(pubsub_serialization_entry_t *entry, const void *msg, struct iovec **output, size_t *outputIovLen);
+    void (*freeSerializeMsg)(pubsub_serialization_entry_t *entry, struct iovec *input, size_t inputIovLen);
+    celix_status_t (*deserialize)(pubsub_serialization_entry_t *entry, const struct iovec *input, size_t inputIovLen __attribute__((unused)), void **out);
+    void (*freeDeserializeMsg)(pubsub_serialization_entry_t *entry, void *msg);
+
+    //id of the bundle tracker used to find message descriptors in installed bundles
+    long bundleTrackerId;
+
+    //marker service announcing the serialization type of this provider
+    pubsub_message_serialization_marker_t markerSvc;
+    long serializationMarkerSvcId;
+
+    //shell command service for inspecting the serialization entries
+    celix_shell_command_t cmdSvc;
+    long cmdSvcId;
+
+    celix_thread_mutex_t mutex; //protects below
+    celix_array_list_t *serializationSvcEntries; //elements are pubsub_serialization_entry_t*
+};
+
+/**
+ * Log callback installed for the dfi library (see the dyn*_logSetup calls in
+ * pubsub_serializationProvider_create). Formats the message and forwards it, together with the
+ * originating file and line, to the log helper of the provider.
+ *
+ * @param handle The pubsub_serialization_provider_t the callback was registered with.
+ * @param level  Log level, passed on unchanged to logHelper_log.
+ * @param file   Source file reported by the caller.
+ * @param line   Source line reported by the caller.
+ * @param msg    printf-style format string, followed by its arguments.
+ */
+static void dfi_log(void *handle, int level, const char *file, int line, const char *msg, ...) {
+    pubsub_serialization_provider_t *provider = handle;
+    char *logStr = NULL;
+    va_list ap;
+    va_start(ap, msg);
+    int rc = vasprintf(&logStr, msg, ap);
+    va_end(ap);
+    if (rc < 0) {
+        //the content of logStr is undefined when vasprintf fails; do not print or free it
+        logStr = NULL;
+    }
+    //fall back to the unformatted message so the log line is not lost on allocation failure
+    logHelper_log(provider->log, level, "FILE:%s, LINE:%i, MSG:%s", file, line, logStr != NULL ? logStr : msg);
+    free(logStr);
+}
+
+/**
+ * Classifies a file by name: a name containing ".descriptor" is a dfi descriptor, a name
+ * containing ".properties" is treated as an avpr properties file, anything else is invalid.
+ * Note that the match is a substring match, not a suffix match.
+ */
+static descriptor_type_e getDescriptorType(const char* filename) {
+    descriptor_type_e type = FIT_INVALID;
+    if (strstr(filename, ".descriptor") != NULL) {
+        type = FIT_DESCRIPTOR;
+    } else if (strstr(filename, ".properties") != NULL) {
+        type = FIT_AVPR;
+    }
+    return type;
+}
+
+/**
+ * Reads a properties file describing an avpr message and extracts the "fqn=" and "avpr=" lines.
+ *
+ * @param provider             Provider, used for logging.
+ * @param properties_file_name Name of the properties file, relative to root.
+ * @param root                 Directory containing the properties file.
+ * @param avpr_fqn             Out: fully qualified name read from the "fqn=" line
+ *                             (buffer of at least MAX_PATH_LEN bytes).
+ * @param path                 Scratch/out buffer of at least MAX_PATH_LEN bytes. First used to build
+ *                             the path of the properties file, on success it holds "<root>/<avpr value>".
+ * @return true if both a fqn and an avpr location were found, false otherwise (a warning is logged).
+ */
+static bool readPropertiesFile(pubsub_serialization_provider_t* provider, const char* properties_file_name, const char* root, char* avpr_fqn, char* path) {
+    static const char fqnKey[] = "fqn=";
+    static const char avprKey[] = "avpr=";
+
+    snprintf(path, MAX_PATH_LEN, "%s/%s", root, properties_file_name); // use path to create path to properties file
+    FILE *properties = fopen(path, "r");
+    if (!properties) {
+        L_WARN("Could not find or open %s as a properties file in %s\n", properties_file_name, root);
+        return false;
+    }
+
+    *avpr_fqn = '\0';
+    *path = '\0'; //re-use path to create path to avpr file
+    char *p_line = NULL; //getline allocates and grows the line buffer itself, no unchecked malloc needed
+    size_t line_len = 0;
+    while (getline(&p_line, &line_len, properties) >= 0) {
+        if (strncmp(p_line, fqnKey, sizeof(fqnKey) - 1) == 0) {
+            snprintf(avpr_fqn, MAX_PATH_LEN, "%s", p_line + sizeof(fqnKey) - 1);
+            avpr_fqn[strcspn(avpr_fqn, "\r\n")] = '\0'; //strip line ending (also for CRLF files)
+        } else if (strncmp(p_line, avprKey, sizeof(avprKey) - 1) == 0) {
+            snprintf(path, MAX_PATH_LEN, "%s/%s", root, p_line + sizeof(avprKey) - 1);
+            path[strcspn(path, "\r\n")] = '\0'; //strip line ending (also for CRLF files)
+        }
+    }
+    free(p_line); //must be freed even if getline failed on the first call
+    fclose(properties);
+
+    if (*avpr_fqn == '\0') {
+        L_WARN("File %s does not contain a fully qualified name for the parser\n", properties_file_name);
+        return false;
+    }
+
+    if (*path == '\0') {
+        L_WARN("File %s does not contain a location for the avpr file\n", properties_file_name);
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * Opens the stream holding the message description for a directory entry.
+ *
+ * For a dfi descriptor the file itself is opened; for a properties file the avpr file it refers
+ * to is opened and avpr_fqn is filled. pathOrError (MAX_PATH_LEN bytes) is cleared first and then
+ * receives either the path that was tried or, for an invalid type, a textual reason.
+ *
+ * @return The opened stream (caller closes it) or NULL if nothing could be opened.
+ */
+static FILE* openFileStream(pubsub_serialization_provider_t* provider, descriptor_type_e descriptorType, const char* filename, const char* root, char* avpr_fqn, char* pathOrError) {
+    FILE* stream = NULL;
+    memset(pathOrError, 0, MAX_PATH_LEN);
+
+    if (descriptorType == FIT_INVALID) {
+        snprintf(pathOrError, MAX_PATH_LEN, "Because %s is not a valid file", filename);
+    } else if (descriptorType == FIT_DESCRIPTOR) {
+        snprintf(pathOrError, MAX_PATH_LEN, "%s/%s", root, filename);
+        stream = fopen(pathOrError, "r");
+    } else if (descriptorType == FIT_AVPR) {
+        bool found = readPropertiesFile(provider, filename, root, avpr_fqn, pathOrError);
+        if (found) {
+            stream = fopen(pathOrError, "r");
+        }
+    } else {
+        L_WARN("Unknown file input type, returning NULL!\n");
+    }
+
+    return stream;
+}
+
+/**
+ * Determines the message id for a message type: the positive value of the "msgId" annotation
+ * when present, otherwise the string hash of the message name.
+ */
+static unsigned int pubsub_serializationProvider_getMsgId(pubsub_serialization_provider_t* provider __attribute__((unused)), dyn_message_type *msg) {
+    char *name = NULL;
+    dynMessage_getName(msg, &name);
+
+    unsigned int id = 0;
+    char *idAnnotation = NULL;
+    int status = dynMessage_getAnnotationEntry(msg, "msgId", &idAnnotation);
+    if (status == CELIX_SUCCESS && idAnnotation != NULL) {
+        // a custom msg id was provided, use it if it is a positive number
+        long customId = strtol(idAnnotation, NULL, 10);
+        id = customId > 0 ? (unsigned int) customId : 0;
+    }
+
+    // id 0 means "no (usable) custom id": fall back to the hash of the message name
+    return id != 0 ? id : celix_utils_stringHash(name);
+}
+
+/**
+ * Parses a dfi message descriptor from the stream and verifies that a name and a version can be
+ * retrieved from it.
+ *
+ * @param entryPath Path of the descriptor, only used in log messages.
+ * @return The parsed message type (caller destroys it with dynMessage_destroy) or NULL on failure.
+ */
+static dyn_message_type* pubsub_serializationProvider_parseDfiDescriptor(pubsub_serialization_provider_t* provider, FILE* stream, const char* entryPath) {
+    dyn_message_type *parsed = NULL;
+    int status = dynMessage_parse(stream, &parsed);
+    if (status != 0 || parsed == NULL) {
+        L_WARN("Cannot parse message from descriptor from entry %s.\n", entryPath);
+        return NULL;
+    }
+
+    char *name = NULL;
+    version_pt version = NULL;
+    status += dynMessage_getName(parsed, &name);
+    status += dynMessage_getVersion(parsed, &version);
+
+    bool complete = (status == 0) && (name != NULL) && (version != NULL);
+    if (!complete) {
+        L_WARN("Cannot retrieve name and/or version from msg, using entry %s.\n", entryPath);
+        dynMessage_destroy(parsed);
+        return NULL;
+    }
+
+    return parsed;
+}
+
+//TODO FIXME, see #158
+//
+// static dyn_message_type*
pubsub_serializationProvider_parseAvprDescriptor(pubsub_serialization_provider_t*
provider, FILE* stream, const char *entryName, const char* fqn) {
+//
+// //dyn_message_type* msgType = dynMessage_parseAvpr(file_ptr, fqn);
+// dyn_message_type* msgType = NULL;
+//
+// if (!msgType) {
+// L_WARN("[json serializer] Cannot parse avpr file '%s'\n", fqn);
+// return -1;
+// }
+//
+// dyn_type* type;
+// dynMessage_getMessageType(msgType, &type);
+//
+// const char *msgName = dynType_getName(type);
+//
+// version_pt msgVersion = NULL;
+// celix_status_t s =
version_createVersionFromString(dynType_getMetaInfo(type, "version"),
&msgVersion);
+//
+// if (s != CELIX_SUCCESS || !msgName) {
+// L_WARN("[json serializer] Cannot retrieve name and/or version from
msg\n");
+// if (s == CELIX_SUCCESS) {
+// version_destroy(msgVersion);
+// }
+// return -1;
+// }
+//
+// unsigned int msgId = 0;
+// const char *msgIdStr = dynType_getMetaInfo(type, "msgId");
+// if (msgIdStr != NULL) {
+// // custom msg id passed, use it
+// long customMsgId = strtol(msgIdStr, NULL, 10);
+// if (customMsgId > 0)
+// msgId = (unsigned int) customMsgId;
+// }
+//
+// if (msgId == 0) {
+// msgId = utils_stringHash(msgName);
+// }
+//
+//
+//
+// return 0;
+//}
+//}
+
+/**
+ * Checks a newly read serialization entry against the entries already known to the provider.
+ *
+ * The entry is unique when no known entry shares its msg id or its msg fqn. For every known entry
+ * that does share the id or fqn, the nrOfTimesRead of that known entry is incremented and the new
+ * entry is marked invalid (valid = false, invalidReason set, error logged) when:
+ *  - the msg ids are equal but the fqns differ,
+ *  - the fqns are equal but the msg ids differ,
+ *  - the msg versions differ, or
+ *  - the descriptor contents differ.
+ *
+ * Takes the provider mutex while iterating over the known entries.
+ *
+ * @return true if the entry is unique (new msg fqn & msg id), false otherwise.
+ */
+static bool pubsub_serializationProvider_isUniqueAndCheckValid(pubsub_serialization_provider_t* provider, pubsub_serialization_entry_t* entry) {
+    bool unique = true;
+
+    celixThreadMutex_lock(&provider->mutex);
+    for (int i = 0; i < celix_arrayList_size(provider->serializationSvcEntries); ++i) {
+        pubsub_serialization_entry_t* visit = celix_arrayList_get(provider->serializationSvcEntries, i);
+        bool sameId = visit->msgId == entry->msgId;
+        bool sameFqn = strcmp(visit->msgFqn, entry->msgFqn) == 0;
+        if (!sameId && !sameFqn) {
+            continue; //unrelated entry
+        }
+
+        unique = false; //already have a descriptor with the same id or fqn. check if valid
+        visit->nrOfTimesRead += 1;
+        if (sameId && !sameFqn) {
+            L_ERROR("Error descriptor adding %s. Found msg types with same msg id, but different msg fqn. Msg id is %d, but found fully qualified names are '%s' and '%s'",
+                    entry->readFromEntryPath, entry->msgId, entry->msgFqn, visit->msgFqn);
+            entry->invalidReason = "msg id clash";
+            entry->valid = false;
+        } else if (sameFqn && !sameId) {
+            //note: the fqn is a string and must be printed with %s
+            L_ERROR("Error descriptor adding %s. Found msg types with same fqn, but different msg ids. Msg fqn is %s, but found msg ids are '%d' and '%d'",
+                    entry->readFromEntryPath, entry->msgFqn, entry->msgId, visit->msgId);
+            entry->invalidReason = "msg fqn clash";
+            entry->valid = false;
+        } else if (celix_version_compareTo(visit->msgVersion, entry->msgVersion) != 0) {
+            L_ERROR("Error descriptor adding %s. Found two different version for msg %s. This is not supported, please align used versions between bundles!. Versions found %s and %s",
+                    entry->readFromEntryPath, entry->msgFqn, entry->msgVersionStr, visit->msgVersionStr);
+            entry->invalidReason = "different versions for the same msg type";
+            entry->valid = false;
+        } else if (strcmp(visit->descriptorContent, entry->descriptorContent) != 0) {
+            L_ERROR("Error adding descriptor %s. Found different descriptor content between '%s' and %s",
+                    entry->readFromEntryPath, entry->descriptorContent, visit->descriptorContent);
+            entry->invalidReason = "different descriptor content for the same msg type";
+            entry->valid = false;
+        }
+    }
+    celixThreadMutex_unlock(&provider->mutex);
+
+    return unique;
+}
+
+/**
+ * Registers the message serialization service of an entry, unless the entry already has a
+ * service id. The service properties carry the msg fqn, msg version, msg id and the
+ * serialization type of the provider; the resulting service id is stored in entry->svcId.
+ */
+static void pubsub_serializationProvider_registerSerializationEntry(pubsub_serialization_provider_t* provider, pubsub_serialization_entry_t* entry) {
+    if (entry->svcId != -1L) {
+        return; //already registered
+    }
+
+    celix_properties_t* svcProps = celix_properties_create();
+    celix_properties_set(svcProps, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, entry->msgFqn);
+    celix_properties_set(svcProps, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, entry->msgVersionStr);
+    celix_properties_setLong(svcProps, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, (long)entry->msgId);
+    celix_properties_set(svcProps, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, provider->serializationType);
+
+    celix_service_registration_options_t regOpts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+    regOpts.svc = &entry->svc;
+    regOpts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
+    regOpts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION;
+    regOpts.properties = svcProps;
+    entry->svcId = celix_bundleContext_registerServiceWithOptions(provider->ctx, &regOpts);
+}
+
+/**
+ * Reads the complete content of an opened descriptor stream into a newly allocated,
+ * NUL-terminated buffer. The stream is rewound before reading, so the current read position
+ * does not matter.
+ *
+ * @return The content (caller frees it) or NULL if the size cannot be determined or memory
+ *         cannot be allocated.
+ */
+static char* pubsub_serializationProvider_readDescriptorContent(FILE* stream) {
+    if (fseek(stream, 0L, SEEK_END) != 0) {
+        return NULL;
+    }
+    long streamSize = ftell(stream);
+    if (streamSize < 0) {
+        return NULL;
+    }
+    char *membuf = malloc((size_t)streamSize + 1);
+    if (membuf == NULL) {
+        return NULL;
+    }
+    rewind(stream);
+    size_t nrRead = fread(membuf, 1, (size_t)streamSize, stream);
+    membuf[nrRead] = '\0'; //terminate after what was actually read, so a short read never exposes uninitialized memory
+    return membuf;
+}
+
+/**
+ * Scans a directory for message descriptors and creates a serialization entry for every
+ * descriptor that can be parsed.
+ *
+ * Unique and valid entries are registered as message serialization service. Unique entries and
+ * invalid entries are stored in the provider (invalid ones to support debugging); valid
+ * duplicates are discarded.
+ *
+ * @param provider The serialization provider.
+ * @param root     Directory to scan. Nothing happens if it cannot be opened.
+ * @param bndId    Id of the bundle the directory belongs to, stored in the created entries.
+ */
+static void pubsub_serializationProvider_parseDescriptors(pubsub_serialization_provider_t* provider, const char *root, long bndId) {
+    char fqn[MAX_PATH_LEN];
+    char pathOrError[MAX_PATH_LEN];
+
+    DIR* dir = opendir(root);
+    if (dir == NULL) {
+        return;
+    }
+
+    for (const struct dirent *entry = readdir(dir); entry != NULL; entry = readdir(dir)) {
+        const char* entry_name = entry->d_name;
+        descriptor_type_e descriptorType = getDescriptorType(entry_name);
+        if (descriptorType == FIT_INVALID) {
+            continue; // Not a descriptor, go to next entry in directory
+        }
+
+        L_DEBUG("Parsing entry '%s'\n", entry_name);
+        FILE* stream = openFileStream(provider, descriptorType, entry_name, root, /*out*/fqn, /*out*/pathOrError);
+        if (!stream) {
+            L_WARN("Cannot open descriptor file: '%s'\n", pathOrError);
+            continue;
+        }
+
+        char *entryPath = NULL;
+        if (asprintf(&entryPath, "%s/%s", root, entry_name) < 0) {
+            //the content of entryPath is undefined on failure, do not use or free it
+            L_ERROR("Cannot allocate memory for the path of entry %s.", entry_name);
+            fclose(stream);
+            continue;
+        }
+
+        dyn_message_type *msgType = NULL;
+        if (descriptorType == FIT_DESCRIPTOR) {
+            msgType = pubsub_serializationProvider_parseDfiDescriptor(provider, stream, entryPath);
+        } else if (descriptorType == FIT_AVPR) {
+            L_DEBUG("Ignoring avpr files for now, needs fixing!");
+            //msgType = pubsub_serializationProvider_parseAvprDescriptor(provider, stream, entry_name, /*TODO FQN*/fqn);
+        } else {
+            L_ERROR("Unexpected descriptor type for entry %s.", entryPath);
+        }
+
+        if (msgType == NULL) {
+            free(entryPath);
+            fclose(stream);
+            continue;
+        }
+
+        //keep the raw descriptor text, it is used to compare descriptors and for the shell command output
+        char *membuf = pubsub_serializationProvider_readDescriptorContent(stream);
+        fclose(stream);
+
+        pubsub_serialization_entry_t* serEntry = calloc(1, sizeof(*serEntry));
+        if (membuf == NULL || serEntry == NULL) {
+            L_ERROR("Cannot read descriptor content or allocate memory for entry %s.", entryPath);
+            free(membuf);
+            free(serEntry);
+            dynMessage_destroy(msgType);
+            free(entryPath);
+            continue;
+        }
+
+        celix_version_t *msgVersion = NULL;
+        char *msgFqn = NULL;
+        dynMessage_getVersion(msgType, &msgVersion);
+        dynMessage_getName(msgType, &msgFqn);
+        unsigned int msgId = pubsub_serializationProvider_getMsgId(provider, msgType);
+
+        serEntry->log = provider->log;
+        serEntry->descriptorContent = membuf;
+        serEntry->msgFqn = msgFqn;
+        serEntry->msgVersion = msgVersion;
+        serEntry->msgVersionStr = celix_version_toString(msgVersion);
+        serEntry->msgId = msgId;
+        serEntry->msgType = msgType;
+        serEntry->readFromBndId = bndId;
+        serEntry->readFromEntryPath = entryPath;
+        serEntry->nrOfTimesRead = 1;
+        serEntry->valid = true;
+        serEntry->invalidReason = "";
+        serEntry->svc.handle = serEntry;
+        serEntry->svc.serialize = (void*)provider->serialize;
+        serEntry->svc.freeSerializedMsg = (void*)provider->freeSerializeMsg;
+        serEntry->svc.deserialize = (void*)provider->deserialize;
+        serEntry->svc.freeDeserializedMsg = (void*)provider->freeDeserializeMsg;
+        serEntry->svcId = -1L;
+
+        bool unique = pubsub_serializationProvider_isUniqueAndCheckValid(provider, serEntry);
+        if (unique && serEntry->valid) { //note only register if unique and valid
+            //log the version string; msgVersion itself is a version object and cannot be printed with %s
+            L_DEBUG("Adding message serialization entry for msg %s with id %d and version %s", serEntry->msgFqn, serEntry->msgId, serEntry->msgVersionStr);
+            pubsub_serializationProvider_registerSerializationEntry(provider, serEntry);
+        }
+
+        celixThreadMutex_lock(&provider->mutex);
+        if (unique || !serEntry->valid) { //add all unique entries and invalid entries. The invalid entries are added to support debugging.
+            celix_arrayList_add(provider->serializationSvcEntries, serEntry);
+        } else {
+            free(serEntry->descriptorContent);
+            free(serEntry->readFromEntryPath);
+            free(serEntry->msgVersionStr);
+            dynMessage_destroy(serEntry->msgType);
+            free(serEntry);
+        }
+        celixThreadMutex_unlock(&provider->mutex);
+    }
+
+    closedir(dir);
+}
+
+/**
+ * Prints all details of a serialization entry, including its raw descriptor content, to outStream.
+ * The symbolic name of the bundle the entry was read from is looked up through the bundle context.
+ */
+static void pubsub_serializationProvider_printEntryDetails(pubsub_serialization_provider_t* provider, FILE *outStream, pubsub_serialization_entry_t *entry) {
+    char *bndName = celix_bundleContext_getBundleSymbolicName(provider->ctx, entry->readFromBndId);
+    fprintf(outStream, "|- %20s = %s\n", "msg fqn", entry->msgFqn);
+    //print the id both decimal and as zero padded hex
+    fprintf(outStream, "|- %20s = %d (0x%08x)\n", "msg id", entry->msgId, entry->msgId);
+    fprintf(outStream, "|- %20s = %s\n", "msg version", entry->msgVersionStr);
+    fprintf(outStream, "|- %20s = %li\n", "svc id", entry->svcId);
+    //guard against a missing bundle name; passing NULL for %s is undefined behaviour
+    fprintf(outStream, "|- %20s = %s (bundle id %li)\n", "read from bundle", bndName != NULL ? bndName : "(unknown)", entry->readFromBndId);
+    fprintf(outStream, "|- %20s = %s\n", "bundle entry name", entry->readFromEntryPath);
+    fprintf(outStream, "|- %20s = %lu\n", "nr of times found", entry->nrOfTimesRead);
+    fprintf(outStream, "|- %20s = %s\n", "valid", entry->valid ? "true" : "false");
+    if (!entry->valid) {
+        fprintf(outStream, "|- %20s = %s\n", "invalid reason", entry->invalidReason);
+    }
+    fprintf(outStream, "|- %20s:\n", "descriptor");
+    fprintf(outStream, "%s\n", entry->descriptorContent);
+    free(bndName);
+}
+
+/**
+ * Shell command callback listing the message serialization entries of a provider.
+ *
+ * The first argument after the command name selects the output:
+ *  - none:       one summary line per entry,
+ *  - "verbose":  full details of every entry,
+ *  - "invalids": full details of the invalid entries only,
+ *  - <msg id>:   full details of the first entry with that (positive, decimal) msg id,
+ *  - <msg fqn>:  full details of the first entry with that msg fqn (any other non-numeric argument).
+ *
+ * @param handle      The pubsub_serialization_provider_t the command was registered with.
+ * @param commandLine The complete command line, including the command name.
+ * @param outStream   Stream the result is written to.
+ * @param errorStream Unused.
+ * @return Always true.
+ */
+bool pubsub_serializationProvider_executeCommand(void *handle, const char *commandLine , FILE *outStream, FILE *errorStream __attribute__((unused))) {
+    pubsub_serialization_provider_t* provider = handle;
+
+    bool verbose = false;
+    bool invalids = false;
+    unsigned int msgId = 0;
+    const char *msgFqn = NULL;
+
+    //parse command line
+    char *line = celix_utils_strdup(commandLine);
+    char *lasts = NULL;
+    char *tok = NULL;
+    if (line != NULL) {
+        // skip first argument since this is the command
+        strtok_r(line, " ", &lasts);
+        tok = strtok_r(NULL, " ", &lasts);
+    }
+
+    if (tok != NULL && strcmp("invalids", tok) == 0) {
+        invalids = true;
+    } else if (tok != NULL && strcmp("verbose", tok) == 0) {
+        verbose = true;
+    } else if (tok != NULL) {
+        //strtol is not required to set errno when no digits are found, so use the end pointer
+        //to decide whether the argument is a number (msg id) or a name (msg fqn)
+        char *end = NULL;
+        long parsed = strtol(tok, &end, 10);
+        if (end == tok || *end != '\0') {
+            msgFqn = tok;
+        } else if (parsed > 0 && (long)(unsigned int)parsed == parsed) {
+            msgId = (unsigned int)parsed;
+        }
+        //a numeric argument that is not a usable msg id falls through to the plain listing
+    }
+
+    celixThreadMutex_lock(&provider->mutex);
+    int nrOfEntries = celix_arrayList_size(provider->serializationSvcEntries);
+    if (msgId != 0 || msgFqn != NULL) {
+        for (int i = 0; i < nrOfEntries; ++i) {
+            pubsub_serialization_entry_t *entry = celix_arrayList_get(provider->serializationSvcEntries, i);
+            bool match = (msgId != 0 && msgId == entry->msgId) || (msgFqn != NULL && strcmp(msgFqn, entry->msgFqn) == 0);
+            if (match) {
+                fprintf(outStream, "%s message serialization service info:\n", provider->serializationType);
+                pubsub_serializationProvider_printEntryDetails(provider, outStream, entry);
+                break; //only the first match is printed
+            }
+        }
+    } else if (nrOfEntries == 0) {
+        fprintf(outStream, "No %s message serialization services available.\n", provider->serializationType);
+    } else if (invalids) {
+        fprintf(outStream, "%s invalid message serialization services: \n", provider->serializationType);
+        size_t count = 0;
+        for (int i = 0; i < nrOfEntries; ++i) {
+            pubsub_serialization_entry_t *entry = celix_arrayList_get(provider->serializationSvcEntries, i);
+            if (!entry->valid) {
+                fprintf(outStream, "|- entry nr : %d\n", i + 1);
+                pubsub_serializationProvider_printEntryDetails(provider, outStream, entry);
+                fprintf(outStream, "\n");
+                count++;
+            }
+        }
+        if (count == 0) {
+            fprintf(outStream, "No invalid message serialization entries found!\n");
+        }
+    } else {
+        fprintf(outStream, "%s message serialization services: \n", provider->serializationType);
+        for (int i = 0; i < nrOfEntries; ++i) {
+            pubsub_serialization_entry_t *entry = celix_arrayList_get(provider->serializationSvcEntries, i);
+            if (verbose) {
+                fprintf(outStream, "|- entry nr : %d\n", i + 1);
+                pubsub_serializationProvider_printEntryDetails(provider, outStream, entry);
+                fprintf(outStream, "\n");
+            } else {
+                fprintf(outStream, "|- entry nr %02d: msg id=%d, msg fqn=%s, msg version = %s, valid = %s\n", i + 1, entry->msgId, entry->msgFqn, entry->msgVersionStr, entry->valid ? "true" : "false");
+            }
+        }
+    }
+    celixThreadMutex_unlock(&provider->mutex);
+
+    free(line);
+    return true;
+}
+/**
+ * Bundle tracker callback: looks up the message descriptors directory of the installed bundle
+ * and, if there is one, parses the descriptors found in it.
+ */
+void pubsub_serializationProvider_onInstalledBundle(void *handle, const celix_bundle_t *bnd) {
+    pubsub_serialization_provider_t* provider = handle;
+    char *dirPath = pubsub_getMessageDescriptorsDir(provider->ctx, bnd);
+    if (dirPath == NULL) {
+        return; //bundle has no descriptors directory
+    }
+    long bundleId = celix_bundle_getId(bnd);
+    pubsub_serializationProvider_parseDescriptors(provider, dirPath, bundleId);
+    free(dirPath);
+}
+
+/**
+ * Creates a serialization provider for the given serialization type.
+ *
+ * The provider sets up logging (also for the dfi library), starts a bundle tracker that parses
+ * the message descriptors of installed bundles, registers a serialization marker service and
+ * registers a shell command ("celix::<serializationType>_message_serialization") to inspect the
+ * found serialization entries.
+ *
+ * @param ctx                The bundle context.
+ * @param serializationType  Name of the serialization type; copied by the provider.
+ * @param serialize          Callback used as serialize function of every serialization service.
+ * @param freeSerializeMsg   Callback used to free a serialized message.
+ * @param deserialize        Callback used as deserialize function of every serialization service.
+ * @param freeDeserializeMsg Callback used to free a deserialized message.
+ * @return The new provider (destroy with pubsub_serializationProvider_destroy) or NULL if the
+ *         provider could not be allocated.
+ */
+pubsub_serialization_provider_t *pubsub_serializationProvider_create(
+        celix_bundle_context_t *ctx,
+        const char* serializationType,
+        celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen),
+        void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen),
+        celix_status_t (*deserialize)(pubsub_serialization_entry_t* entry, const struct iovec* input, size_t inputIovLen __attribute__((unused)), void **out),
+        void (*freeDeserializeMsg)(pubsub_serialization_entry_t* entry, void *msg)) {
+    pubsub_serialization_provider_t* provider = calloc(1, sizeof(*provider));
+    if (provider == NULL) {
+        return NULL;
+    }
+    provider->ctx = ctx;
+    celixThreadMutex_create(&provider->mutex, NULL);
+    provider->serializationSvcEntries = celix_arrayList_create();
+
+    provider->serializationType = celix_utils_strdup(serializationType);
+    provider->serialize = serialize;
+    provider->freeSerializeMsg = freeSerializeMsg;
+    provider->deserialize = deserialize;
+    provider->freeDeserializeMsg = freeDeserializeMsg;
+
+    {
+        char *logName = NULL;
+        if (asprintf(&logName, "pubsub_serialization_provider(%s)", serializationType) < 0) {
+            logName = NULL; //content is undefined on failure
+        }
+        provider->log = logHelper_createWithName(ctx, logName != NULL ? logName : "pubsub_serialization_provider");
+        free(logName);
+    }
+
+    //route the log output of the dfi library to the log helper of this provider
+    dynFunction_logSetup(dfi_log, provider, 1);
+    dynType_logSetup(dfi_log, provider, 1);
+    dynCommon_logSetup(dfi_log, provider, 1);
+
+    {
+        celix_bundle_tracking_options_t opts = CELIX_EMPTY_BUNDLE_TRACKING_OPTIONS;
+        opts.callbackHandle = provider;
+        opts.onInstalled = pubsub_serializationProvider_onInstalledBundle;
+        opts.includeFrameworkBundle = true;
+        provider->bundleTrackerId = celix_bundleContext_trackBundlesWithOptions(ctx, &opts);
+    }
+
+    {
+        celix_properties_t* props = celix_properties_create();
+        provider->markerSvc.handle = provider;
+        celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, provider->serializationType);
+        celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.svc = &provider->markerSvc;
+        opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+        opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
+        opts.properties = props;
+        provider->serializationMarkerSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+    }
+
+    {
+        provider->cmdSvc.handle = provider;
+        provider->cmdSvc.executeCommand = pubsub_serializationProvider_executeCommand;
+
+        //asprintf leaves its output pointer undefined on failure, so reset it to NULL in that case
+        char *name = NULL;
+        if (asprintf(&name, "celix::%s_message_serialization", provider->serializationType) < 0) {
+            name = NULL;
+        }
+        char *usage = NULL;
+        if (asprintf(&usage, "celix::%s_message_serialization [verbose | invalids | <msg id> | <msg fqn>]", provider->serializationType) < 0) {
+            usage = NULL;
+        }
+        //the description names the actual serialization type instead of a hard-coded one
+        char *description = NULL;
+        if (asprintf(&description, "list available %s message serialization services or provide detailed information about a serialization service.", provider->serializationType) < 0) {
+            description = NULL;
+        }
+
+        celix_properties_t* props = celix_properties_create();
+        if (name != NULL) {
+            celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, name);
+        }
+        if (usage != NULL) {
+            celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, usage);
+        }
+        if (description != NULL) {
+            celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, description);
+        }
+        celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.svc = &provider->cmdSvc;
+        opts.serviceName = CELIX_SHELL_COMMAND_SERVICE_NAME;
+        opts.serviceVersion = CELIX_SHELL_COMMAND_SERVICE_VERSION;
+        opts.properties = props;
+        provider->cmdSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+
+        free(name);
+        free(usage);
+        free(description);
+    }
+    return provider;
+}
+
+/**
+ * Destroys a serialization provider: stops the bundle tracker, unregisters the marker and shell
+ * command services, unregisters and frees every serialization entry and finally releases the
+ * provider itself. Passing NULL is allowed and does nothing.
+ */
+void pubsub_serializationProvider_destroy(pubsub_serialization_provider_t* provider) {
+    if (provider == NULL) {
+        return;
+    }
+
+    celix_bundle_context_t *ctx = provider->ctx;
+    celix_bundleContext_stopTracker(ctx, provider->bundleTrackerId);
+    celix_bundleContext_unregisterService(ctx, provider->serializationMarkerSvcId);
+    celix_bundleContext_unregisterService(ctx, provider->cmdSvcId);
+
+    celixThreadMutex_lock(&provider->mutex);
+    for (int idx = 0; idx < celix_arrayList_size(provider->serializationSvcEntries); ++idx) {
+        pubsub_serialization_entry_t *current = celix_arrayList_get(provider->serializationSvcEntries, idx);
+        celix_bundleContext_unregisterService(ctx, current->svcId);
+        //release everything the entry owns before the entry itself
+        free(current->descriptorContent);
+        free(current->readFromEntryPath);
+        free(current->msgVersionStr);
+        dynMessage_destroy(current->msgType);
+        free(current);
+    }
+    celix_arrayList_destroy(provider->serializationSvcEntries);
+    celixThreadMutex_unlock(&provider->mutex);
+
+    celixThreadMutex_destroy(&provider->mutex);
+
+    logHelper_destroy(&provider->log);
+
+    free(provider->serializationType);
+    free(provider);
+}
+
+/**
+ * Returns the number of valid serialization entries known to the provider.
+ * The provider mutex is held while counting.
+ */
+size_t pubsub_serializationProvider_nrOfEntries(pubsub_serialization_provider_t* provider) {
+    size_t nrValid = 0;
+    celixThreadMutex_lock(&provider->mutex);
+    int total = celix_arrayList_size(provider->serializationSvcEntries);
+    for (int idx = 0; idx < total; ++idx) {
+        pubsub_serialization_entry_t *current = celix_arrayList_get(provider->serializationSvcEntries, idx);
+        nrValid += current->valid ? 1 : 0;
+    }
+    celixThreadMutex_unlock(&provider->mutex);
+    return nrValid;
+}
+
+/**
+ * Returns the number of invalid serialization entries known to the provider.
+ * The provider mutex is held while counting.
+ */
+size_t pubsub_serializationProvider_nrOfInvalidEntries(pubsub_serialization_provider_t* provider) {
+    size_t nrInvalid = 0;
+    celixThreadMutex_lock(&provider->mutex);
+    int total = celix_arrayList_size(provider->serializationSvcEntries);
+    for (int idx = 0; idx < total; ++idx) {
+        pubsub_serialization_entry_t *current = celix_arrayList_get(provider->serializationSvcEntries, idx);
+        nrInvalid += current->valid ? 0 : 1;
+    }
+    celixThreadMutex_unlock(&provider->mutex);
+    return nrInvalid;
+}
+
+/** Returns the log helper used by the provider; the provider keeps ownership of it. */
+log_helper_t* pubsub_serializationProvider_getLogHelper(pubsub_serialization_provider_t *provider) {
+    log_helper_t *helper = provider->log;
+    return helper;
+}
\ No newline at end of file