+1
This is a dev list question, but what is the best way to package
Zookeeper in an Airavata release?
Marlon
On 6/25/14, 2:46 PM, Amila Jayasekara wrote:
Great Work !
I would love to see a demo of this.
Thanks
Amila
On Wed, Jun 25, 2014 at 2:09 PM, Lahiru Gunathilake <[email protected]> wrote:
Hi All,
I have finished the initial version of the ZK integration. Now we can start
multiple thrift gfac services (the communication between the orchestrator
and gfac is still RPC) and the orchestrator submits jobs to multiple gfac nodes.
I can kill a gfac node and the orchestrator will make sure jobs are not lost;
it simply takes those jobs and re-submits them to another gfac node. Since GFac
is a generic framework with multiple plugins developed for it, checkpointing a
plugin's internal state is up to the plugin developers, but gfac itself
checkpoints whether each plugin has been invoked or not.
I have introduced a new interface for plugin development called Recoverable
(RecoverableHandler and RecoverableProvider). Stateful plugins have to
implement their recover method, and the gfac framework will make sure it is
invoked during a re-run scenario. If a plugin is not recoverable and has
already run (which can be determined from the framework's checkpointing), it
will not be invoked again during the re-run. For now I have implemented
recoverability for a few plugins, and I tested it by submitting a job to
trestles, letting it reach the monitoring state, and then killing that gfac
instance. The orchestrator picks up that execution and re-submits it to
another gfac node; that node does not re-run the job on the computing
resource, but simply resumes monitoring, and once the job is done the outputs
are downloaded from the original output location.
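To make the contract concrete, here is a minimal sketch of what the
Recoverable idea could look like. All names below are illustrative stand-ins,
not the exact Airavata GFac API:

// Minimal sketch of the Recoverable contract. All names here are
// illustrative stand-ins, not the exact Airavata GFac API.
class GFacException extends Exception {
    GFacException(String msg) { super(msg); }
}

/** Placeholder for whatever per-experiment context GFac passes around. */
class JobExecutionContext {
    String experimentId;
}

/** Normal plugin contract: invoked once per experiment execution. */
interface GFacHandler {
    void invoke(JobExecutionContext context) throws GFacException;
}

/** Opt-in contract for stateful plugins that can resume after a crash. */
interface Recoverable {
    // Called instead of invoke() when an experiment is re-submitted after
    // a worker failure; implementations read their checkpoint and resume.
    void recover(JobExecutionContext context) throws GFacException;
}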
When a particular experiment is finished all the ZK data is removed.
At this point the following things need to be done:
1. Figure out all the stateful handlers/providers and implement
recoverability for them.
Ex: if an input handler is transferring 1000 files and the gfac instance
crashes after 500 files, during the re-run it should be able to resume from
file 501. The same logic can be applied to a single huge file. Those details
are completely up to the plugin developer (see the sketch after this list).
2. Remove the RPC invocation and make the gfac nodes plain worker nodes.
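As an example of the plugin-level checkpointing item 1 asks for, here is a
sketch that records the index of the last fully transferred file in ZooKeeper
so a re-run can skip ahead. The znode path layout and the transferOneFile
helper are hypothetical:

import org.apache.curator.framework.CuratorFramework;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of plugin-level checkpointing for a multi-file input transfer.
// The znode layout and transferOneFile() are hypothetical.
public class CheckpointedTransfer {
    private final CuratorFramework zk;

    public CheckpointedTransfer(CuratorFramework zk) { this.zk = zk; }

    public void transfer(String experimentId, List<String> files) throws Exception {
        String ckptPath = "/airavata/gfac/checkpoints/" + experimentId + "/input-handler";
        int start = 0;
        if (zk.checkExists().forPath(ckptPath) != null) {
            // A previous worker crashed mid-transfer; resume where it stopped.
            start = Integer.parseInt(
                    new String(zk.getData().forPath(ckptPath), StandardCharsets.UTF_8));
        } else {
            zk.create().creatingParentsIfNeeded()
              .forPath(ckptPath, "0".getBytes(StandardCharsets.UTF_8));
        }
        for (int i = start; i < files.size(); i++) {
            transferOneFile(files.get(i)); // hypothetical helper doing the real work
            // Record progress only after the file is fully transferred.
            zk.setData().forPath(ckptPath,
                    String.valueOf(i + 1).getBytes(StandardCharsets.UTF_8));
        }
    }

    private void transferOneFile(String file) { /* SCP/GridFTP transfer elided */ }
}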
Regards
Lahiru
On Wed, Jun 18, 2014 at 12:11 PM, Lahiru Gunathilake <[email protected]> wrote:
Hi Eran,
On Tue, Jun 17, 2014 at 4:06 PM, Eran Chinthaka Withana <[email protected]> wrote:
Storm has a Kafka spout which manages the cursor location (the pointer to the
head of the queue representing the next message to be processed) inside ZK.
Each Storm spout instance uses this information to get the next item to
process. The Storm Kafka spout won't advance to the next message until it
gets an ack from the Storm topology.
Say we have 10 jobs in the queue and 5 GFAC instances each picked one,
successfully submitted it, and now have to start taking the rest of the jobs.
All 5 GFAC instances are still responsible for the 5 jobs they initially
picked, because those jobs are still running and the gfac instances are
monitoring them until they are done, but at the same time we have to move the
cursor to pick other jobs too.
If we ack and move the cursor just after submission, without waiting until
the job is actually finished, how are we going to know which gfac is
monitoring which set of jobs?
I am not seeing how to achieve the above requirement with this suggestion.
Maybe I am missing something here.
Regards
Lahiru
So, if there is an exception in the topology and the ack is sent only by the
last bolt, then Storm makes sure all messages are processed, since
exceptions won't generate acks.
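For readers unfamiliar with Storm's ack model, here is a minimal bolt sketch
of the semantics described above: a tuple is acked only on success, and an
exception leads to a fail, so the spout replays the tuple instead of
advancing. This targets the storm-core API of that era (backtype.storm
packages); the submitJob helper is hypothetical:

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Minimal sketch of Storm's ack model (storm-core 0.9.x, backtype.storm).
// The Kafka spout only advances its cursor once every bolt acks the tuple.
public class JobSubmissionBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String experimentId = tuple.getString(0);
            submitJob(experimentId);  // hypothetical submission call
            collector.ack(tuple);     // success: the spout may advance
        } catch (Exception e) {
            collector.fail(tuple);    // failure: the spout replays the tuple
        }
    }

    private void submitJob(String experimentId) { /* elided */ }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { /* terminal bolt */ }
}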
Thanks,
Eran Chinthaka Withana
On Tue, Jun 17, 2014 at 12:30 PM, Lahiru Gunathilake <[email protected]> wrote:
Hi Eran,
I think I should take back my last email. When I look carefully at Storm I
have the following question.
How are we going to store the job statuses and relaunch the jobs which were
running on failed nodes? It's true that Storm starts new workers, but there
should be a way for someone in the system to find the missing jobs. Since we
do not have a data stream, there is no use in starting new workers unless we
handle the missing jobs. I think we need better control of our components,
and we should persist the state of the jobs each GFAC node is handling.
Directly using ZooKeeper will let us do a proper fault-tolerance
implementation.
Regards
Lahiru
On Tue, Jun 17, 2014 at 3:14 PM, Lahiru Gunathilake <[email protected]> wrote:
Hi Supun,
I think in this use case we would only use the Storm topology to do the
communication among workers, completely ignoring the stream-processing part.
The Orchestrator would talk to Nimbus, and GFAC nodes would be worker nodes
in the Storm topology. But I think we can achieve an extremely fault-tolerant
system by directly using Storm, with minimal changes in Airavata, based on
the following statement on the Storm site:
"Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and
stateless; all state is kept in Zookeeper or on local disk. This means you
can kill -9 Nimbus or the Supervisors and they'll start back up like nothing
happened. This design leads to Storm clusters being incredibly stable."
On Tue, Jun 17, 2014 at 3:02 PM, Supun Kamburugamuva <[email protected]> wrote:
Hi Eran,
I'm using Storm every day, and this is one of the strangest things I've heard
about using Storm. Maybe there are more use cases for Storm other than
distributed stream processing, but AFAIK the bolts and spouts are built to
handle a stream of events that don't take much time to process. In Airavata
we don't process the messages; instead we run experiments based on the
commands given.
If you want process isolation, distributed execution, and cluster resource
management, YARN would be a better thing to explore.
Thanks,
Supun..
On Tue, Jun 17, 2014 at 2:27 PM, Eran Chinthaka Withana <[email protected]> wrote:
Hi Lahiru,
Good summarization, thanks Lahiru.
I think you are trying to stick to a model where the Orchestrator distributes
work to the GFac workers, and you are trying to resolve the impedance
mismatch through a messaging solution. If you step back and think about it,
we don't even want the orchestrator to handle everything. From its point of
view, it should submit jobs to the framework and then wait or get notified
once a job is done.
There are multiple ways of doing this, and here is one method. The
Orchestrator submits all its jobs to a job queue (implemented using any MQ
implementation, such as RabbitMQ or Kafka). A Storm topology is implemented
to dequeue the messages, process them (i.e., submit those jobs and get them
executed), and notify the Orchestrator with the status (either through
another JobCompletionQueue or by direct invocation).
With this approach, the MQ provider will help to match the impedance between
job submission and consumption. Storm helps with worker coordination, load
balancing, throttling of your job execution framework, worker pool
management, and fault tolerance.
Of course, you can implement this based only on ZK and handle everything else
on your own, but Storm has done exactly that, with ZK underneath.
Finally, if you go for a model like this, then even beyond job submission you
can use the same model for anything within the framework that needs internal
communication. For example, the workflow engine would submit its jobs to
queues based on what it has to do, and a Storm topology would exist for each
queue to dequeue messages and carry out the work in a reliable manner.
Consider these as mini-workflows within a larger workflow framework.
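As a rough illustration of the queue-based hand-off described above, the
Orchestrator side could publish experiment IDs like this. The sketch uses the
Kafka Java producer for concreteness; the topic names JobQueue and
JobCompletionQueue come from the description above, while the broker address
and serializer choices are assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the Orchestrator-side hand-off: publish an experiment ID to a
// job queue and let the worker topology consume it. Broker address and
// serializer choices are assumptions for illustration.
public class OrchestratorJobPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key by experiment ID so retries for one experiment stay ordered.
            producer.send(new ProducerRecord<>("JobQueue", "experiment-123", "SUBMIT"));
        }
        // A Storm topology (or any consumer group) dequeues from "JobQueue",
        // executes the job, and writes the status to "JobCompletionQueue".
    }
}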
We can have a voice chat if it's more convenient. But not at 7am PST :)
Thanks,
Eran Chinthaka Withana
On Tue, Jun 17, 2014 at 10:12 AM, Lahiru Gunathilake <[email protected]> wrote:
Hi All,
Ignoring the tool that we are going to use to implement fault tolerance, I
have summarized the model we have decided on so far. I will call the tool X;
we could use ZooKeeper or some other implementation. The following design
assumes that tool X and the Registry have high availability.
1. Orchestrator-to-GFAC-worker communication is going to be queue based, and
tool X is going to be used for this communication. (We have to implement this
with the race conditions between different gfac workers in mind.)
2. We have multiple instances of GFAC which are identical (in the future we
can group gfac workers). The existence of each worker node is tracked using
X; if a node goes down, the orchestrator will be notified by X.
3. When a particular request comes in and is accepted by one gfac worker,
that information will be replicated in tool X, in a place where it is
persisted even if the worker fails.
4. When a job comes to a final state such as failed, cancelled, or completed,
the above information will be removed. So at a given time the orchestrator
can poll the active jobs of each worker by giving a worker ID.
5. Tool X will make sure that when a worker goes down the orchestrator is
notified. During a worker failure, based on steps 3 and 4, the orchestrator
can poll all the active jobs of that worker and do the same thing as in step
1 (store the experiment IDs in the queue), and a gfac worker will pick up the
jobs.
6. When GFAC receives a job as in step 5, it has to carefully evaluate the
state from the Registry and decide what to do (if the job is pending, gfac
just has to monitor it; if the job state shows, say, inputs transferred but
not yet submitted, gfac has to execute the rest of the chain, submit the job
to the resource, and start monitoring).
If we can find a tool X which supports all these features, and the tool
itself is fault tolerant and supports atomicity, high availability, and a
simple API to implement against, we can use that tool.
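To make steps 1, 3, and 5 concrete with ZooKeeper standing in for tool X,
here is a minimal sketch of how a gfac worker could claim a queued experiment
so that exactly one worker wins the race. The path layout
(/airavata/gfac-queue, /airavata/gfac-claims) and the worker ID are
assumptions for illustration; the atomicity comes from ZooKeeper's create
call, which fails if the node already exists:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

// Sketch of the queue-claim step with ZooKeeper as "tool X". Path names
// are hypothetical. Exactly one worker wins because create() on an
// existing path throws NodeExistsException.
public class GfacQueueWorker {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        String workerId = "gfac-worker-1";
        for (String experimentId : zk.getChildren().forPath("/airavata/gfac-queue")) {
            try {
                // Ephemeral claim node: if this worker dies, the claim
                // disappears and the orchestrator can re-queue the job (step 5).
                zk.create().creatingParentsIfNeeded()
                  .withMode(CreateMode.EPHEMERAL)
                  .forPath("/airavata/gfac-claims/" + experimentId, workerId.getBytes());
                System.out.println(workerId + " claimed " + experimentId);
                // ... evaluate state from the Registry and continue (step 6)
            } catch (KeeperException.NodeExistsException raced) {
                // Another worker claimed this experiment first; skip it.
            }
        }
    }
}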
WDYT?
Lahiru
On Mon, Jun 16, 2014 at 2:38 PM, Supun Kamburugamuva <[email protected]> wrote:
Hi Lahiru,
Before moving ahead with an implementation, it may be worth considering some
of the following aspects as well.
1. How do we report the progress of an experiment as state in ZooKeeper? What
happens if a GFac instance crashes while executing an experiment? Are there
checkpoints we can save so that another GFac instance can take over?
2. What is the threading model of the GFac instances? (I consider this a very
important aspect.)
3. What information needs to be stored in ZooKeeper? You may need to store
other information about an experiment apart from its experiment ID.
4. How do we report errors?
5. For GFac, do you need a threading model or a worker process model?
Thanks,
Supun..
On Mon, Jun 16, 2014 at 2:22 PM, Lahiru Gunathilake <[email protected]> wrote:
Hi All,
I think the conclusion is like this:
1. We make gfac a worker, not a thrift service, and we can start multiple
workers, either each with a bunch of providers and handlers configured, or
provider-specific workers to handle classpath issues (not the common
scenario).
2. Gfac workers can be configured to watch a given path in ZooKeeper, and
multiple workers can listen to the same path. The default path can be
/airavata/gfac, or we can configure paths like /airavata/gfac/gsissh and
/airavata/gfac/bes.
3. The orchestrator can be configured with logic to store experiment IDs in
ZooKeeper under a path, and the orchestrator can be configured with
provider-specific path logic too. So when a new request comes in, the
orchestrator stores the experiment ID, and these experiment IDs are stored in
ZK as a queue.
4. Since the gfac workers are watching, they will be notified, and as Supun
suggested we can use a leader election algorithm [1], so one gfac worker will
take the leadership for each experiment. If there are gfac instances per
provider, the same logic applies among the nodes with the same provider type.
[1] http://curator.apache.org/curator-recipes/leader-election.html
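For reference, using the Curator recipe from [1] looks roughly like this. A
minimal sketch: one selector per experiment path, where only the elected gfac
worker runs the experiment. The path layout and the work inside
takeLeadership are illustrative assumptions:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Sketch of Curator leader election for a single experiment. Only the
// elected gfac worker executes the experiment; the others stand by.
public class ExperimentLeaderElection {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        String leaderPath = "/airavata/gfac/leader/experiment-123"; // illustrative layout
        LeaderSelector selector = new LeaderSelector(zk, leaderPath,
                new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        // This worker won the election for the experiment.
                        // Run (or resume) it; leadership is released on return.
                        System.out.println("Executing experiment-123 on this worker");
                    }
                });
        selector.autoRequeue(); // re-enter the election if leadership is lost
        selector.start();

        Thread.sleep(Long.MAX_VALUE); // keep the worker alive (sketch only)
    }
}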
I would like to implement this if there are no objections.
Lahiru
On Mon, Jun 16, 2014 at 11:51 AM, Supun Kamburugamuva <[email protected]> wrote:
Hi Marlon,
I think you are exactly correct.
Supun..
On Mon, Jun 16, 2014 at 11:48 AM, Marlon Pierce <[email protected]> wrote:
Let me restate this, and please tell me if I'm wrong.
The Orchestrator decides (somehow) that a particular job requires JSDL/BES,
so it places the Experiment ID in ZooKeeper's /airavata/gfac/jsdl-bes node.
GFAC servers associated with this instance notice the update. The first GFAC
to claim the job gets it, and uses the Experiment ID to get the detailed
information it needs from the Registry. ZooKeeper handles the locking, etc.,
to make sure that only one GFAC at a time is trying to handle an experiment.
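The "ZooKeeper handles the locking" part can be done with a Curator
inter-process lock. A sketch, with the lock path derived from the experiment
ID (an assumption, as is the Registry call it stands in for):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;

// Sketch: only one GFAC at a time handles a given experiment, using a
// Curator distributed lock keyed by experiment ID (path is illustrative).
public class ExperimentClaim {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        String experimentId = "experiment-123";
        InterProcessMutex lock =
                new InterProcessMutex(zk, "/airavata/gfac/locks/" + experimentId);

        // Non-blocking attempt: if another GFAC already holds the lock,
        // this server simply moves on to other work.
        if (lock.acquire(0, TimeUnit.SECONDS)) {
            try {
                // Use the Experiment ID to pull details from the Registry
                // (hypothetical call) and run the JSDL/BES chain.
                System.out.println("This GFAC claimed " + experimentId);
            } finally {
                lock.release();
            }
        }
    }
}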
Marlon
On 6/16/14, 11:42 AM, Lahiru Gunathilake wrote:
Hi Supun,
Thanks for the clarification.
Regards
Lahiru
On Mon, Jun 16, 2014 at 11:38 AM, Supun Kamburugamuva <[email protected]> wrote:
Hi Lahiru,
My suggestion is that maybe you don't need a Thrift service between the
Orchestrator and the component executing the experiment. When a new
experiment is submitted, the orchestrator decides who can execute this job.
Then it puts the information about this experiment execution in ZooKeeper.
The component which wants to execute the experiment is listening to this
ZooKeeper path, and when it sees the experiment it will execute it. So the
communication happens through a state change in ZooKeeper. This can
potentially simplify your architecture.
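A sketch of the listening side described here, using Curator's
PathChildrenCache to watch the experiment path (the path and the
executeExperiment call are assumptions):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Sketch: a gfac component reacts to experiments appearing under a
// watched ZooKeeper path instead of exposing a Thrift endpoint.
public class ExperimentWatcher {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        PathChildrenCache cache = new PathChildrenCache(zk, "/airavata/experiments", true);
        cache.getListenable().addListener((client, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                // The orchestrator wrote a new experiment znode; execute it.
                String experimentPath = event.getData().getPath();
                System.out.println("New experiment: " + experimentPath);
                // executeExperiment(experimentPath);  // hypothetical
            }
        });
        cache.start();

        Thread.sleep(Long.MAX_VALUE); // keep watching (sketch only)
    }
}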
Thanks,
Supun.
On Mon, Jun 16, 2014 at 11:14 AM, Lahiru Gunathilake <[email protected]> wrote:
Hi Supun,
So your suggestion is to create a znode for each thrift service we have, and
when a request comes in, that node gets modified with the input data for the
request; the thrift service has a watch on that node, gets notified because
of the watch, and can then read the input from ZooKeeper and invoke the
operation?
Lahiru
On Thu, Jun 12, 2014 at 11:50 PM, Supun Kamburugamuva <[email protected]> wrote:
Hi all,
Here is what I think about Airavata and ZooKeeper. In Airavata there are many
components, and these components must be stateless to achieve scalability and
reliability. There must also be a mechanism to communicate between the
components. At the moment Airavata uses RPC calls based on Thrift for the
communication.
ZooKeeper can be used both as a place to hold state and as a communication
layer between the components. I'm involved with a project that has many
distributed components, like Airavata. Right now we use Thrift services to
communicate among the components, but we find it difficult to use RPC calls
and achieve stateless behaviour, and we are thinking of replacing the Thrift
services with a ZooKeeper-based communication layer. So I think it is better
to explore the possibility of removing the Thrift services between the
components and using ZooKeeper as a communication mechanism between the
services. If you do this you will have to move the state to ZooKeeper and
will automatically achieve stateless behaviour in the components.
Also, I think trying to make ZooKeeper optional is a bad idea. If we are
integrating something as fundamentally important to the architecture as how
we store state, we shouldn't make it optional.
Thanks,
Supun..
On Thu, Jun 12, 2014 at 10:57 PM, Shameera Rathnayaka <[email protected]> wrote:
Hi Lahiru,
As I understood it, you are trying to achieve not only reliability but also
some other requirements by introducing ZooKeeper, like health monitoring of
the services, categorization by service implementation, etc. In that case, I
think we can make use of ZooKeeper's features. But if we only focus on
reliability, I have a bit of a concern: why can't we use clustering + LB?
Yes, it is better if we add ZooKeeper as a prerequisite when users need to
use it.
Thanks,
Shameera.
On Thu, Jun 12, 2014 at 5:19 AM, Lahiru Gunathilake <[email protected]> wrote:
Hi Gagan,
I need to start another discussion about it, but I had an offline discussion
with Suresh about auto-scaling. I will start another thread about this topic
too.
Regards
Lahiru
On Wed, Jun 11, 2014 at 4:10 PM, Gagan Juneja <[email protected]> wrote:
Thanks Lahiru for pointing to a nice library; added to my dictionary :).
I would like to know how we are planning to start multiple servers.
1. Spawning new servers based on load? Sometimes we call this auto-scaling.
2. Keeping some specific number of nodes available, e.g. we want 2 servers to
be available at any time, so if one goes down I need to spawn a new one to
bring the available server count back to 2.
3. Initially starting all the servers.
In scenarios 1 and 2 ZooKeeper does make sense, but I don't believe the
existing architecture supports this?
Regards,
Gagan
On 12-Jun-2014 1:19 am, "Lahiru Gunathilake" <[email protected]> wrote:
Hi Gagan,
Thanks for your response. Please see my inline comments.
On Wed, Jun 11, 2014 at 3:37 PM, Gagan Juneja <[email protected]> wrote:
Hi Lahiru,
Just my 2 cents.
I am a big fan of ZooKeeper, but I am also against adding multiple hops to
the system, which can add unnecessary complexity. Here I am not able to
understand the requirement for ZooKeeper; maybe I am wrong because of my
limited knowledge of the Airavata system as a whole. So I would like to
discuss the following points.
1. How will it help us make the system more reliable? ZooKeeper is not able
to restart services. At most it can tell whether a service is up or not,
which would only be the case if an Airavata service goes down gracefully and
we have some automated way to restart it. If this is just a matter of routing
client requests to the available thrift servers, then this can be achieved
with the help of a load balancer, which I guess is already on the Thrift wish
list.
We have multiple thrift services, and currently we start only one instance of
each; every thrift service is a stateless service. To keep high availability
we have to start multiple instances of them in a production scenario. So for
clients to get an available thrift service we can use ZooKeeper znodes to
represent each available service. There are some libraries doing something
similar [1], and I think we can use them directly.
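For instance, each thrift service instance could register itself under an
ephemeral znode on startup; the sketch below shows the idea with Curator. The
path layout and the host:port payload are assumptions. Because the znode is
ephemeral, it disappears automatically when the service dies, which is what
makes the discovery reliable:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

// Sketch: a thrift service instance advertises itself with an ephemeral
// znode. Path layout and payload format are illustrative assumptions.
public class ServiceRegistration {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // EPHEMERAL_SEQUENTIAL gives each instance a unique child node that
        // ZooKeeper removes automatically when the session ends (crash or stop).
        zk.create().creatingParentsIfNeeded()
          .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
          .forPath("/airavata/services/gfac/instance-", "host1:8950".getBytes());

        Thread.sleep(Long.MAX_VALUE); // the service would run here (sketch only)
    }
}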
2. As far as registering the different providers is concerned, do you think
we really need an external store for that?
Yes, I think so, because it is lightweight and reliable, and we have to do a
very minimal amount of work to bring all these features to Airavata, because
ZooKeeper handles all the complexity.
I have seen people using ZooKeeper more for state management in distributed
environments.
+1. We might not be the most typical users of ZooKeeper, because all of our
services are stateless, but my point is that we can achieve fault tolerance
using ZooKeeper with minimal work.
I would like to understand more about how we can leverage ZooKeeper in
Airavata to make the system reliable.
[1] https://github.com/eirslett/thrift-zookeeper
Regards,
Gagan
On 12-Jun-2014 12:33 am, "Marlon Pierce" <[email protected]> wrote:
Thanks for the summary, Lahiru. I'm cc'ing the Architecture list for
additional comments.
Marlon
On 6/11/14 2:27 PM, Lahiru Gunathilake wrote:
Hi All,
I did a little research about Apache ZooKeeper [1] and how to use it in
Airavata. It's really a nice way to achieve fault tolerance and reliable
communication between our thrift services and clients. ZooKeeper is a
distributed, fault-tolerant system for reliable communication between
distributed applications. It is like an in-memory file system which has nodes
in a tree structure; each node can have a small amount of data associated
with it, and these nodes are called znodes. Clients can connect to a
ZooKeeper server and add, delete, and update these znodes.
In Apache Airavata we start multiple thrift services, and these can go down
for maintenance or crash. If we use ZooKeeper to store their configuration
(the thrift service configurations), we can achieve a very reliable system.
Basically, thrift clients can dynamically discover the available services by
using ephemeral znodes (here we do not have to change the generated thrift
client code, but we have to change the locations at which we invoke them).
Ephemeral znodes are removed when the thrift service goes down, and ZooKeeper
guarantees atomicity between these operations. With this approach we can have
a node hierarchy for the various airavata, orchestrator, appcatalog, and gfac
thrift services.
Specifically for gfac we can have different types of services for each
provider implementation. This can be achieved by using the hierarchical
support in ZooKeeper and providing some logic in the gfac thrift service so
that it registers itself at a defined path. Using the same logic, the
orchestrator can discover the provider-specific gfac thrift service and route
the message to the correct thrift service.
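The discovery side of this could be as simple as listing the children of the
provider-specific path and connecting to one of the registered instances. A
sketch (the path layout and host:port payload match the hypothetical
registration sketch earlier in the thread; picking the first child is an
arbitrary load-balancing choice):

import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Sketch: the orchestrator discovers a live, provider-specific gfac
// service by listing ephemeral children. Path layout is illustrative.
public class ServiceDiscovery {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        List<String> instances = zk.getChildren().forPath("/airavata/services/gfac");
        if (instances.isEmpty()) {
            System.out.println("No gfac instance is currently registered");
            return;
        }
        // Each child's data holds the host:port the service advertised.
        byte[] data = zk.getData()
                        .forPath("/airavata/services/gfac/" + instances.get(0));
        String hostPort = new String(data);
        System.out.println("Routing request to gfac at " + hostPort);
        // ... open the thrift transport to hostPort and invoke as usual
    }
}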
With this approach I think we simply have to write some client code in the
thrift services and clients. The ZooKeeper server installation can be done as
a separate process, and it will be easier to keep the ZooKeeper server
separate from Airavata, because installing a ZooKeeper server is a little
complex in a production scenario. I think we have to make sure everything
works fine when there is no ZooKeeper running; e.g. enable.zookeeper=false
should work fine and users shouldn't have to download and start ZooKeeper.
[1] http://zookeeper.apache.org/
Thanks
Lahiru
--
System Analyst Programmer
PTI Lab
Indiana University