On Mon, Oct 5, 2015 at 7:49 PM, Neil Conway <[email protected]> wrote:
> On Mon, Oct 5, 2015 at 3:20 PM, Maged Michael <[email protected]> wrote:
>> I have in mind three options.
>> (1) Text translation of Mesos source code. E.g., "process::Future"
>> into, say, "sim::process::Future".
>> - Pros: Does not require any changes to any Mesos or libprocess code.
>> Replace only what needs to be replaced in libprocess for simulation.
>> - Cons: Fragile.
>> (2) Integrate the simulation mode with the libprocess code.
>> - Pros: Robust. Add only what needs to be added to libprocess for
>> simulation. Partially reuse some data structures from regular-mode
>> libprocess.
>> - Cons: Might get in the way of the development and bug fixes in the
>> regular libprocess code.
>> (3) Changes to Mesos makefiles to use alternative simulation-oriented
>> libprocess code.
>> - Pros: Robust.
>> - Cons: Might need to create a lot of stubs that redirect to the
>> regular-mode (i.e., not for simulation) libprocess code that doesn't
>> need any change under simulation.
>
> My vote is for #2, with the caveat that we might have the code live in
> a separate Git repo/branch for a period of time until it has matured.
> If the simulator requires drastic (architectural) changes to
> libprocess, then merging the changes into mainline Mesos might be
> tricky -- but it might be easier to figure that out once we're closer
> to an MVP.
I see from your comments below on the dispatch sketch that there might
be a lot of code that can be reused from libprocess. So I agree with
you about preferring option #2.
>> As an example of what I have in mind, this is a sketch of
>> sim::process::dispatch.
>>
>> template <class T, class... Args>
>> // Let R be an abbreviation of typename result_of<T::*method(Args...)>::type
>> sim::process::Future<R>
>> dispatch(
>>     const sim::process::Process<T>& pid,
>>     R (T::*method)(Args...),
>>     Args... args)
>> {
>>   /* Still running in the context of the parent simulated thread -
>>      the same C++/OS thread as the simulator. */
>>   <context switch to the simulator and back to allow event
>>    interleaving> /* e.g., setjmp/longjmp */
>>   // Create a promise.
>>   std::shared_ptr<sim::process::Promise<R>> prom(
>>       new sim::process::Promise<R>());
>>   <create a function object fn initialized with T::method and args>
>>   <associate prom with fn> // e.g., a map structure
>>   <enqueue fn in pid's structure>
>>   return prom->future();
>>   /* The dispatched function will start running when, at some point
>>      later, the simulator decides to switch to the child thread (pid)
>>      when pid is ready to run fn. */
>> }
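(As an aside on the <context switch> placeholder above: below is a
minimal sketch of the switching mechanism using swapcontext from
<ucontext.h>. It only illustrates the primitive; a real simulator would
keep a context and a stack per simulated thread, plus the bookkeeping
to decide who runs next.)

#include <iostream>
#include <ucontext.h>

static ucontext_t simulator_ctx; // the simulator's context
static ucontext_t child_ctx;     // one simulated thread's context
static char child_stack[64 * 1024];

// Body of the simulated thread: do some work, yield to the simulator
// at a would-be interleaving point, then resume when switched back to.
static void child_body()
{
  std::cout << "child: before yield" << std::endl;
  swapcontext(&child_ctx, &simulator_ctx); // yield to the simulator
  std::cout << "child: resumed" << std::endl;
}

int main()
{
  getcontext(&child_ctx);
  child_ctx.uc_stack.ss_sp = child_stack;
  child_ctx.uc_stack.ss_size = sizeof(child_stack);
  child_ctx.uc_link = &simulator_ctx; // where to go when child returns
  makecontext(&child_ctx, child_body, 0);

  swapcontext(&simulator_ctx, &child_ctx); // run child until it yields
  std::cout << "simulator: child yielded, resuming it" << std::endl;
  swapcontext(&simulator_ctx, &child_ctx); // resume child to completion
  std::cout << "simulator: child done" << std::endl;
  return 0;
}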
>
> I wonder how much of what is happening here (e.g., during the
> setjmp/longjmp) could be implemented by instead modifying the
> libprocess event queuing/dispatching logic. For example, suppose Mesos
> is running on two CPUs (and let's ignore network I/O + clock for now).
> If you want to explore all possible schedules, you could start by
> capturing the non-deterministic choices that are made when the
> processing threads (a) send messages concurrently (b) choose new
> processes to run from the run queue. Does that sound like a feasible
> approach?
I agree. Other structures, such as the dispatch queues, probably have
counterparts in libprocess that can be reused as well, minus the
thread-safety parts of the code.
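To make that concrete: one way to funnel every non-deterministic
decision (which process to run next, which of several concurrent sends
is delivered first, ...) through a single point is an oracle that
either draws from a seeded RNG while recording each pick, or replays
the trace of a previous run. A rough sketch, with all names made up:

#include <cstddef>
#include <deque>
#include <random>
#include <vector>

// Record mode: picks come from a seeded RNG and are logged.
// Replay mode: picks are read back from a previous run's trace.
class ScheduleOracle
{
public:
  explicit ScheduleOracle(unsigned seed) : rgen(seed), replaying(false) {}

  explicit ScheduleOracle(const std::vector<size_t>& old_trace)
    : trace(old_trace), replaying(true) {}

  // Pick an index in [0, n); requires n > 0.
  size_t choose(size_t n)
  {
    if (replaying) {
      return trace[next++];
    }
    size_t pick = std::uniform_int_distribution<size_t>(0, n - 1)(rgen);
    trace.push_back(pick); // log the decision for later replay
    return pick;
  }

  const std::vector<size_t>& getTrace() const { return trace; }

private:
  std::default_random_engine rgen;
  std::vector<size_t> trace;
  bool replaying;
  size_t next = 0;
};

// A simulation-mode run queue needs no locking: everything runs on the
// simulator's single OS thread, and the oracle decides who goes next.
template <typename P>
struct SimRunQueue
{
  std::deque<P*> runnable;

  P* next(ScheduleOracle& oracle)
  {
    size_t i = oracle.choose(runnable.size());
    P* p = runnable[i];
    runnable.erase(runnable.begin() + i);
    return p;
  }
};

Re-running the same trace through a replaying oracle would then
reproduce the schedule deterministically.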
> Other suggestions:
>
> * To make what you're suggesting concrete, it would be great if you
> started with a VERY minimal prototype -- say, a test program that
> creates three libprocess processes and has them exchange messages. The
> order in which messages will be sent/received is non-deterministic [1]
> -- can we build a simulator that (a) can explore all possible
> schedules (b) can replay the schedule chosen by a previous simulation
> run?
>
> * For a more interesting but still somewhat-tractable example, the
> replicated log (src/log) might be a good place to start. It is fairly
> decoupled from the rest of Mesos and involves a bunch of interesting
> concurrency. If you set up a test program that creates N log replicas
> (in a single OS process) and then explores the possible interleavings
> of the messages exchanged between them, that would be a pretty cool
> result! There's also a bunch of Paxos-specific invariants that you can
> check for (e.g., once the value of a position is agreed-to by a quorum
> of replicas, that value will eventually appear at that position in all
> sufficiently connected log replicas).
Good suggestions. Thanks.
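For the minimal prototype, I'm thinking of something along the
following lines (just a skeleton, with a made-up message name): three
spawned processes each greet the other two, so the arrival order of
the messages at each receiver is exactly the kind of schedule the
simulator would enumerate and replay.

#include <iostream>
#include <string>
#include <vector>

#include <process/dispatch.hpp>
#include <process/process.hpp>

#include <stout/duration.hpp>
#include <stout/os.hpp>

class PeerProcess : public process::Process<PeerProcess>
{
public:
  // Dispatch target: send "hello" to every other peer. Sends issued
  // by different peers race, which is the non-determinism to explore.
  void greet(const std::vector<process::UPID>& others)
  {
    for (size_t i = 0; i < others.size(); ++i) {
      send(others[i], "hello");
    }
  }

protected:
  virtual void initialize()
  {
    install("hello", &PeerProcess::hello);
  }

  void hello(const process::UPID& from, const std::string& body)
  {
    // The order of these lines across runs is one observable schedule.
    std::cout << self() << " received hello from " << from << std::endl;
  }
};

int main()
{
  PeerProcess a, b, c;
  std::vector<process::PID<PeerProcess>> pids;
  pids.push_back(process::spawn(a));
  pids.push_back(process::spawn(b));
  pids.push_back(process::spawn(c));

  for (size_t i = 0; i < pids.size(); ++i) {
    std::vector<process::UPID> others;
    for (size_t j = 0; j < pids.size(); ++j) {
      if (j != i) {
        others.push_back(pids[j]);
      }
    }
    process::dispatch(pids[i], &PeerProcess::greet, others);
  }

  os::sleep(Seconds(1)); // crude: just let the messages drain

  process::terminate(a); process::wait(a);
  process::terminate(b); process::wait(b);
  process::terminate(c); process::wait(c);
  return 0;
}

Getting (a) to explore different delivery orders and (b) to replay a
recorded order would be the first real exercise for the oracle sketched
earlier.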
On a related note, I have this simple program with a configurable bug
as a test of the simulator's handling of dispatches and futures:
#include <atomic>
#include <climits>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <memory>
#include <random>
#include <vector>

#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/process.hpp>

#include <stout/nothing.hpp>
class AdderProcess : public process::Process<AdderProcess>
{
public:
  // Atomically add 'addval' to 'var'; if 'bug' is set, use a racy
  // load/store instead of a CAS, which can lose concurrent updates.
  process::Future<Nothing> buggyAdd(
      std::atomic<int>& var,
      int addval,
      bool bug)
  {
    while (true) {
      int oldval = var.load();
      if (bug) {
        var.store(oldval + addval); // non-atomic read-modify-write
        break;
      }
      if (var.compare_exchange_weak(oldval, oldval + addval)) {
        break;
      }
    }
    return Nothing();
  }
};
/*
 * conc: concurrency level
 * disp: number of dispatches to each spawned process per test
 * reps: test repetitions
 * reciprocal:
 *   if 0, then the bug is not triggered;
 *   otherwise, the probability of taking the buggy path is 1/reciprocal.
 */
void test(
    unsigned conc, unsigned disp, unsigned reps,
    unsigned reciprocal, unsigned seed)
{
  std::uniform_int_distribution<unsigned> rand_bug(
      (reciprocal > 0) ? 0 : 1,                // min
      (reciprocal > 0) ? reciprocal - 1 : 1);  // max
  std::uniform_int_distribution<int> rand_int(INT_MIN, INT_MAX);
  std::default_random_engine rgen(seed);

  // For periodic output of reps done, to show progress.
  unsigned out = 1;
  while (out * conc * disp < 1000000) out *= 10;

  std::vector<std::unique_ptr<AdderProcess>> adders(conc);
  std::vector<process::PID<AdderProcess>> pids(conc);
  using FutVec = std::vector<process::Future<Nothing>>;
  std::vector<FutVec> futs(conc, FutVec(disp));
  unsigned errors = 0;

  for (unsigned i = 0; i < reps; ++i) {
    // Initial value.
    int val = rand_int(rgen);
    std::atomic<int> result(val);
    int expected = val;
    for (unsigned j = 0; j < conc; ++j) {
      adders[j] = std::unique_ptr<AdderProcess>(new AdderProcess());
      pids[j] = process::spawn(*adders[j]);
    }
    for (unsigned k = 0; k < disp; ++k) {
      for (unsigned j = 0; j < conc; ++j) {
        int addval = rand_int(rgen);
        bool bug = (rand_bug(rgen) == 0);
        futs[j][k] = process::dispatch(
            pids[j],
            &AdderProcess::buggyAdd,
            std::ref(result),
            addval,
            bug);
        expected += addval;
      }
    }
    for (unsigned j = 0; j < conc; ++j) {
      for (unsigned k = 0; k < disp; ++k) {
        futs[j][k].await();
      }
      process::terminate(*adders[j]);
      process::wait(*adders[j]);
    }
    if (expected != result.load()) {
      std::cout << std::setw(2) << std::right << ++errors << " ERROR: "
                << "in rep " << std::setw(7) << i + 1
                << " expected = " << std::setw(12) << expected
                << " result = " << std::setw(12) << result.load()
                << std::endl;
    }
    // Show progress.
    if ((i + 1) % out == 0) std::cout << i + 1 << " reps" << std::endl;
  }
  if (reps < out) std::cout << reps << " reps" << std::endl;
}
int usage(int argc, char** argv)
{
  std::cout << "Usage: "
            << argv[0] << "\n"
            << "  <concurrency level>\n"
            << "  <number of dispatches per process>\n"
            << "  <number of test repetitions>\n"
            << "  <reciprocal of bug probability>\n"
            << "  [ <randomization seed> ]\n";
  exit(EXIT_FAILURE);
}
int main(int argc, char** argv)
{
  if (argc < 5) return usage(argc, argv);
  int conc = atoi(argv[1]);
  int disp = atoi(argv[2]);
  int reps = atoi(argv[3]);
  int reciprocal = atoi(argv[4]);
  int seed = (argc > 5) ? atoi(argv[5]) : 1;
  test(conc, disp, reps, reciprocal, seed);
  return EXIT_SUCCESS;
}
Here are some runs with a concurrency level of 2 (on a real machine, of course):
# ./main 2 10 100 1
1 ERROR: in rep 90 expected = -1511601456 result = 1046423663
100 reps
# ./main 2 100 100 10
1 ERROR: in rep 42 expected = -151886366 result = -622836464
2 ERROR: in rep 79 expected = -1368834911 result = 847310301
3 ERROR: in rep 95 expected = -855451124 result = 1437502026
100 reps
# ./main 2 1000 100 100
1 ERROR: in rep 17 expected = -307403467 result = -1195310625
2 ERROR: in rep 34 expected = 561296706 result = 1662897226
3 ERROR: in rep 39 expected = -1544469602 result = -284349610
4 ERROR: in rep 62 expected = 1260310182 result = -726382340
5 ERROR: in rep 73 expected = 899803372 result = 1565456308
6 ERROR: in rep 84 expected = 1742680344 result = 1096160133
100 reps
# ./main 2 10000 100 1000
1 ERROR: in rep 13 expected = 2097701788 result = -362933914
2 ERROR: in rep 17 expected = -1201729575 result = 475765791
3 ERROR: in rep 26 expected = -86421113 result = 681302623
100 reps
It appears that on the order of 1000*(1/p) dispatches of the operation
per process are needed for the bug to produce an incorrect result on
this machine (where p is the probability of taking the buggy path). The
would-be simulator should do a better job of catching the error and
generating a bug trace.
> Neil
>
> [1] Although note that not all message schedules are possible: for
> example, message schedules can't violate causal dependencies; i.e., if
> process P1 sends M1 and then M2 to P2, P2 can't see <M2,M1> (it might
> see only <>, <M1>, or <M2> if P2 is remote). Actually, that suggests
> to me we probably want to distinguish between local and remote message
> sends in the simulator: the former will never be dropped.
--Maged
Maged Michael (IBM)