Darren,

I originally presented my thoughts on this subject at CCC13 [1].  
Fundamentally, I see CloudStack as having two distinct tiers — orchestration 
management and automation control.  The orchestration tier coordinates the 
automation control layer to fulfill user goals (e.g. create a VM instance, 
alter a network route, snapshot a volume, etc) constrained by policies defined 
by the operator (e.g. multi-tenancy boundaries, ACLs, quotas, etc).  This layer 
must always be available to take new requests, and to report the best available 
infrastructure state information.  Since execution of accepted work is 
guaranteed, this layer may pend work to be completed when the appropriate 
devices become available.

The automation control tier translates logical units of work to underlying 
infrastructure component APIs.  Upon completion of a unit of work’s 
execution, the state of a device (e.g. hypervisor, storage device, network 
switch, router, etc) matches the state managed by the orchestration tier at 
the time the unit of work was created.  In order to ensure that the state of 
the underlying devices 
remains consistent, these units of work must be executed serially.  Permitting 
concurrent changes to resources creates race conditions that lead to resource 
overcommitment and state divergence.  One symptom of this phenomenon is the 
myriad of scripts operators write to “synchronize” state between the CloudStack 
database and their hypervisors.  Another is the example provided below: a rapid 
create-destroy sequence which can (and often does) leave dangling resources due 
to race conditions between the two operations.
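
To make the failure mode concrete, here is a minimal, purely illustrative Java 
sketch of a create and a destroy racing against the same VM with no ordering 
between them (none of these class or method names are CloudStack APIs):

import java.util.concurrent.ConcurrentHashMap;

public class CreateDestroyRace {

    // stand-in for "registered VM -> attached volume" state on the device
    static final ConcurrentHashMap<String, String> vmToVolume =
        new ConcurrentHashMap<String, String>();

    static void createVm(String vmId) {
        String volumeId = "volume-for-" + vmId;    // step 1: allocate storage on the device
        // ... copy the template, configure networking, boot the VM ...
        vmToVolume.put(vmId, volumeId);            // step 2: register the running VM
    }

    static void destroyVm(String vmId) {
        String volumeId = vmToVolume.remove(vmId);
        if (volumeId == null) {
            return;    // nothing registered yet, so nothing gets cleaned up
        }
        // ... deallocate volumeId and deregister the VM ...
    }

    public static void main(String[] args) throws InterruptedException {
        Thread create = new Thread(new Runnable() {
            public void run() { createVm("vm-1"); }
        });
        Thread destroy = new Thread(new Runnable() {
            public void run() { destroyVm("vm-1"); }
        });
        create.start();
        destroy.start();    // if destroy runs before create finishes, the storage
        create.join();      // allocated in step 1 is never cleaned up
        destroy.join();
    }
}

Serializing the two operations per resource, as described later in this note, 
closes that window.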

In order to provide reliability, CloudStack vertically partitions the 
infrastructure into zones (independent power source/network uplink combination) 
sub-divided into pods (racks).  At this time, regions are largely notional 
and, as such, are not partitions.  Between the user’s zone selection and our 
allocators’ distribution of resources across pods, the system attempts to 
distribute resources as widely as possible across these partitions to provide 
resilience against a variety of infrastructure failures (e.g. power loss, 
network uplink disruption, switch failures, etc).  In order to maximize this 
resilience, the control plane (orchestration + automation tiers) must be able 
to operate on all available partitions.  For example, if we have two (2) zones 
(A & B) and twenty (20) pods per zone, we should be able to take and execute 
work in Zone A when one or more of its pods is lost, as well as continue 
taking and executing work in Zone A when Zone B has failed entirely.
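
For reference, the partition description handed from the orchestration tier to 
the automation control tier can be sketched as a simple tree; this shape is 
hypothetical, not CloudStack’s actual schema:

import java.util.ArrayList;
import java.util.List;

// Zone -> Pod -> resource ids: the failure domains the allocators spread across.
class Zone {
    final String name;
    final List<Pod> pods = new ArrayList<Pod>();
    Zone(String name) { this.name = name; }
}

class Pod {
    final String name;
    final List<String> resourceIds = new ArrayList<String>();    // hosts, primary storage, etc.
    Pod(String name) { this.name = name; }
}

Later in this note, the same tree determines where partition supervisors and 
their resource processes are placed.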

CloudStack is an eventually consistent system in that the state reflected in 
the orchestration tier will (optimistically) differ from the state of the 
underlying infrastructure (managed by the automation tier).  Furthermore, the 
system has a partitioning model to provide resilience in the face of a variety 
of logical and physical failures.  However, the automation control tier 
requires strictly consistent operations.  Based on these definitions, the 
system appears to violate the CAP theorem [2] (Brewer!).  The separation of the 
system into two distinct tiers isolates these characteristics, but the boundary 
between them must be carefully implemented to ensure that the consistency 
requirements of the automation tier are not leaked to the orchestration tier.

To properly implement this boundary, I think we should split the orchestration 
and automation control tiers into separate physical processes communicating via 
an RPC mechanism — allowing the automation control tier to completely 
encapsulate its work distribution model.  In my mind, the tricky wicket is 
providing serialization and partition tolerance in the automation control tier. 
Realistically, there are two options: explicit and implicit locking models.  
Explicit locking models employ an external mechanism to coordinate exclusive 
access to resources (e.g. RDBMS lock pattern, ZooKeeper, Hazelcast, etc).  The 
challenge with this model is ensuring the availability of the locking 
mechanism in the face of a partition, forcing CloudStack operators to ensure 
that they have deployed the underlying mechanism in a partition tolerant 
manner (e.g. don’t locate all of the replicas in the same pod, deploy a 
cluster per zone, etc).  Additionally, the durability introduced by these 
mechanisms inhibits self-healing because stale locks must be detected and 
broken before work can resume.
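
For clarity, the explicit model looks roughly like the following sketch of the 
RDBMS row-lock pattern using plain JDBC (the table and column names are 
illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ExplicitLockExample {

    // Hold a row lock on the resource for the duration of the unit of work.
    // Every server must reach the same database, so its availability bounds
    // the availability of the automation control tier.
    public static void withResourceLock(String jdbcUrl, long resourceId, Runnable work)
            throws SQLException {
        Connection conn = DriverManager.getConnection(jdbcUrl);
        try {
            conn.setAutoCommit(false);
            PreparedStatement lock = conn.prepareStatement(
                "SELECT id FROM resource WHERE id = ? FOR UPDATE");
            lock.setLong(1, resourceId);
            lock.executeQuery();   // blocks until the row lock is granted
            work.run();            // execute the unit of work while holding the lock
            conn.commit();         // releases the lock
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.close();
        }
    }
}

The same shape applies to ZooKeeper or Hazelcast locks; only the coordination 
service changes, along with the operational burden of keeping it available 
across partitions.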

In contrast, an implicit lock model structures the runtime execution model to 
provide exclusive access to a resource and model the partitioning scheme.  One 
such model is to provide a single work queue (mailbox) and consuming process 
(actor) per resource.  The orchestration tier provides a description of the 
partition and resource definitions to the automation control tier.  The 
automation control tier creates a supervisor per partition which in turn 
manages process creation per resource.  Therefore, process creation and 
destruction create an implicit lock.  Since the automation control tier does 
not persist data in this model, the crash of a supervisor and/or process 
(supervisors are simply specialized processes) releases the implicit lock and 
signals a re-execution of the supervisor/process allocation process.  The 
following high-level process describes process creation and allocation (hand 
waving certain details such as back pressure and throttling):

1. The automation control layer receives a resource definition (e.g. zone 
   description, VM definition, volume information, etc).  These requests are 
   processed by the owning partition supervisor exclusively in order of 
   receipt.  Therefore, the automation control tier views the world as a tree 
   of partitions and resources.
2. The partition supervisor creates the process (and the associated mailbox), 
   providing it with the initial state.  The process state is Initialized.
3. The process synchronizes the state of the underlying resource with the 
   state provided.  Upon successful completion of state synchronization, the 
   state of the process becomes Ready.  Only Ready processes can consume units 
   of work from their mailboxes.  If synchronization fails, the process 
   crashes.  All state transitions and crashes are reported to interested 
   parties through an asynchronous event reporting mechanism, including the id 
   of the associated unit of work.
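
As a concrete (if simplified) illustration of steps 1-3, the following sketch 
uses plain java.util.concurrent primitives; an actor library would replace the 
dedicated thread with a lightweight process, and the names (ResourceProcess, 
UnitOfWork, synchronizeDevice) are hypothetical:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// One mailbox and one consuming process per resource: the implicit lock.
class ResourceProcess implements Runnable {

    enum State { INITIALIZED, READY, CRASHED }

    private final String resourceId;
    private final BlockingQueue<UnitOfWork> mailbox =
        new LinkedBlockingQueue<UnitOfWork>();
    private volatile State state = State.INITIALIZED;

    ResourceProcess(String resourceId) { this.resourceId = resourceId; }

    void submit(UnitOfWork work) { mailbox.add(work); }    // pends until the process is Ready

    State state() { return state; }

    public void run() {
        try {
            synchronizeDevice();    // step 3: converge the device with the provided state
            state = State.READY;    // only Ready processes consume from their mailboxes
            while (true) {
                mailbox.take().execute(resourceId);    // strictly serial execution
            }
        } catch (Exception e) {
            state = State.CRASHED;
            mailbox.clear();                  // a crash destroys the mailbox, flushing pending work
            throw new RuntimeException(e);    // propagate so the supervisor can re-allocate
        }
    }

    private void synchronizeDevice() {
        // query the hypervisor/storage/switch and apply the provided definition
    }
}

interface UnitOfWork {
    void execute(String resourceId);
}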

The Ready state means that the underlying device is in a usable state 
consistent with the last unit of work executed.  A process crashes when it is 
unable to bring the device into a state consistent with the unit of work being 
executed (a process crash also destroys the associated mailbox — flushing 
pending work).  This event initiates execution of the allocation process (above) 
until the process can be re-allocated in a Ready state (again throttling is 
hand waved for the purposes of brevity).  The state synchronization step 
converges the actual state of the device with changes that occurred during 
unavailability.  When a unit of work fails to be executed, the orchestration 
tier determines the appropriate recovery strategy (e.g. re-allocate work to 
another resource, wait for the availability of the resource, fail the 
operation, etc).
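
Continuing the sketch above, the supervisor side of that recovery loop might 
look like the following (names remain hypothetical; throttling and back-off 
are omitted):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One supervisor per partition (zone or pod); it owns process allocation.
class PartitionSupervisor {

    final Map<String, ResourceProcess> processes =
        new ConcurrentHashMap<String, ResourceProcess>();

    void allocate(final String resourceId) {
        final ResourceProcess process = new ResourceProcess(resourceId);
        Thread runner = new Thread(process, "process-" + resourceId);
        runner.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread thread, Throwable cause) {
                // report the crash event, then re-run allocation until Ready
                allocate(resourceId);
            }
        });
        processes.put(resourceId, process);
        runner.start();
    }
}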

The association of one process per resource provides exclusive access to the 
resource without the requirement of an external locking mechanism.  A mailbox 
per process orders pending units of work.  Together, they provide 
serialization of operation execution.  In the example provided, a unit of work 
would be submitted to create a VM and a second unit of work would be submitted 
to destroy it.  The creation would be completely executed followed by the 
destruction (assuming no failures).  Therefore, the VM would briefly exist 
before being destroyed.  In conjunction with a process location mechanism, the 
system can place the processes associated with resources in the appropriate 
partition, allowing the system to properly self-heal, manage its own 
scalability (thinking lightweight system VMs), and systematically enforce 
partition tolerance (the operator was nice enough to describe their 
infrastructure; we should use it to ensure resilience of CloudStack and their 
infrastructure).
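
Tying this back to Darren’s scenario, the two requests become two units of 
work queued to the same mailbox, regardless of which management server 
accepted them (continuing the hypothetical sketches above):

public class CreateThenDestroyExample {
    public static void main(String[] args) {
        PartitionSupervisor supervisor = new PartitionSupervisor();
        supervisor.allocate("vm-1");

        ResourceProcess vm1 = supervisor.processes.get("vm-1");

        vm1.submit(new UnitOfWork() {    // Server 1's request: create VM 1
            public void execute(String resourceId) { /* create the VM on the hypervisor */ }
        });
        vm1.submit(new UnitOfWork() {    // Server 2's request: destroy VM 1
            public void execute(String resourceId) { /* tear the VM back down */ }
        });
        // The single process drains the mailbox in order: the create fully
        // completes before the destroy runs, so nothing is left dangling.
    }
}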

Until relatively recently, the implicit locking model described was infeasible 
on the JVM.  Using native Java threads, a server would be limited to 
controlling (at best) a few hundred resources.  However, lightweight threading 
models implemented by libraries/frameworks such as Akka [3], Quasar [4], and 
Erjang [5] can scale to millions of “threads” on reasonably sized servers 
and provide the supervisor/actor/mailbox abstractions described above.  Most 
importantly, this approach does not require operators to become operationally 
knowledgeable of yet another platform/component.  In short, I believe we can 
encapsulate these requirements in the management server (orchestration + 
automation control tiers) — keeping the operational footprint of the system 
proportional to the deployment without sacrificing resilience.  Finally, it 
provides the foundation for proper collection of instrumentation information 
and process control/monitoring across data centers.
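
To make the library point concrete, the same per-resource process collapses to 
a few lines with Akka’s Java API (an illustrative sketch against the 2.2-era 
UntypedActor API, reusing the hypothetical UnitOfWork interface from above):

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class ResourceActor extends UntypedActor {

    private final String resourceId;

    public ResourceActor(String resourceId) {
        this.resourceId = resourceId;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof UnitOfWork) {
            // Akka delivers one message at a time per actor: serial execution
            // against the resource without an external lock.
            ((UnitOfWork) message).execute(resourceId);
        } else {
            unhandled(message);
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("automation-control");
        ActorRef vm1 = system.actorOf(Props.create(ResourceActor.class, "vm-1"), "vm-1");
        vm1.tell(new UnitOfWork() {    // the actor's mailbox orders pending work
            public void execute(String id) { /* create the VM */ }
        }, ActorRef.noSender());
    }
}

A crash in onReceive is handled by the parent’s supervisor strategy, which 
gives us the restart and re-synchronization behavior described above without 
hand-rolled thread management.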

Admittedly, I have hand waved some significant issues that would need to be 
resolved.  I believe they are all resolvable, but it would take discussion to 
determine the best approach to them.  Transforming CloudStack to such a model 
would not be trivial, but I believe it would be worth the (significant) effort 
as it would make CloudStack one of the most scalable and resilient cloud 
orchestration/management platforms available.

Thanks,
-John

[1]: http://www.slideshare.net/JohnBurwell1/how-to-run-from-a-zombie-cloud-stack-distributed-process-management
[2]: http://lpd.epfl.ch/sgilbert/pubs/BrewersConjecture-SigAct.pdf
[3]: http://akka.io
[4]: https://github.com/puniverse/quasar
[5]: https://github.com/trifork/erjang/wiki

P.S.  I have CC’ed the developer mailing list.  All conversations at this level 
of detail should be initiated and occur on the mailing list to ensure 
transparency with the community.

On Nov 22, 2013, at 3:49 PM, Darren Shepherd <darren.s.sheph...@gmail.com> 
wrote:

> 140 characters are not productive.  
> 
> What would be your idea way to do distributed concurrency control?  Simple 
> use case.  Server 1 receives a request to start a VM 1.  Server 2 receives a 
> request to delete VM 1.  What do you do?
> 
> Darren
