Hi Dimuthu,

 

Thanks for the clarification on the first issue.  The second issue is more 
about how you handle external errors in your approach: the SSH connection times 
out, for instance, and all the retries fail, so the Task Executor needs to 
report back that the command failed.

 

Marlon

 

 

From: "[email protected]" <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, November 2, 2017 at 7:33 AM
To: "[email protected]" <[email protected]>
Subject: Re: Linked Container Services for Apache Airavata Components - Phase 2 
- Initial Prototype

 

Hi Marlon, 

 

Thanks for the comments and please find the explanations for those comments 
below.

 

Transactionality 

 

Kafka has a rich acknowledgment mechanism. Actually there are several 
acknowledgement levels. Not like in RabbitMQ where messages are removed from 
the queue once read, Kafka keeps all the messages of the topic for a given 
time. Until that time expires, consumers are free to go back and forth on the 
topic to read previously read messages. However if we want to keep track of the 
read messages by a particular consumer (actually a consumer group) we can use 
kafka's Current Offset and Committed Offset to achieve this [1]. A consumer can 
read more than one messages from a partition of the topic. When it is done, 
Kafka keeps track of the pointer (offset) where that particular consumer 
finally read from the partition. That is the current offset. However it does 
not mean that messages are properly consumed by the consumer. Once the messages 
are consumed, consumer acknowledges that it has consumed x number of messages 
of the already read messages. Then Kafka keeps track of the last pointer where 
that particular consumer has acknowledged. This is the Committed Offset. Let's 
assume that the particular Consumer goes down and a new Consumer from the same 
consumer group starts to read from vacant partition. Then it will start to read 
from the Committed offset of last consumer. So to achieve the behavior that you 
have mentioned, it can be done by controlling the acknowledgement mechanism of 
a Consumer. There are mainly 2 acknowledgement mechanisms in Kafka

 

1. Auto commit - Once a consumer reads a message or set of messages at once, it 
will auto commit after a given time interval

2. Manual commit - Developer has the handle to decide whether to acknowledge or 
not for a particular message

 

>From above two methods we have to use manual commit as we have to make sure 
>that messages are acknowledged once it was properly handled. Manual commit 
>also has several levels and modes of acknowledgements.

 

1. Per Message acknowledgement

2. Per Message set acknowledgement

 

1. Asynchronous Mode

2. Synchronous Mode

 

Here we use per message synchronous acknowledgement as we want the highest 
level of transactionality. In that case, once we read a message from a Kafka 
topic, we parse it and do the necessary operations and finally acknowledges 
once the message has been successfully handled. If something went wrong 
(consumer failed) and read messages were not acknowledged, new consumer will 
continue from last acknowledged position. I have implemented PoC code that 
demonstrates above concept with a single producer and 3 consumers [2]. You can 
find the screen recording of the test from here [3].

 

If  you need 100% transactionality form consumer side, it is also possible with 
an external transactional scope like database operations. However in our case 
it is not required as the operations that we are doing inside the message 
handling operation are not reversible. Let's talk more about this approach in 
future.

 

Slow SSH communication

 

If you are expecting to improve the communication mechanism instead of SSH, one 
option is to use agents inside comupte resources and communicate with them in a 
optimized messaging protocol. I have provided an approach in the previous 
thread. But your concern is more about being the whole pipeline blocked due to 
a slow task not releasing the message from the topic, there is a solution for 
that. When we are publishing messages to a Kafka topic, they are distributed 
among the partitions of the topic. Consumers are reading from those partitions. 
If a consumer becomes slow, only the message read from that partitions will 
become slow. Other partitions will work without any issue. So we can have a 
higher number of partitions in a topic to handle that. Further if we want to go 
further like grouping slow tasks to a one set and allowing low latency tasks to 
an other set by using a custom partitioner [5]

 

Registry

 

It is already there [6]. Actually in this implementation, API server acts as 
the registry itself. It stores experiment objects and current status of the 
cluster. However As Gaurav has pointed out in the next email, that would be 
better if this is separated into multiple parts. What do you think?

 

[1] 
https://www.youtube.com/watch?v=kZT8v2_b2XE&index=15&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON

[2] https://github.com/DImuthuUpe/kafka-transactionality

[3] https://www.youtube.com/watch?v=j6bOVLUlyf4&feature=youtu.be

[4] 
https://www.youtube.com/watch?v=AshMNCxSp3c&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON&index=17

[5] 
https://www.youtube.com/watch?v=pMDAcNRkWkE&index=10&list=PLkz1SCf5iB4enAR00Z46JwY9GGkaS2NON

[6] 
https://github.com/apache/airavata-sandbox/tree/master/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server

 

On Thu, Nov 2, 2017 at 2:52 AM, Pierce, Marlon <[email protected]> wrote:

Hi Dimuthu,

 

Thanks for sending this very thoughtful document. A couple of comments:

 

* Use of Kafka instead of RabbitMQ is interesting. Can you say more about how 
this approach can handle Kafka client failures?  For RabbitMQ, for example, 
there is the simple “Work Queue” approach in which the broker pushes a task to 
a worker. The task remains in queue until the worker sends an acknowledgement 
that the job has been handled, not just received. “Handled” may mean for 
example that the job has been submitted to an external batch scheduler over 
SSH, which may require some retries, etc.   If the worker crashes before the 
job has been submitted, then the broker can resend the message to another 
worker.   I’m wondering how your Kafka-based solution would handle the same 
issue. 

 

* A simpler but more common failure is communicating with external resources. A 
task executor may need to SSH to a remote resource, which can fail (the 
resource is slow to communicate, usually). How do you handle this case?

 

* Your design focuses on Airavata’s experiment execution handling. Airavata’s 
registry is another important component: this is where experiment objects get 
persistently stored. The registry stores metadata about both “live” experiments 
that are currently executing as well as archived experiments that have 
completed.

 

How would you extend your architecture to include the registry?

 

Marlon

 

 

From: "[email protected]" <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, October 30, 2017 at 10:45 AM
To: "[email protected]" <[email protected]>
Subject: Linked Container Services for Apache Airavata Components - Phase 2 - 
Initial Prototype

 

Hi All, 

 

Based on the analysis of Phase 1, within past two weeks I have been working on 
implementing a task execution workflow following the microservices deployment 
pattern and Kubernetes as the deployment platform. 

 

Please find attached design document that explains the components and messaging 
interactions between components. Based on that design, I have implemented 
following components

 

1. Set of microservices to compose the workflow

2. A simple Web Console to  deploy and monitor workflows on the framework

 

I used Kakfa as the primary messaging medium to communicate among the 
microservices due to its simplicity and powerful features like partitions and 
consumer groups.

 

I have attached a user guide so that you can install and try this in your local 
machine. And source code for each component can be found from [1]

 

Please share you ideas and suggestions.

 

Thanks

Dimuthu

 

[1] 
https://github.com/DImuthuUpe/airavata/tree/master/sandbox/airavata-kubernetes

[2] 
https://docs.google.com/document/d/1R1xrmuPldHiWVDn4xNVay9Vnxn9FODQZXtF55JxJpSY/edit?usp=sharing

[3] 
https://docs.google.com/document/d/1A5eRIZiuUj4ShZVMS0NdAxjAxtOTZXculaYDCZ7IMQ8/edit?usp=sharing

 

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to