2018-03-26 17:14:38 UTC - Venkat: Hi guys, I was going through the Pulsar docs and trying to understand the system. I'm curious to know more about what Pulsar uses the local ZooKeeper and the global ZooKeeper for. I couldn't find much info in the docs. It would be really helpful if you could share a few pointers...
----
2018-03-26 17:21:05 UTC - Matteo Merli: Hi @Venkat. “Local ZooKeeper” is the regular ZK ensemble. It’s used by both Pulsar and BookKeeper for coordination and metadata.
“Global ZooKeeper” refers to the configuration store. When running multiple clusters in different regions with geo-replication, we want to ensure the configuration is consistent across all regions. For that we use a ZK quorum with a presence in multiple data centers, which is used as a way to store configuration (namespace policies, ACLs, …) and to notify brokers when the configuration changes. It’s not strictly necessary, though. You can avoid the global ZK if you’re willing to maintain configuration consistency manually (e.g. by writing config changes in each region).
----
2018-03-26 17:26:43 UTC - Venkat: Got a better picture now. Thanks a lot @Matteo Merli
----
2018-03-26 17:27:32 UTC - Matteo Merli: We’re actually in the process of renaming the global ZK to “configuration store”, to avoid the confusion
----
2018-03-26 17:28:24 UTC - Venkat: Yeah, that would be better
----
2018-03-27 00:20:12 UTC - Igor Zubchenok: Hello! I believe I found a bug, or two bugs. I've prepared some short code that demonstrates my issues. How should I proceed to share it?
----
2018-03-27 00:21:04 UTC - Sijie Guo: @Igor Zubchenok you can send a pull request to the Pulsar GitHub repo.
----
2018-03-27 00:21:45 UTC - Igor Zubchenok: I don't know how to fix it))) I've just reproduced it in a synthetic test
----
2018-03-27 00:22:32 UTC - Sijie Guo: oh I see
----
2018-03-27 00:22:57 UTC - Sijie Guo: you can push your test code to a branch, file a GitHub issue, and attach your branch link?
----
2018-03-27 00:23:00 UTC - Igor Zubchenok: there are two notes: (1) I don't get messages that have been published; (2) I get reachedEndOfTopic twice
----
2018-03-27 00:23:17 UTC - Sijie Guo: I see.
----
2018-03-27 00:24:23 UTC - Igor Zubchenok: @Igor Zubchenok shared a file: <https://apache-pulsar.slack.com/files/U9T9CC2P2/F9XGLD1V5/gistfile1.txt|gistfile1.txt>
----
2018-03-27 00:25:37 UTC - Igor Zubchenok: what is suspicious here is that I get "reachedEndOfTopic" twice and don't receive the message that was sent.
----
2018-03-27 00:26:40 UTC - Matteo Merli: @Igor Zubchenok The subscription needs to be there before publishing the messages
----
2018-03-27 00:29:23 UTC - Igor Zubchenok: ...
----
2018-03-27 00:29:51 UTC - Igor Zubchenok: This is not obvious.
----
2018-03-27 00:30:20 UTC - Matteo Merli: The subscription needs to be created pointing to a specific position
----
2018-03-27 00:30:45 UTC - Matteo Merli: the default is to point to the current last message (meaning you’ll receive the next one)
----
2018-03-27 00:31:13 UTC - Matteo Merli: in current master there is the option to create a subscription on the earliest message available
----
2018-03-27 00:32:28 UTC - Igor Zubchenok: this makes sense.
----
2018-03-27 00:32:45 UTC - Igor Zubchenok: but why do I get reachedEndOfTopic twice?
----
2018-03-27 00:33:36 UTC - Matteo Merli: good question, I agree the expectation is that it should be called once here
----
2018-03-27 00:34:10 UTC - Igor Zubchenok: so you can check. I cannot debug Pulsar yet, because I have problems attaching the source code in IDEA.
----
2018-03-27 00:34:40 UTC - Matteo Merli: if you subscribe before publishing, does it still get the notification twice?
----
2018-03-27 00:34:56 UTC - Igor Zubchenok: ... a moment...
----
2018-03-27 00:36:07 UTC - Igor Zubchenok: hmm, I don't get any reachedEndOfTopic at all
----
2018-03-27 00:38:01 UTC - Matteo Merli: <https://github.com/apache/incubator-pulsar/blob/052814d516adb429d2c0fca030920f26770a52ac/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java#L213>
----
2018-03-27 00:38:08 UTC - Matteo Merli: this is the unit test for that
----
2018-03-27 00:38:52 UTC - Matteo Merli: it’s mostly the same and the test has always been succeeding
----
2018-03-27 00:49:54 UTC - Igor Zubchenok: It seems the unit test is not for the latest version available in Maven, 1.22.0-incubating
----
2018-03-27 00:50:13 UTC - Matteo Merli: Yes, let me get the 1.22 version of it
----
2018-03-27 00:50:31 UTC - Matteo Merli: <https://github.com/apache/incubator-pulsar/blob/v1.22.0-incubating/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java#L215>
----
2018-03-27 01:02:08 UTC - Igor Zubchenok: your test passes, but I get WARN and ERROR after the line `MessageId lastMessageId = admin.persistentTopics().terminateTopicAsync(topicName).get();`
----
2018-03-27 01:04:09 UTC - Matteo Merli: What errors? I don’t get any when running that test locally
----
2018-03-27 01:04:40 UTC - Matteo Merli: @Matteo Merli shared a file: <https://apache-pulsar.slack.com/files/U680ZCXA5/F9XGY7BJT/gistfile1.txt|gistfile1.txt>
----
2018-03-27 01:06:07 UTC - Igor Zubchenok: Something happens after deletion of the topic
----
2018-03-27 01:06:32 UTC - Igor Zubchenok: I actually delete the topic before the test starts.
----
2018-03-27 01:14:18 UTC - Igor Zubchenok: ... give me some time, I'm trying to eliminate the differences between the tests
----
2018-03-27 01:14:31 UTC - Matteo Merli: Sure
----
2018-03-27 01:25:34 UTC - Igor Zubchenok: OK, the reason is that you call acknowledgeCumulative, but I don't. So I don't get reachedEndOfTopic. Is it designed like this?
----
2018-03-27 01:26:39 UTC - Matteo Merli: Though you’re acknowledging all the messages individually, right?
----
2018-03-27 01:27:58 UTC - Igor Zubchenok: In the test I don't acknowledge anything. I'm just reading messages (in my code I read everything, pack it into a single batch and send it to the client)
----
2018-03-27 01:28:34 UTC - Matteo Merli: I see, then from the consumer's point of view the topic is not “ended” yet
----
2018-03-27 01:28:49 UTC - Matteo Merli: it will get the notification when all messages have been consumed
----
2018-03-27 01:28:51 UTC - Igor Zubchenok: so the topic is ended when all messages are acked?
----
2018-03-27 01:29:03 UTC - Matteo Merli: the producer gets the notification immediately
----
2018-03-27 01:29:18 UTC - Matteo Merli: the consumer when it has caught up
----
2018-03-27 01:33:31 UTC - Igor Zubchenok: I see. OK, when I added acknowledgeCumulative, I get reachedEndOfTopic properly. (However, I'm still not sure it is good to require all messages to be acknowledged before getting reachedEndOfTopic)
----
2018-03-27 01:34:58 UTC - Matteo Merli: The point is that this makes it possible to coordinate producers and consumers to switch to new topics, while maintaining ordering and without losing data
----
2018-03-27 01:38:30 UTC - Igor Zubchenok: The reason for my expectation is that I use two topics: one for session init, a second for session updates. As soon as I have sent all the 'init' messages, I start sending 'updates' immediately, without acknowledging the 'init' messages. That's why I expected such behaviour.
----
2018-03-27 01:39:21 UTC - Matteo Merli: You can always do that by having different types of messages
----
2018-03-27 01:39:31 UTC - Matteo Merli: (or attaching properties to your messages)
----
2018-03-27 01:39:33 UTC - Igor Zubchenok: Do you have types of messages?
----
2018-03-27 01:39:38 UTC - Igor Zubchenok: Wow.
----
2018-03-27 01:39:59 UTC - Matteo Merli: you can attach a `Map<String, String>` of application-defined properties
----
2018-03-27 01:40:14 UTC - Igor Zubchenok: And how can I use it?
----
2018-03-27 01:40:34 UTC - Matteo Merli: `MessageBuilder.setProperty(key, value)`
----
2018-03-27 01:40:58 UTC - Igor Zubchenok: Sorry, I saw that. But how can I actually use it?
----
2018-03-27 01:41:48 UTC - Matteo Merli: Oh, you can mark that this message is the last one, so the consumer will know it
----
2018-03-27 01:42:00 UTC - Igor Zubchenok: I got it.
----
2018-03-27 01:42:50 UTC - Igor Zubchenok: in the beginning I published an empty message (new byte[0]), before I realised there is a 'reachedEndOfTopic' callback
----
2018-03-27 01:44:15 UTC - Igor Zubchenok: I did not expect that, while reading, I have to acknowledge all messages before getting reachedEndOfTopic. But you understood that already.
----
2018-03-27 01:45:16 UTC - Igor Zubchenok: Anyway, there is still the doubled reachedEndOfTopic when I subscribe after the publisher is closed and the topic is terminated.
----
2018-03-27 01:45:43 UTC - Matteo Merli: yes, can you open an issue for that? I’ll take a look later
----
2018-03-27 01:54:54 UTC - Igor Zubchenok: I will. Before I proceed, I noticed one more thing: I get 'Topic was already terminated' when I run this code
```
Producer producer = pulsarClient.createProducer(topicName);
producer.send("test".getBytes());
admin.persistentTopics().terminateTopicAsync(topicName).get();
```
topicName is unique
----
2018-03-27 01:56:07 UTC - Matteo Merli: In the `get()` call?
----
2018-03-27 01:56:10 UTC - Igor Zubchenok:
```
WARN Received error from server: Topic was already terminated
ERROR Failed to create producer: Topic was already terminated
```
----
2018-03-27 01:56:14 UTC - Igor Zubchenok: yep
----
2018-03-27 01:56:58 UTC - Matteo Merli: But do you get an exception, or is it just the logs?
----
2018-03-27 01:57:06 UTC - Matteo Merli: I think this is coming from the producer logs
----
2018-03-27 01:58:24 UTC - Matteo Merli: the producer will get a notification from the broker (CloseProducer). Then the producer tries to reconnect and gets the “error”, and since this is a permanent condition, it stops trying to reconnect
----
2018-03-27 01:59:21 UTC - Igor Zubchenok: No exception, but there is 'Closed connection', 'Reconnecting after timeout', 'Creating producer' and only then 'Received error from server: Topic was already terminated'
----
2018-03-27 02:00:47 UTC - Igor Zubchenok: And you're right, none of this occurs if I close the producer before terminating the topic.
----
2018-03-27 02:15:18 UTC - Igor Zubchenok: 1. here it is: <https://github.com/apache/incubator-pulsar/issues/1452>
----
2018-03-27 02:40:39 UTC - Igor Zubchenok: 2. One more thing I noticed: when I send several messages in a batch, I get the same ID for these messages.
----
2018-03-27 02:41:15 UTC - Igor Zubchenok:
```
Producer producer = pulsarClient.createProducer(topicName, new ProducerConfiguration()
        .setBatchingEnabled(true)
        .setBatchingMaxMessages(1000)
        .setBatchingMaxPublishDelay(1, TimeUnit.SECONDS)
);
producer.sendAsync("A".getBytes()).whenCompleteAsync((messageId, throwable) -> {
    System.out.println("A: " + messageId);
});
producer.sendAsync("B".getBytes()).whenCompleteAsync((messageId, throwable) -> {
    System.out.println("B: " + messageId);
});
```
output:
```
A: 1248:0:-1:0
B: 1248:0:-1:0
```
----
2018-03-27 02:47:17 UTC - Igor Zubchenok: When I read, I get different IDs:
```
B: 1253:0:-1:0
A: 1253:0:-1:0
Read: 1253:0:-1:0
Read: 1253:0:-1:1
```
Is that OK?
----
2018-03-27 02:54:56 UTC - Igor Zubchenok: Oops, sorry, I used the wrong Slack channel today.
----
2018-03-27 03:01:04 UTC - Igor Zubchenok: 3. If I use a subscription and then the user reconnects to another server instance, how can I continue using the same user's subscription and cursor afterwards?
----
2018-03-27 03:16:57 UTC - Igor Zubchenok: I should just use the same subscription name, right?
----
2018-03-27 03:26:57 UTC - Matteo Merli: Yes, exactly, it will resume from the last unacked message
----
2018-03-27 03:34:15 UTC - Igor Zubchenok: 4. You know, there is no admin.persistentTopics().terminateTopic() method. I guess it should be there for completeness.
----
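On point 2 in the log (batched messages printing the same ID on send but different IDs on read), the printed IDs can be compared field by field. The sketch below is plain Java with no Pulsar dependency. It assumes the printed form is `ledgerId:entryId:partitionIndex:batchIndex`, which is a reading of the output quoted in the log and not something confirmed in the conversation. `BatchMessageIds` and `ParsedId` are hypothetical names made up for this illustration and are not part of the Pulsar client.

```java
public class BatchMessageIds {

    /** Hypothetical holder for the four colon-separated fields of a printed message ID. */
    static final class ParsedId {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;
        final int batchIndex;

        ParsedId(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
            this.batchIndex = batchIndex;
        }

        /** Parses text such as "1253:0:-1:1" (field order is an assumption, see above). */
        static ParsedId parse(String text) {
            String[] parts = text.trim().split(":");
            if (parts.length != 4) {
                throw new IllegalArgumentException("Expected 4 fields, got: " + text);
            }
            return new ParsedId(
                    Long.parseLong(parts[0]),
                    Long.parseLong(parts[1]),
                    Integer.parseInt(parts[2]),
                    Integer.parseInt(parts[3]));
        }

        /** True when both IDs share the first three fields, i.e. differ at most in batch index. */
        boolean sameEntryAs(ParsedId other) {
            return ledgerId == other.ledgerId
                    && entryId == other.entryId
                    && partitionIndex == other.partitionIndex;
        }
    }

    public static void main(String[] args) {
        // The two IDs printed on the read side in the log.
        ParsedId first = ParsedId.parse("1253:0:-1:0");
        ParsedId second = ParsedId.parse("1253:0:-1:1");

        System.out.println("same entry: " + first.sameEntryAs(second));
        System.out.println("batch indexes: " + first.batchIndex + ", " + second.batchIndex);
    }
}
```

Under that assumption, the two messages read back share the first three fields and differ only in the last one, which would be consistent with both having been stored together as one batch.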