[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13409534#comment-13409534
 ] 

Sijie Guo commented on BOOKKEEPER-321:
--------------------------------------

Message Filter Support in Hedwig:

1) Provide a message header, *MessageHeader*, in *Message* (BOOKKEEPER-78).

System properties are defined as fields in the protobuf definition.
User properties are stored in a key/bytes map structure, where applications can
put their own customized data.

{code}
+// common structure to store header or properties
+message Map {
+    message Entry {
+        optional string key  = 1;
+        optional bytes value = 2;
+    }
+    repeated Entry entries = 1;
+}
+
+// message header
+message MessageHeader {
+    // user customized fields used for message filter
+    optional Map properties = 1;
+    // following are system properties in message header
+    optional string messageType = 2;
+}
@@ -30,6 +47,8 @@ message Message {
     required bytes body = 1;
     optional bytes srcRegion = 2;
     optional MessageSeqId msgId = 3;
+    // message header
+    optional MessageHeader header = 4;
 }
{code}
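A minimal plain-Java sketch of reading user properties back out of a header. The records `PropertyEntry`, `PropertyMap` and `Header` are hypothetical stand-ins for the protobuf-generated `Map.Entry`, `Map` and `MessageHeader` classes (not the generated API), and the last-entry-wins rule for duplicate keys is an assumption, since a `repeated Entry` field does not itself forbid duplicates.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the protobuf messages Map.Entry, Map and MessageHeader.
record PropertyEntry(String key, byte[] value) {}

record PropertyMap(List<PropertyEntry> entries) {}

record Header(PropertyMap properties, String messageType) {}

final class HeaderProperties {
    private HeaderProperties() {}

    // Flattens the repeated key/bytes entries into a java.util.Map.
    // Assumption: if a key appears more than once, the last entry wins.
    static Map<String, byte[]> toMap(PropertyMap properties) {
        Map<String, byte[]> result = new HashMap<>();
        if (properties == null) {
            return result; // the properties field is optional
        }
        for (PropertyEntry entry : properties.entries()) {
            result.put(entry.key(), entry.value());
        }
        return result;
    }

    // Reads a user property as a UTF-8 string, or returns null if it is absent.
    static String getString(Header header, String key) {
        byte[] value = toMap(header.properties()).get(key);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```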

2) Add preferences for a subscription (BOOKKEEPER-332).

*SubscriptionPreferences* is a structure that stores all the preferences of a
subscription. The preferences can be passed in *SubscriptionOptions*, sent to
the hub server, and stored in the meta store.

System options are defined as fields of *SubscriptionPreferences* in the
protobuf definition. Currently there are two system options: *messageBound*
limits the number of messages to receive for a subscription, and
*messageFilter* supports running a server-side message filter.

User options are stored in a key/bytes map structure, where applications can
put their own customized options.

{code}
+// record all preferences for a subscription,
+// would be serialized to be stored in meta store
+message SubscriptionPreferences {
+    // user customized subscription options
+    optional Map options = 1;
+
+    ///
+    /// system defined options
+    ///
+
+    // message bound
+    optional uint32 messageBound = 2;
+    // server-side message filter
+    optional string messageFilter = 3;
+}
+

 message SubscribeRequest{
     required bytes subscriberId = 2;

@@ -100,12 +135,22 @@ message SubscribeRequest{

        // wait for cross-regional subscriptions to be established before returning
        optional bool synchronous = 4 [default = false];
+        // @Deprecated. set message bound in SubscriptionPreferences
        optional uint32 messageBound = 5;
+
+        // subscription options
+        optional SubscriptionPreferences preferences = 6;
 }

+// used in client only
+// options are stored in SubscriptionPreferences structure
 message SubscriptionOptions {
     optional SubscribeRequest.CreateOrAttach createOrAttach = 2 [default = CREATE_OR_ATTACH];
     optional uint32 messageBound = 3 [default = 0];
+    // user customized subscription options
+    optional Map options = 4;
+    // server-side message filter
+    optional string messageFilter = 5;
 }

 message SubscriptionState {
     required MessageSeqId msgId = 1;
+    // @Deprecated.
+    // It is a bad idea to put fields that don't change frequently
+    // together with fields that change frequently
+    // so move it to subscription info structure
     optional uint32 messageBound = 2;
 }

+message SubscriptionData {
+    optional SubscriptionState state = 1;
+    optional SubscriptionPreferences preferences = 2;
+}
+
{code}
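Since the client passes its choices in *SubscriptionOptions* while the hub stores *SubscriptionPreferences*, the client has to copy one into the other. A sketch of that copy, using hypothetical plain-Java mirrors (`Options`, `Preferences`, `PreferencesBuilder`) rather than the generated classes, and assuming that a `messageBound` of 0 (the proto default) means "no bound" and is therefore left unset:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of SubscriptionOptions (used in the client only).
final class Options {
    int messageBound = 0;                       // proto default is 0
    Map<String, byte[]> options = new HashMap<>();
    String messageFilter;                       // null when not set
}

// Hypothetical mirror of SubscriptionPreferences (stored in the meta store).
final class Preferences {
    Integer messageBound;                       // null when not set
    Map<String, byte[]> options = new HashMap<>();
    String messageFilter;
}

final class PreferencesBuilder {
    private PreferencesBuilder() {}

    // Copies client-side subscription options into the preferences sent to the hub.
    // Assumption: a messageBound of 0 means "no bound", so it is left unset.
    static Preferences fromOptions(Options opts) {
        Preferences prefs = new Preferences();
        if (opts.messageBound > 0) {
            prefs.messageBound = opts.messageBound;
        }
        prefs.options.putAll(opts.options);
        prefs.messageFilter = opts.messageFilter;
        return prefs;
    }
}
```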

The subscription preferences are returned in the response body of a subscribe
request, so that they can be passed to a message filter running on the client
side.

{code}
+message SubscribeResponse {
+    optional SubscriptionPreferences preferences = 2;
+}
+
+message ResponseBody {
+    optional SubscribeResponse subscribeResponse = 2;
+}
+
 message PubSubResponse{
     required ProtocolVersion protocolVersion = 1;
     required StatusCode statusCode = 2;
@@ -142,8 +195,10 @@ message PubSubResponse{
     optional Message message = 5;
     optional bytes topic = 6;
     optional bytes subscriberId = 7;
-}

+    // the following fields are sent by other requests
+    optional ResponseBody respBody = 8;
+}
{code}
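Every field on the path from `respBody` through `subscribeResponse` to `preferences` is optional, so a client reading the preferences has to cope with any link being unset, for example in a response from an older hub that never sets `respBody`. A sketch with hypothetical record stand-ins for the generated classes, where `null` models an unset optional field:

```java
import java.util.Optional;

// Hypothetical stand-ins for the generated classes; null models an unset optional field.
record Prefs(String messageFilter) {}

record SubscribeResponse(Prefs preferences) {}

record ResponseBody(SubscribeResponse subscribeResponse) {}

record PubSubResponse(int statusCode, ResponseBody respBody) {}

final class ResponsePreferences {
    private ResponsePreferences() {}

    // Walks respBody -> subscribeResponse -> preferences; any missing link
    // yields an empty Optional instead of a NullPointerException.
    static Optional<Prefs> extract(PubSubResponse response) {
        return Optional.ofNullable(response.respBody())
                .map(ResponseBody::subscribeResponse)
                .map(SubscribeResponse::preferences);
    }
}
```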

3) Message filter (BOOKKEEPER-333)

*MessageFilter*, described below, extends the original internal interface.

*initialize*/*uninitialize* are used to initialize/uninitialize a message
filter running on the server side.

*setSubscriptionPreferences* passes the subscription preferences to the message
filter when it starts to run, on either the server side or the client side, so
the filter can get the subscriber's preferences and use them for filtering.

*testMessage* answers yes/no for a given message. Only messages for which it
returns true are delivered to the subscriber.

{code}
+public interface MessageFilter {
+
+    /**
+     * Initialize the message filter.
+     *
+     * @param conf
+     *          Configuration Object. A <i>MessageFilter</i> might read settings from it.
+     * @return message filter
+     * @throws ConfigurationException if the configuration is invalid
+     * @throws IOException when failed to initialize message filter
+     */
+    public MessageFilter initialize(Configuration conf)
+    throws ConfigurationException, IOException;
+
+    /**
+     * Uninitialize the message filter.
+     */
+    public void uninitialize();
+
+    /**
+     * Set subscription preferences.
+     *
+     * <code>preferences</code> of the subscriber will be passed to the message filter when
+     * the message filter attaches to its subscription, either on the server side or the client side.
+     *
+     * @param topic
+     *          Topic Name.
+     * @param subscriberId
+     *          Subscriber Id.
+     * @param preferences
+     *          Subscription Preferences.
+     * @return message filter
+     */
+    public MessageFilter setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+                                                    SubscriptionPreferences preferences);
+
+    /**
+     * Tests whether a particular message passes the filter or not.
+     *
+     * @param message
+     *          Message to test.
+     * @return true if the message passes the filter and can be delivered to the subscriber
+     */
+    public boolean testMessage(Message message);
+}
{code}
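To show how the callbacks fit together, here is a sketch of a filter written against a simplified, hypothetical mirror of the interface: `String` replaces `ByteString`, `java.util.Properties` replaces `Configuration`, and records replace the protobuf `Message` and `SubscriptionPreferences`. The `messageTypes` option and the `passAllWhenUnset` setting are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical stand-ins for the protobuf Message and SubscriptionPreferences.
record Msg(String messageType, byte[] body) {}

record SubPrefs(Map<String, byte[]> options) {}

// Simplified, hypothetical mirror of the MessageFilter interface above.
interface SimpleMessageFilter {
    SimpleMessageFilter initialize(Properties conf);
    void uninitialize();
    SimpleMessageFilter setSubscriptionPreferences(String topic, String subscriberId, SubPrefs prefs);
    boolean testMessage(Msg message);
}

// Passes only messages whose messageType is listed in the subscriber's
// "messageTypes" option (a comma-separated list; the key name is hypothetical).
final class MessageTypeFilter implements SimpleMessageFilter {
    private Set<String> allowed = Set.of();
    private boolean passAllWhenUnset = true;

    @Override
    public SimpleMessageFilter initialize(Properties conf) {
        // Hypothetical setting: whether to pass everything when no types were given.
        passAllWhenUnset = Boolean.parseBoolean(conf.getProperty("passAllWhenUnset", "true"));
        return this;
    }

    @Override
    public void uninitialize() {
        allowed = Set.of();
    }

    @Override
    public SimpleMessageFilter setSubscriptionPreferences(String topic, String subscriberId, SubPrefs prefs) {
        byte[] raw = prefs.options().get("messageTypes");
        allowed = raw == null
                ? Set.of()
                : new HashSet<>(Arrays.asList(new String(raw, StandardCharsets.UTF_8).split(",")));
        return this;
    }

    @Override
    public boolean testMessage(Msg message) {
        if (allowed.isEmpty()) {
            return passAllWhenUnset;
        }
        return allowed.contains(message.messageType());
    }
}
```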

4) Server-side filter (BOOKKEEPER-333)

A subscriber sets the message filter class name in its subscription preferences
when it subscribes. The hub server instantiates the message filter for the
subscriber when it starts serving the subscribe request.

The subscriber's preferences are passed to the message filter, and every
message about to be delivered to the subscriber is first checked with
#testMessage. If it returns true, the message enters the delivery process;
otherwise, the hub server skips it and moves on to the next message.
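A sketch of the hub-side flow described above, assuming the filter is instantiated by reflection from the class name in the preferences and has a public no-arg constructor. `HubFilter`, `HubDeliverySketch` and `PrefixFilter` are hypothetical, the preferences are flattened to a `Map<String, String>` for brevity, and collecting messages into a list stands in for writing them to the subscriber's channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified filter contract (compare the MessageFilter interface above).
interface HubFilter {
    void setSubscriptionPreferences(String topic, String subscriberId, Map<String, String> prefs);
    boolean testMessage(String message);
}

final class HubDeliverySketch {
    private HubDeliverySketch() {}

    // Instantiates the filter class named in the subscriber's preferences.
    // Assumption: the class is on the hub's classpath with a public no-arg constructor.
    static HubFilter instantiate(String className) throws ReflectiveOperationException {
        return Class.forName(className)
                .asSubclass(HubFilter.class)
                .getDeclaredConstructor()
                .newInstance();
    }

    // Runs each pending message through testMessage; messages that fail are
    // skipped and the loop moves on to the next one.
    static List<String> deliver(String topic, String subscriberId, Map<String, String> prefs,
                                List<String> pending) throws ReflectiveOperationException {
        HubFilter filter = instantiate(prefs.get("messageFilter"));
        filter.setSubscriptionPreferences(topic, subscriberId, prefs);
        List<String> delivered = new ArrayList<>();
        for (String message : pending) {
            if (filter.testMessage(message)) {
                delivered.add(message); // stands in for sending to the subscriber
            }
        }
        return delivered;
    }
}

// Hypothetical example filter: passes messages starting with the "prefix" preference.
final class PrefixFilter implements HubFilter {
    private String prefix = "";

    public PrefixFilter() {}

    @Override
    public void setSubscriptionPreferences(String topic, String subscriberId, Map<String, String> prefs) {
        prefix = prefs.getOrDefault("prefix", "");
    }

    @Override
    public boolean testMessage(String message) {
        return message.startsWith(prefix);
    }
}
```

Rejected messages are dropped rather than queued, which mirrors the "move on to the next message" behavior in the text.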

5) Client-side filter (BOOKKEEPER-334 & BOOKKEEPER-335)

A message filter is attached when calling #startDelivery to receive messages.
Messages are delivered to the messageHandler only if they pass the message
filter's test. The subscription preferences are passed to the message filter
when #startDelivery is called.

Java client:
{code}

     /**
+     * Begin delivery of messages from the server to us for this topic and
+     * subscriberId.
+     *
+     * Only the messages that pass <code>messageFilter</code> are delivered to
+     * <code>messageHandler</code>.
+     *
+     * @param topic
+     *            Topic name of the subscription
+     * @param subscriberId
+     *            ID of the subscriber
+     * @param messageHandler
+     *            Message Handler that will consume the subscribed messages
+     * @param messageFilter
+     *            Message Filter that decides which messages are passed to <code>messageHandler</code>
+     * @throws ClientNotSubscribedException
+     *             If the client is not currently subscribed to the topic
+     * @throws AlreadyStartDeliveryException
+     *             If delivery was already started with another message handler that has not been stopped.
+     */
+    public void startDelivery(ByteString topic, ByteString subscriberId,
+                              MessageHandler messageHandler, MessageFilter messageFilter)
+            throws ClientNotSubscribedException, AlreadyStartDeliveryException;
+
{code}
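On the client side, the behavior described above amounts to wrapping the application's handler. A minimal sketch with hypothetical `Handler` and `FilteringHandler` types and a `Predicate` standing in for `MessageFilter#testMessage`; what the real client does with rejected messages (for example, whether it still consumes them) is not modeled here.

```java
import java.util.function.Predicate;

// Hypothetical, simplified shape of the client-side MessageHandler.
interface Handler {
    void deliver(String topic, String subscriberId, String message);
}

// Wraps the application's handler so it only sees messages accepted by the filter.
final class FilteringHandler implements Handler {
    private final Handler delegate;
    private final Predicate<String> filter; // stands in for MessageFilter#testMessage

    FilteringHandler(Handler delegate, Predicate<String> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override
    public void deliver(String topic, String subscriberId, String message) {
        if (filter.test(message)) {
            delegate.deliver(topic, subscriberId, message);
        }
        // In this sketch, rejected messages are simply not passed on.
    }
}
```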

C++ client:
{code}
+
+  typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
+
+  class MessageFilter {
+  public:
+    virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
+                                            const SubscriptionPreferencesPtr& preferences) = 0;
+    virtual bool testMessage(const Message& message) = 0;
+
+    virtual ~MessageFilter() {};
+  };
+  typedef std::tr1::shared_ptr<MessageFilter> MessageFilterPtr;


+    virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
+                               const MessageHandlerCallbackPtr& callback,
+                               const MessageFilterPtr& filter) = 0;
+
{code}


6) Customized properties are available in both MessageHeader and
SubscriptionPreferences, so different applications can leverage them to
implement their own filtering logic.
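As one illustration of application-specific logic, a hypothetical filter could compare a `priority` property in the message header against a `minPriority` option in the subscription preferences. Both names, and the encoding of values as UTF-8 decimal strings, are assumptions for this sketch; the protocol only says the values are bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical application-level filter combining header properties and
// subscription options; keys and value encoding are assumptions.
final class PriorityFilter {
    private int minPriority = Integer.MIN_VALUE; // no option set: accept everything

    // Called with the subscription's user options (SubscriptionPreferences.options).
    void setSubscriptionOptions(Map<String, byte[]> options) {
        byte[] raw = options.get("minPriority");
        minPriority = raw == null ? Integer.MIN_VALUE : parse(raw);
    }

    // Called with the message's user properties (MessageHeader.properties).
    boolean testMessage(Map<String, byte[]> headerProperties) {
        byte[] raw = headerProperties.get("priority");
        if (raw == null) {
            // Untagged messages only pass a subscriber that set no minimum.
            return minPriority == Integer.MIN_VALUE;
        }
        return parse(raw) >= minPriority;
    }

    // Decodes a UTF-8 decimal string; throws NumberFormatException if malformed.
    private static int parse(byte[] raw) {
        return Integer.parseInt(new String(raw, StandardCharsets.UTF_8).trim());
    }
}
```

A malformed value makes `Integer.parseInt` throw `NumberFormatException`, so a real filter would need to decide explicitly whether such messages pass.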
                
> Message Filter Support
> ----------------------
>
>                 Key: BOOKKEEPER-321
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-321
>             Project: Bookkeeper
>          Issue Type: Improvement
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>             Fix For: 4.2.0
>
>
> Support message filtering in hedwig.
> 1) add user-customized headers part in Message, which could be used for 
> message filtering.
> 2) add user-customized subscription preferences, which could be used for 
> message filtering.
> 3) support both server-side & client-side message filter.


        
