kgiusti commented on a change in pull request #654: DISPATCH-1532 - 
Reimplemented mobile-address-synchronization as a cor…
URL: https://github.com/apache/qpid-dispatch/pull/654#discussion_r360535435
 
 

 ##########
 File path: src/router_core/modules/mobile_sync/mobile.c
 ##########
 @@ -0,0 +1,860 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "module.h"
+#include "router_core_private.h"
+#include "core_events.h"
+#include "route_control.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/log.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/message.h>
+#include <stdio.h>
+#include <inttypes.h>
+
+#define PROTOCOL_VERSION 1
+static const char *OPCODE     = "opcode";
+static const char *MAR        = "MAR";
+static const char *MAU        = "MAU";
+static const char *ID         = "id";
+static const char *PV         = "pv";
+static const char *AREA       = "area";
+static const char *MOBILE_SEQ = "mobile_seq";
+static const char *HINTS      = "hints";
+static const char *ADD        = "add";
+static const char *DEL        = "del";
+static const char *EXIST      = "exist";
+static const char *HAVE_SEQ   = "have_seq";
+
+//
+// Address.sync_mask bit values
+//
+#define ADDR_SYNC_DELETION_WAS_BLOCKED  0x00000001
+#define ADDR_SYNC_IN_ADD_LIST           0x00000002
+#define ADDR_SYNC_IN_DEL_LIST           0x00000004
+#define ADDR_SYNC_TO_BE_DELETED         0x00000008
+
+#define BIT_SET(M,B)   M |= B
+#define BIT_CLEAR(M,B) M &= ~B
+#define BIT_IS_SET(M,B) (M & B)
+
+typedef struct {
+    qdr_core_t                *core;
+    qdrc_event_subscription_t *event_sub;
+    qdr_core_timer_t          *timer;
+    qdr_subscription_t        *message_sub1;
+    qdr_subscription_t        *message_sub2;
+    qd_log_source_t           *log;
+    uint64_t                   mobile_seq;
+    qdr_address_list_t         added_addrs;
+    qdr_address_list_t         deleted_addrs;
+} qdrm_mobile_sync_t;
+
+
+//================================================================================
+// Helper Functions
+//================================================================================
+
+static qd_address_treatment_t qcm_mobile_sync_default_treatment(qdr_core_t 
*core, int hint) {
+    switch (hint) {
+    case QD_TREATMENT_MULTICAST_FLOOD:
+        return QD_TREATMENT_MULTICAST_FLOOD;
+    case QD_TREATMENT_MULTICAST_ONCE:
+        return QD_TREATMENT_MULTICAST_ONCE;
+    case QD_TREATMENT_ANYCAST_CLOSEST:
+        return QD_TREATMENT_ANYCAST_CLOSEST;
+    case QD_TREATMENT_ANYCAST_BALANCED:
+        return QD_TREATMENT_ANYCAST_BALANCED;
+    case QD_TREATMENT_LINK_BALANCED:
+        return QD_TREATMENT_LINK_BALANCED;
+    case QD_TREATMENT_UNAVAILABLE:
+        return QD_TREATMENT_UNAVAILABLE;
+    default:
+        return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? 
QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment;
+    }
+}
+
+
+static bool qcm_mobile_sync_addr_is_mobile(qdr_address_t *addr)
+{
+    const char *hash_key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
+    return !!strchr("MCDEFH", hash_key[0]);
+}
+
+
+qdr_node_t *qdc_mobile_sync_router_by_id(qdrm_mobile_sync_t *msync, 
qd_parsed_field_t *id_field)
+{
+    qd_iterator_t *id_iter = qd_parse_raw(id_field);
+    qdr_node_t *router = DEQ_HEAD(msync->core->routers);
+    while (!!router) {
+        if (qd_iterator_equal(id_iter, 
qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1))
+            return router;
+        router = DEQ_NEXT(router);
+    }
+
+    return 0;
+}
+
+
+/**
+ * Set the 'block_deletion' flag on the address to ensure it is not deleted 
out from under
+ * our list.  If the flag was already set, make note of that fact so we don't 
clear it later.
+ */
+static void qcm_mobile_sync_address_added_to_list(qdr_address_t *addr)
+{
+    if (addr->block_deletion) {
+        BIT_SET(addr->sync_mask, ADDR_SYNC_DELETION_WAS_BLOCKED);
+    } else {
+        addr->block_deletion = true;
+    }
+}
+
+
+/**
+ * Clear the 'block_deletion' flag on the address if it was set by this module.
+ * Check the address to have it deleted if it is no longer referenced anywhere.
+ */
+static void qcm_mobile_sync_address_removed_from_list(qdr_core_t *core, 
qdr_address_t *addr)
+{
+    if (!BIT_IS_SET(addr->sync_mask, ADDR_SYNC_DELETION_WAS_BLOCKED)) {
+        addr->block_deletion = false;
+        qdr_check_addr_CT(core, addr);
+    } else {
+        BIT_CLEAR(addr->sync_mask, ADDR_SYNC_DELETION_WAS_BLOCKED);
+    }
+}
+
+
+static qd_composed_field_t *qcm_mobile_sync_message_headers(const char 
*address, const char *opcode)
+{
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0); // durable
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);            // message-id
+    qd_compose_insert_null(field);            // user-id
+    qd_compose_insert_string(field, address); // to
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+    qd_compose_insert_symbol(field, OPCODE);
+    qd_compose_insert_string(field, opcode);
+    qd_compose_end_map(field);
+
+    return field;
+}
+
+
+static void qcm_mobile_sync_compose_diff_addr_list(qdrm_mobile_sync_t *msync, 
qd_composed_field_t *field, bool is_added)
+{
+    qdr_address_list_t *list = is_added ? &msync->added_addrs : 
&msync->deleted_addrs;
+
+    qd_compose_start_list(field);
+    qdr_address_t *addr = DEQ_HEAD(*list);
+    while (addr) {
+        const char *hash_key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
+        qd_compose_insert_string(field, hash_key);
+        if (is_added) {
 
 Review comment:
   Just a suggestion:  you _could_ move this check out of the address loop by 
setting the list name (SYNC_ADD/DEL) and bit in locals before the loop (like 
you set the list var).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to