[
https://issues.apache.org/jira/browse/BOOKKEEPER-77?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13135850#comment-13135850
]
Sijie Guo commented on BOOKKEEPER-77:
-------------------------------------
implementation of hedwig console:
* pub
** use hedwigclient.getPublisher().publish(...)
* sub
** use hedwigclient.getSubscriber().subscribe()
* closesub
** use hedwigclient.getSubscriber().closeSubscription()
* unsub
** use hedwigclient.getSubscriber().unsubscribe()
* consume/consumeto
** use hedwigclient.getSubscriber().consume()
* pubsub
*# sub <topic> as subscriber <subscriber_id_prefix>_<cur_time> .
*# subscriber <subscriber_id_prefix>_<cur_time> will wait a message until
<timeout_secs> secs.
*# publish a message <message_prefix>_<cur_time> to topic <topic> .
*# when subscriber <subscriber_id_prefix> receive the message, it will check
the message is the published message
*# received message or timeout, subscriber <subscriber_id_prefix> will
unsubscribe the <topic>
*# quit
* describe topic
*# check whether the topic existed or not :
zk.exists("/hedwig/<region>/topics/<topic>").
*## if existed, go next
*## else return
*# read the topic owner from zookeeper : /hedwig/<region>/topics/<topic>/owner
*# read the persistent information from zookeeper :
/hedwig/<region>/topics/<topic>/ledgers
*## for each ledger, parse its data to get its start message id and end message
id
*### if the ledger is not closed (which means the ledger is used for persist
messages not), we use bookkeeper client to openLedgerNoRecovery to read the
lastConfirmed message id
*# read the subscription information from zookeeper :
/hedwig/<region>/topics/<topic>/subscribers
*## for each subscriber, parse its data to get the subscription state
* readtopic
*# check whether the topic existed or not :
zk.exists("/hedwig/<region>/topics/<topic>").
*# read the persistent information from zookeeper
*# read the subscription information from zookeeper to get the
<least_consumed_message_id>
*# find <start_message_id> by MAX( <least_consumed_message_id>,
<start_message_id> )
*# loop over the ledgers used for persisting messsages
*## for each ledger, if the messages are all less than <start_message_id>, skip
the ledger
*## otherwise, open ledger by calling openLedgerNoRecovery (just for reading)
to read the messages and format them
* show hubs
* fetch list of hosts from zookeeper /hedwig/<region>/available , for each host
read its data and convert it to integer.
* show topics
** fetch list of topics from zookeeper /hedwig/<region>/topics
> Add a console client for hedwig
> -------------------------------
>
> Key: BOOKKEEPER-77
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-77
> Project: Bookkeeper
> Issue Type: New Feature
> Components: hedwig-client
> Reporter: Sijie Guo
> Attachments: bookkeeper-77-draft.patch
>
>
> implement a console client to use/admin hedwig system.
> Usage : hedwig_console [options] COMMAND [argument ...]
> (if no COMMAND specified, hedwig_console will enter interactive mode.)
> OPTIONS:
> {quote}
> --zkquorums the quorum list of zookeeper cluster
> --zktimeout timeout of zookeeper client
> --zk_hedwig_prefix the prefix of zookeeper path to store hedwig
> metadata
> --region which region of hedwig to connect
> --consume_interval the consume interval of hub server
> {quote}
> COMMANDS:
> * pub <topic> <message>
> ** Publish <message> to the specified <topic>.
> * sub <topic> <subscriber_id> [mode]
> ** Subscribe the specified <topic> as subscriber <subscriber_id>. (NOTE: only
> run in INTERACTIVE mode now)
> ** mode: subscription mode. available values are 0, 1, 2.
> *** 0 = CREATE : create the subscription if not subscription before.
> *** 1 = ATTACH (default) : attach the subscription
> *** 2 = CREATE_OR_ATTACH : if the subscription is not existed, create the
> subscription then attach.
> * closesub <topic> <subscriber_id>
> ** Close subscription of subscriber <subscriber_id>. (NOTE: it just close the
> subscription connection and do cleanup work in client-side, without REMOVING
> subscription state from server side)
> * unsub <topic> <subscriber_id>
> ** Remove subscription state of subscriber <subscriber_id>. the subscription
> state of subscriber <subscriber_id> will be removed from server side.
> * consume <topic> <subscriber_id> <num_messages_to_consume>
> ** Move the subscription ptr of subscriber <subscriber_id> from ptr to ptr +
> num_messages_to_consume.
> * consumeto <topic> <subscriber_id> <message_id>
> ** Move the subscription ptr of subscriber <subscriber_id> from ptr to
> <message_id>.
> ** NOTE: consume*/*consumeto just sent consume request to hub server and hub
> server move the subscription ptr in its memory. Hub server lazily persists
> the subscription ptr to zookeeper. the default persist interval in hub server
> is 50 messages. so use DESCRIBE TOPIC to show subscription, the subscription
> ptr might be not changed.
> * pubsub <topic> <subscriber_id_prefix> <timeout_secs> <message_prefix>
> ** A test command to test healthy of hedwig cluster.
> *# sub <topic> as subscriber <subscriber_id_prefix>_<cur_time> .
> *# subscriber <subscriber_id_prefix>_<cur_time> will wait a message until
> <timeout_secs> secs.
> *# publish a message <message_prefix>_<cur_time> to topic <topic> .
> *# when subscriber <subscriber_id_prefix> receive the message, it will check
> the message is the published message
> *# received message or timeout, subscriber <subscriber_id_prefix> will
> unsubscribe the <topic>
> *# quit
> {quote}
> [hedwig: (standalone) 7] pubsub ttttttttt test 10 test_message
> Starting PUBSUB test ...
> Sub topic ttttttttt, subscriber id test-1319602021044
> Pub topic ttttttttt : test_message-1319602021044
> Received message : test_message-1319602021044
> PUBSUB SUCCESS. TIME: 43 MS
> SUCCESS. Finished 0.058 s
> {quote}
> * show hubs
> ** list all available hub servers. including hostname and how many topics the
> server owns.
> {quote}
> Example:
> Available Hub Servers:
> 98.137.99.27:9875:9876 : 2
> {quote}
>
> * show topics
> ** list all existing topics. (NOTE: since we fetch topic lists from
> zookeeper, we may got PacketLenException when we have millions of topics. it
> doesn't affect system, just can't display the topic list)
> * describe topic <topic>
> ** show state of a specified topic, including topic owner, topic persistent
> information, topic subscriber list and their subscription states.
> {quote}
> Example:
> ===== Topic Information : ttttt =====
> Owner : 98.137.99.27:9875:9876
> >>> Persistence Info <<<
> Ledger 54729 [ 1 ~ 59 ]
> Ledger 54731 [ 60 ~ 60 ]
> >>> Subscription Info <<<
> Subscriber mysub : consumeSeqId: local:50
> {quote}
>
> * readtopic <topic> [start_msg_id]
> ** read messages of a specified <topic>.
> *** no <start_msg_id> specified : readtopic will start from
> <least_consumed_message_id> + 1 of its subscribers. in above exmaple,
> "readtopic ttttt" will start from 50. if there is no subscription, it will
> start from 1.
> *** <start_msg_id> specified : since messages consumed will be removed by
> garbage collection. so readtopic tries to not read consumed message, it will
> start from MAX( <start_msg_id> , <least_consumed_message_id> ).
> *** Message Format
> **** MsgId : include two parts: first part is which region the message is
> published from, second part is message id.
> **** SrcRegion : region name
> **** Message : the message body
> {quote}
> ---------- MSGID=LOCAL(51) ----------
> MsgId: LOCAL(51)
> SrcRegion: standalone
> Message:
> hello
> {quote}
> * history
> ** list history commands
> * redo [<cmdno>|!]
> ** redo the specified command by command no. (NOTE: "*redo *" means redo the
> previous command)
> * help
> ** print help information
> * quit|exit
> ** exit the interactive console
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira