Hi Hasitha

Sorry for the delayed response. I've replied to the questions in line with
my understanding of things.
@Sajini, HasithaD, please correct me if I'm wrong.

On Tue, Jul 14, 2015 at 11:23 PM, Hasitha Hiranya <[email protected]> wrote:

> Hi Asitha,
>
> I want to know in high level what happens to slots when a node is killed.
>
> 1. A new slot coordinator should be elected. Until then other nodes might
> connect to that node.
>
Trying to connect to the killed node will give an exception until the
connection is updated to point to the proper coordinator. So no slots will
be assigned, and other than the exception there won't be any message
distribution issue.
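
To illustrate what I mean by "until the connection is updated", here is a rough sketch of a retry that looks up the coordinator address again before each attempt. This is not the MBThriftClient code; CoordinatorRetry, callWithRetry and the address supplier are hypothetical names I made up for the example.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of Andes: retries a request against the
// slot coordinator and re-reads the coordinator address before every attempt.
final class CoordinatorRetry {

    /** Thrown when none of the attempts succeeded. */
    static final class CoordinatorUnavailableException extends RuntimeException {
        CoordinatorUnavailableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> T callWithRetry(Supplier<String> coordinatorAddress,
                               Function<String, T> request,
                               int maxAttempts,
                               long backoffMillis) {
        if (maxAttempts < 1 || backoffMillis < 0) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 and backoffMillis >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Resolve the address each time: it may already point to a new coordinator.
            String address = coordinatorAddress.get();
            try {
                return request.apply(address);
            } catch (RuntimeException e) {
                lastFailure = e; // e.g. connection refused by the killed node
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    break;
                }
            }
        }
        throw new CoordinatorUnavailableException(
                "No coordinator reachable after " + maxAttempts + " attempts", lastFailure);
    }
}
```

The point of the sketch is only that a failed attempt costs an exception and a short wait, and that the next attempt may already see the new coordinator.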


>
> [2015-07-14 17:59:18,207] ERROR {org.wso2.andes.thrift.MBThriftClient} -  Could not connect to the Thrift Server 10.100.1.146:7611java.net.ConnectException: Connection refused
> org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
> at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
> at org.wso2.andes.thrift.MBThriftClient.reConnectToServer(MBThriftClient.java:281)
> at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:78)
> at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.getSlot(SlotCoordinatorCluster.java:44)
> at org.wso2.andes.kernel.slot.SlotDeliveryWorker.requestSlot(SlotDeliveryWorker.java:301)
> at org.wso2.andes.kernel.slot.SlotDeliveryWorker.run(SlotDeliveryWorker.java:109)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:579)
> at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
> ... 8 more
> [2015-07-14 17:59:18,210] ERROR {org.wso2.andes.kernel.slot.SlotDeliveryWorker} -  Error occurred while connecting to the thrift coordinator Coordinator has changed
> org.wso2.andes.kernel.slot.ConnectionException: Coordinator has changed
> at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:83)
> at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.getSlot(SlotCoordinatorCluster.java:44)
> at org.wso2.andes.kernel.slot.SlotDeliveryWorker.requestSlot(SlotDeliveryWorker.java:301)
> at org.wso2.andes.kernel.slot.SlotDeliveryWorker.run(SlotDeliveryWorker.java:109)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
> at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
> at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
> at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> at org.wso2.andes.thrift.slot.gen.SlotManagementService$Client.recv_getSlotInfo(SlotManagementService.java:101)
> at org.wso2.andes.thrift.slot.gen.SlotManagementService$Client.getSlotInfo(SlotManagementService.java:87)
> at org.wso2.andes.thrift.MBThriftClient.getSlot(MBThriftClient.java:73)
> ... 6 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
> ... 14 more
> [2015-07-14 17:59:18,211] ERROR {org.wso2.andes.thrift.MBThriftClient} -  Could not initialize the Thrift client. java.net.ConnectException: Connection refused
> org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
> at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
> at org.wso2.andes.thrift.MBThriftClient.getServiceClient(MBThriftClient.java:225)
> at org.wso2.andes.thrift.MBThriftClient.updateMessageId(MBThriftClient.java:117)
> at org.wso2.andes.kernel.slot.SlotCoordinatorCluster.updateMessageId(SlotCoordinatorCluster.java:53)
> at org.wso2.andes.kernel.slot.SlotMessageCounter.submitSlot(SlotMessageCounter.java:203)
> at org.wso2.andes.kernel.slot.SlotMessageCounter$1.run(SlotMessageCounter.java:103)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)
> Caused by: java.net.ConnectException: Connection refused
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:579)
> at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
> ... 7 more
> [2015-07-14 17:59:23,212]  INFO {org.wso2.andes.thrift.MBThriftClient} -  Reconnecting to Slot Coordinator 10.100.1.146:7612
> [2015-07-14 17:59:23,218]  INFO {org.wso2.andes.thrift.MBThriftClient} -  Reconnecting to Slot Coordinator 10.100.1.146:7612
> [2015-07-14 17:59:23,237]  INFO {org.wso2.andes.kernel.AndesChannel} -  Flow control disabled for channel 2.
>
>
> 2. Failover can be fast. When it is connected to new node, slot
> coordinator might not be elected.
>
Yes, that means the coordinator the nodes will try to connect to is an
already killed node, similar to the scenario in question 1. One thing to
note is that the slot coordinator uses Hazelcast maps. In fact, every node
has a Thrift server and a client, so any node can get a slot from any node,
including itself. With the help of HZ we have coordinated it so that all
nodes request slots from a single unique node (the coordinator node).


> 3. What happens to assigned slots to the killed node?
>

It should become an orphaned slot. We have had issues with this, but
ideally that is what should happen.

>
> 4. What if an overlapping slot comes at the instance of node being killed?
> There is a risk of message duplication? Because overlapping slots must be
> assigned to the same node as there might be already delivered messages.
>
When an overlapping slot comes to the killed node just before it gets
killed, there is a chance that the killed coordinator didn't have an
opportunity to create a slot. So the next coordinator runs recovery logic
with a timeout to create slots for all the nodes that didn't get any
messages published after that node got killed. This can recover that slot.
(Since we only keep the last assigned message id, the recovery logic will
create a slot with a range from the last assigned message id to the current
time.) This is not the main intention of that recovery logic, but it would
handle this scenario as well, IMO.
@Asanka, please comment if I'm wrong here.
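
To make the recovery range concrete, here is a minimal sketch. It assumes message ids grow with time, so that a "current time" value expressed as a message id can act as the upper bound, and it assumes the range starts just after the last assigned id. SlotRecoverySketch, Slot and recoverySlot are hypothetical names for this example, not the actual Andes classes.

```java
import java.util.Optional;

// Hypothetical types for illustration only; the real Andes classes may differ.
final class SlotRecoverySketch {

    /** Inclusive range of message ids handed to one node for delivery. */
    static final class Slot {
        final long startMessageId;
        final long endMessageId;

        Slot(long startMessageId, long endMessageId) {
            this.startMessageId = startMessageId;
            this.endMessageId = endMessageId;
        }
    }

    /**
     * Builds a catch-up slot covering everything after the last assigned id.
     * Assumption: ids are time-ordered, so nowAsMessageId is a valid upper bound.
     * Returns empty when there is nothing newer than the last assigned id.
     */
    static Optional<Slot> recoverySlot(long lastAssignedMessageId, long nowAsMessageId) {
        if (nowAsMessageId <= lastAssignedMessageId) {
            return Optional.empty();
        }
        return Optional.of(new Slot(lastAssignedMessageId + 1, nowAsMessageId));
    }

    /** True when a node has not published a slot update within the timeout. */
    static boolean needsRecovery(long lastUpdateMillis, long nowMillis, long timeoutMillis) {
        return nowMillis - lastUpdateMillis >= timeoutMillis;
    }
}
```

With this shape, messages written to the store but never reported to the old coordinator still fall inside some slot's range once the timeout passes.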



> 5.  subscriber maps should be updated with failed over subscriber (remove
> from previous node and added to the new node)
>
I think you are a better judge of this than me :) Do we get duplicates
because of this? I didn't observe that. I need to go through the
SubscriptionStore code to think about edge cases. Currently nothing comes
to mind on that.


> 6. messages addressed to topics (duplicated for node) should be purged.
>
Agreed. We should delete those messages. We don't do that at the moment.


> 7. Flow control + node kill  - there are some edge cases surrounding that.
>
Flow control is a local feature (it doesn't know about the clustered
environment). We had an issue where, if a node sent a flow control message,
the client stayed flow controlled after failover until the newly connected
node sent a disable flow control message. But since flow control is local,
the new node never sends it. The current fix for this was to make the
client itself clear the flow control state in a failover scenario (a fix
on the client side).
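
Roughly, the client-side fix amounts to something like the following sketch. ClientFlowControlState and its method names are hypothetical and only show the idea; they are not taken from the actual client code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical client-side state holder; names are not from the Andes client.
final class ClientFlowControlState {

    private final AtomicBoolean blocked = new AtomicBoolean(false);

    /** Called when the connected node enables or disables flow control. */
    void onFlowControlMessage(boolean active) {
        blocked.set(active);
    }

    /**
     * Called when the client fails over to another node. The new node never
     * saw the old node's flow control state, so the client clears it itself.
     */
    void onFailover() {
        blocked.set(false);
    }

    boolean isBlocked() {
        return blocked.get();
    }
}
```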


> 8. Messages inside disruptor. If we do not use publisher transactions some
> messages might be lost (by design)
>
Yes. Messages that were not written to the DB (no need to go to the
StateEventHandler) should be sent. If messages were written but updateSlot
was not called through the StateEventHandler, those messages should still
be delivered now, since we have slot recovery logic in place. The
new/existing coordinator will trigger a slot recovery, which will pick up
those messages for delivery.

>
> Basically what we need to do is, whatever message we get into MB and
> written to DB, we should deliver them. Thus the distributed slot logic
> should tolerate node kill.
>
> Are above things addressed now? Can you describe in high level what
> happens in above cases?
>
> Thanks
>
> Please correct me if my understanding is wrong on those.

Regards,
Asitha


>
> --
> *Hasitha Abeykoon*
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> *cell:* *+94 719363063*
> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>
--
*Asitha Nanayakkara*
Software Engineer
WSO2, Inc. http://wso2.com/
Mob: + 94 77 85 30 682
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev
