xuzhenbao commented on issue #515: URL: https://github.com/apache/celix/issues/515#issuecomment-2008626293
For the remote event admin, I have the following ideas.

First of all, the remote distribution provider should be pluggable, and its interface definition is as follows:
~~~
typedef struct celix_event_remote_provider_service {
    void *handle;
    //It is used to distribute the asynchronous event
    celix_status_t (*postEvent)(const char *topic, const celix_properties_t *properties);
    //It is used to distribute the synchronous event
    celix_status_t (*sendEvent)(const char *topic, const celix_properties_t *properties);
}celix_event_remote_provider_service_t;
~~~
The service has the properties `celix.event.subscriber.endpoint.framework.uuids` and `service.ranking`, and event admin uses these properties to select the best `celix_event_remote_provider_service_t` service for distributing events to remote frameworks. The selection strategy is as follows:
- If `celix.event.subscriber.endpoint.framework.uuids` are the same, then select the one with the highest `service.ranking`.
- If `celix.event.subscriber.endpoint.framework.uuids` and `service.ranking` are the same, then select the one that was registered first (its `service.id` is smallest).
- If one set of `celix.event.subscriber.endpoint.framework.uuids` contains the other, then select the provider with the containing set.
- If the sets of `celix.event.subscriber.endpoint.framework.uuids` only intersect, then the event properties `$celix.event.source.framework.uuid` (used to identify the event source framework) and `$celix.event.source.seq` (used to identify the event sequence) should be set, and they are used for event deduplication.

In addition, property names with the `$` prefix are reserved in the event properties. These properties carry information specific to the remote distribution provider or control the API behavior. The properties that control the API behavior include:
- `$celix.event.remote.qos`
  - QOS behavior definition: consistent with MQTT.
    In addition, if the remote distribution provider does not support the corresponding QOS level, it can choose to degrade and record a log.
  - QOS scope: Both the published event and the subscriber can set this property. If the two values conflict, the smaller value is used.
- `$celix.event.remote.expiryInterval`: event expiration time. Expired events are no longer delivered to subscribers. If this value is not set, the event never expires.
- `$celix.event.remote.enable`: If the event contains this property, the event will be forwarded to the remote distribution provider. If this property is not set, the event is only published locally.

Note: The above event properties only take effect for remote events.

In addition, I don't intend to forward framework events to remote frameworks currently, because I am not sure what forwarding framework events to remote frameworks would mean. If they need to be forwarded, the property `org.osgi.framework.uuid` should be added to them.

I plan to implement the remote distribution provider in two ways: one based on RSA and the other based on MQTT. For the implementation based on MQTT, I plan to use the mosquitto library.

**The implementation based on RSA**

For the implementation based on RSA, two remote interfaces will be added: `celix_event_handler_remote_listener_service_t` and `celix_event_remote_subscriber_service_t`. `celix_event_handler_remote_listener_service_t` is used to listen to remote subscription information, and `celix_event_remote_subscriber_service_t` is used to forward events to remote subscribers. For example, suppose there are two processes A and B in the system, process B needs to subscribe to event E1, and event E1 is published by process A. Then process A needs to provide the `celix_event_handler_remote_listener_service_t` service, and process B needs to provide the `celix_event_remote_subscriber_service_t` service.
Process B forwards its subscription information to A by calling the `celix_event_handler_remote_listener_service_t` service of process A, and process A forwards E1 to process B by calling the `celix_event_remote_subscriber_service_t` service of process B. Both `celix_event_handler_remote_listener_service_t` and `celix_event_remote_subscriber_service_t` are called remotely.

**Note**: I don't consider attaching the subscription information of process B to the `celix_event_remote_subscriber_service_t` service properties. If that were done, process B would need to re-register the `celix_event_remote_subscriber_service_t` service whenever its subscription information changes, and during that period process B might lose some events published by process A. Therefore, process B forwards its subscription information to A through the `celix_event_handler_remote_listener_service_t` of process A.

The definition of `celix_event_handler_remote_listener_service_t` and `celix_event_remote_subscriber_service_t` is as follows:
~~~
typedef struct celix_event_handler_remote_listener_service {
    void *handle;
    celix_status_t (*handlerAdded)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter, int qos);
    celix_status_t (*handlerRemoved)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter);
}celix_event_handler_remote_listener_service_t;

typedef struct celix_event_remote_subscriber_service {
    void* handle;
    celix_status_t (*receiveEventAsync)(void* handle, const char* topic, const celix_properties_t* properties);
    celix_status_t (*receiveEventSync)(void* handle, const char* topic, const celix_properties_t* properties);
}celix_event_remote_subscriber_service_t;
~~~
The component relationship diagram is as follows:

Some other key points to consider in the implementation:
- The `celix_properties_t` type will be added to dfi, which is used for the serialization and deserialization of event properties in remote
  calls.
- The remote `celix_event_remote_subscriber_service_t` offline handling strategy: If the event QOS value is QOS1 or QOS2, the event should be delivered to the online services first, and then held until the offline services come online, the event expires, or the session expires. QOS0 events are discarded directly.
- Backpressure handling: assuming that events with large payloads are not transmitted, events are discarded once the number of cached events reaches a certain threshold (QOS0 events are discarded first). In addition, the remote distribution provider will implement a circuit breaker mechanism for problematic subscribers (events will not be delivered to them for a period of time).

**The implementation based on MQTT**

For the implementation based on MQTT, I will add the remote interface `celix_mqtt_broker_info_service_t`, which is used to obtain the address information of the MQTT broker. One of the remote distribution providers obtains the address information of the MQTT broker by parsing the mosquitto.conf file and then registers the remote service `celix_mqtt_broker_info_service`. The other remote distribution providers obtain the address information from `celix_mqtt_broker_info_service_t`. The definition of `celix_mqtt_broker_info_service` is as follows:
~~~
#define CELIX_MQTT_BROKER_INFO_SERVICE_NAME "celix_mqtt_broker_info_service"

//The address of the MQTT broker
#define CELIX_MQTT_BROKER_ADDRESSES "celix.mqtt.broker.addresses"
//The port of the MQTT broker
#define CELIX_MQTT_BROKER_PORT "celix.mqtt.broker.port"

typedef struct celix_mqtt_broker_info_service {
    void *handle;
}celix_mqtt_broker_info_service_t;
~~~
The component relationship diagram is as follows:

Some other key points to consider in the implementation:
- Each provider publishes its own subscription information to the broker through the message `celix/mqtt/subscriptions/<frameworkUUID>`, and obtains remote subscription information by subscribing to the message `celix/mqtt/subscriptions/*`.
  Then the provider should filter events based on the remote subscription information before publishing the event to the broker.
- Network reconnection is handled automatically by the mosquitto library.
- Use the request/response mechanism of MQTT to implement synchronous semantics.
- For synchronous events, consider using a circuit breaker mechanism (synchronous events are no longer delivered to the corresponding framework for a period of time). For asynchronous events, network exceptions are handled automatically by the mosquitto library.
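
To make the "filter events before publishing to the broker" point more concrete, here is a minimal sketch in C of how a provider could check whether any remote framework has subscribed to a topic. The names `remote_subscription_t`, `topic_matches` and `has_remote_subscriber` are hypothetical and not part of the proposal. The sketch assumes Event Admin style topic patterns (an exact topic, a `prefix/*` pattern, or `*`) and ignores the handler's `filter` property entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical record of one remote subscription, e.g. built from the
 * subscription information that each provider publishes to the broker. */
typedef struct remote_subscription {
    const char *fwUUID; /* framework that owns the event handler */
    const char *topic;  /* exact topic, "prefix/*", or "*" */
} remote_subscription_t;

/* Assumed matching rules: "*" matches every topic, "a/b/*" matches any
 * topic below "a/b/", and anything else must match exactly. */
static bool topic_matches(const char *pattern, const char *topic) {
    assert(pattern != NULL && topic != NULL);
    size_t len = strlen(pattern);
    if (strcmp(pattern, "*") == 0) {
        return true;
    }
    if (len >= 2 && pattern[len - 2] == '/' && pattern[len - 1] == '*') {
        /* Compare the prefix including the trailing '/', and require at
         * least one more character after it. */
        return strncmp(pattern, topic, len - 1) == 0 && topic[len - 1] != '\0';
    }
    return strcmp(pattern, topic) == 0;
}

/* Returns true if at least one remote subscription matches the topic,
 * i.e. the event is worth publishing to the broker. */
static bool has_remote_subscriber(const remote_subscription_t *subs, size_t count,
                                  const char *topic) {
    for (size_t i = 0; i < count; ++i) {
        if (topic_matches(subs[i].topic, topic)) {
            return true;
        }
    }
    return false;
}
```

A provider would call `has_remote_subscriber` from its `postEvent`/`sendEvent` path and skip the publish when it returns false. A real implementation would also need to evaluate the subscriber's filter against the event properties and update the table when subscription messages arrive or are removed.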