Hi Hasitha, sorry for the delayed response. I've replied to the questions inline, per my understanding of things. @Sajini, @HasithaD, correct me if I'm wrong.
On Tue, Jul 14, 2015 at 11:23 PM, Hasitha Hiranya <[email protected]> wrote:

> Hi Asitha,
>
> I want to know in high level what happens to slots when a node is killed.
>
> 1. A new slot coordinator should be elected. Until then other nodes might
> connect to that node.

Trying to connect to the killed node will throw an exception until the connection is updated to point to the proper coordinator. Until then no slots are assigned, but other than the exception there won't be any message distribution issue.

> [2015-07-14 17:59:18,207] ERROR {org.wso2.andes.thrift.MBThriftClient} - Could not connect to the Thrift Server 10.100.1.146:7611 java.net.ConnectException: Connection refused
> org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
>     at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
>     at org.wso2.andes.thrift.MBThriftClient.reConnectToServer(MBThriftClient.java:281)
>     at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:78)
>     at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.getSlot(SlotCoordinatorCluster.java:44)
>     at org.wso2.andes.kernel.slot.SlotDeliveryWorker.requestSlot(SlotDeliveryWorker.java:301)
>     at org.wso2.andes.kernel.slot.SlotDeliveryWorker.run(SlotDeliveryWorker.java:109)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:579)
>     at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
>     ... 8 more
> [2015-07-14 17:59:18,210] ERROR {org.wso2.andes.kernel.slot.SlotDeliveryWorker} - Error occurred while connecting to the thrift coordinator Coordinator has changed
> org.wso2.andes.kernel.slot.ConnectionException: Coordinator has changed
>     at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:83)
>     at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.getSlot(SlotCoordinatorCluster.java:44)
>     at org.wso2.andes.kernel.slot.SlotDeliveryWorker.requestSlot(SlotDeliveryWorker.java:301)
>     at org.wso2.andes.kernel.slot.SlotDeliveryWorker.run(SlotDeliveryWorker.java:109)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
>     at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>     at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
>     at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
>     at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>     at org.wso2.andes.thrift.slot.gen.SlotManagementService$Client.recv_getSlotInfo(SlotManagementService.java:101)
>     at org.wso2.andes.thrift.slot.gen.SlotManagementService$Client.getSlotInfo(SlotManagementService.java:87)
>     at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:73)
>     ... 6 more
> Caused by: java.net.SocketException: Connection reset
>     at java.net.SocketInputStream.read(SocketInputStream.java:196)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>     at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>     ... 14 more
> [2015-07-14 17:59:18,211] ERROR {org.wso2.andes.thrift.MBThriftClient} - Could not initialize the Thrift client. java.net.ConnectException: Connection refused
> org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
>     at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
>     at org.wso2.andes.thrift.MBThriftClient.getServiceClient(MBThriftClient.java:225)
>     at org.wso2.andes.thrift.MBThriftClient.updateMessageId(MBThriftClient.java:117)
>     at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.updateMessageId(SlotCoordinatorCluster.java:53)
>     at org.wso2.andes.kernel.slot.SlotMessageCounter.submitSlot(SlotMessageCounter.java:203)
>     at org.wso2.andes.kernel.slot.SlotMessageCounter$1.run(SlotMessageCounter.java:103)
>     at java.util.TimerThread.mainLoop(Timer.java:555)
>     at java.util.TimerThread.run(Timer.java:505)
> Caused by: java.net.ConnectException: Connection refused
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:579)
>     at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
>     ... 7 more
> [2015-07-14 17:59:23,212] INFO {org.wso2.andes.thrift.MBThriftClient} - Reconnecting to Slot Coordinator 10.100.1.146:7612
> [2015-07-14 17:59:23,218] INFO {org.wso2.andes.thrift.MBThriftClient} - Reconnecting to Slot Coordinator 10.100.1.146:7612
> [2015-07-14 17:59:23,237] INFO {org.wso2.andes.kernel.AndesChannel} - Flow control disabled for channel 2.
>
> 2. Failover can be fast. When it is connected to new node, slot
> coordinator might not be elected.

Yes, that means the coordinator the nodes will try to connect to is an already-killed node, similar to the question 1 scenario. One thing to note is that the slot coordinator uses Hazelcast maps. In fact, every node has a Thrift server and a client, so any node can get a slot from any node, including itself. With the help of Hazelcast we have coordinated it so that requests go to a single unique node (the coordinator node).

> 3. What happens to assigned slots to the killed node?

It should become an orphaned slot. We have had issues regarding this, but ideally it becomes an orphaned slot.

> 4. What if an overlapping slot comes at the instance of node being killed?
> There is a risk of message duplication? Because overlapping slots must be
> assigned to the same node as there might be already delivered messages.

When an overlapping slot comes to the killed node before it gets killed, there is a chance that the killed coordinator didn't have an opportunity to create a slot. So the next coordinator runs a recovery logic with a timeout to create slots for all the nodes that didn't get any messages published after that node got killed. This can recover that slot. (Since we only keep the last assigned message id, the recovery logic will create a slot with a range from the last assigned message id to the current time.) This is not the main intention of that recovery logic, but it would handle this scenario as well IMO. @Asanka, please comment if I'm wrong here.

> 5.
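To make the recovery idea above concrete, here is a rough sketch of creating a slot from the last assigned message id up to an id derived from the current time. All names here (RecoverySketch, Slot, messageIdFromTimestamp) are hypothetical illustrations, not the actual Andes classes; the assumption that message ids embed a timestamp is also mine.

```java
// Hypothetical sketch of the slot-recovery idea described above.
// Class and method names are illustrative, not the real Andes code.
public class RecoverySketch {

    // A slot is simply a range of message ids handed to one node for delivery.
    static final class Slot {
        final long startMessageId;
        final long endMessageId;

        Slot(long startMessageId, long endMessageId) {
            this.startMessageId = startMessageId;
            this.endMessageId = endMessageId;
        }
    }

    // Assumption: message ids embed a timestamp (here faked by shifting
    // epoch millis), so an id derived from "now" is larger than any
    // previously assigned id.
    static long messageIdFromTimestamp(long epochMillis) {
        return epochMillis << 16;
    }

    // Recovery: since only the last assigned message id survives the kill,
    // create a slot covering everything from that id up to the current time.
    static Slot recoverSlot(long lastAssignedMessageId, long nowMillis) {
        return new Slot(lastAssignedMessageId, messageIdFromTimestamp(nowMillis));
    }

    public static void main(String[] args) {
        Slot slot = recoverSlot(42L << 16, System.currentTimeMillis());
        System.out.println("recovered slot: [" + slot.startMessageId
                + ", " + slot.endMessageId + "]");
    }
}
```

The point of the sketch is only that the recovered range is bounded by wall-clock time rather than by a slot the dead coordinator never managed to create.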
> Subscriber maps should be updated with the failed-over subscriber (removed
> from the previous node and added to the new node).

I think you are a better judge of this than me :) Do we get duplicates because of this? I didn't observe that. I need to go through the SubscriptionStore code to think about edge cases; currently nothing comes to my mind on that.

> 6. Messages addressed to topics (duplicated for node) should be purged.

Agreed. We should delete those messages. We don't do that at the moment.

> 7. Flow control + node kill - there are some edge cases surrounding that.

Flow control is a local feature (it doesn't know about the clustered environment). We had an issue where, if one node sent a flow control message and the client then failed over, the client stayed flow controlled until the newly connected node sent a disable-flow-control message. But since this state is local, the new node never sends it. The current fix was to make the client itself disable the flow control state in a failover scenario (a fix on the client side).

> 8. Messages inside disruptor. If we do not use publisher transactions some
> messages might be lost (by design)

Yes. Messages that were not written to the DB (no need to go to the StateEventHandler) should be sent. If messages were written but updateSlot was not called through the StateEventHandler, those messages should still be delivered, since we have a slot recovery logic in place: the new/existing coordinator will trigger a slot recovery, which will pick up those messages for delivery.

> Basically what we need to do is, whatever message we get into MB and
> written to DB, we should deliver them. Thus the distributed slot logic
> should tolerate node kill.
>
> Are above things addressed now? Can you describe in high level what
> happens in above cases?
>
> Thanks

Please correct me if my understanding is wrong on those.
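The client-side flow-control fix described under point 7 can be sketched roughly as follows. This is a minimal illustration under my own naming (FlowControlSketch and its methods are hypothetical, not the actual MB client API): the essence is that failover itself is treated as an implicit "disable flow control", since the new node has no record of the old node's local state and will never send one.

```java
// Rough sketch of the client-side fix for point 7: flow control is
// node-local, so on failover the client resets its own flow-control state
// instead of waiting for a disable message the new node will never send.
// Names are hypothetical, not the real MB client classes.
public class FlowControlSketch {

    private boolean flowControlled = false;

    // The (old) node told this client to stop publishing.
    void onFlowControlEnabled() {
        flowControlled = true;
    }

    // Normally the broker sends this when buffers drain; after failover the
    // newly connected node never will, because the state was local to the
    // killed node.
    void onFlowControlDisabled() {
        flowControlled = false;
    }

    // The fix: clear the flag as part of the client's failover handling.
    void onFailover() {
        flowControlled = false;
    }

    boolean isFlowControlled() {
        return flowControlled;
    }

    public static void main(String[] args) {
        FlowControlSketch client = new FlowControlSketch();
        client.onFlowControlEnabled();
        client.onFailover();
        System.out.println("flow controlled after failover: " + client.isFlowControlled());
    }
}
```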
Regards,
Asitha

> --
> Hasitha Abeykoon
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> cell: +94 719363063
> blog: abeykoon.blogspot.com <http://abeykoon.blogspot.com>

--
Asitha Nanayakkara
Software Engineer
WSO2, Inc. http://wso2.com/
Mob: + 94 77 85 30 682
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev
