Repository: mesos Updated Branches: refs/heads/master b8ada87e8 -> 29e4afdd9
Fixed bug with snapshot positions when writing log diffs. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29e4afdd Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29e4afdd Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29e4afdd Branch: refs/heads/master Commit: 29e4afdd922994ff21b91896949311c5a3a948dc Parents: b8ada87 Author: Benjamin Hindman <[email protected]> Authored: Wed Oct 29 01:27:13 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Wed Oct 29 01:27:16 2014 -0700 ---------------------------------------------------------------------- src/state/log.cpp | 33 +++++++++++++++++++-------------- src/tests/state_tests.cpp | 9 +++++++++ 2 files changed, 28 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/29e4afdd/src/state/log.cpp ---------------------------------------------------------------------- diff --git a/src/state/log.cpp b/src/state/log.cpp index 9033ce4..c0c81d5 100644 --- a/src/state/log.cpp +++ b/src/state/log.cpp @@ -98,7 +98,7 @@ private: Future<bool> ___set( const state::Entry& entry, size_t diff, - const Option<Log::Position>& position); + Option<Log::Position> position); Future<bool> _expunge(const state::Entry& entry); Future<bool> __expunge(const state::Entry& entry); @@ -138,9 +138,8 @@ private: entry(entry), diffs(diffs) {} - Try<Snapshot> patch( - const Log::Position& position, - const Operation::Diff& diff) const + // Returns a snapshot after having applied the specified diff. + Try<Snapshot> patch(const Operation::Diff& diff) const { if (diff.entry().name() != entry.name()) { return Error("Attempted to patch the wrong snapshot"); @@ -310,8 +309,7 @@ Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries) CHECK_SOME(snapshot); - Try<Snapshot> patched = - snapshot.get().patch(entry.position, operation.diff()); + Try<Snapshot> patched = snapshot.get().patch(operation.diff()); if (patched.isError()) { return Failure("Failed to apply the diff: " + patched.error()); @@ -520,24 +518,31 @@ Future<bool> LogStorageProcess::__set( Future<bool> LogStorageProcess::___set( const state::Entry& entry, size_t diffs, - const Option<Log::Position>& position) + Option<Log::Position> position) { if (position.isNone()) { starting = None(); // Reset 'starting' so we try again. return false; } - // Add (or update) the snapshot for this entry and truncate - // the log if possible. - CHECK(!snapshots.contains(entry.name()) || - snapshots.get(entry.name()).get().position < position.get()); + // Update index so we don't bother reading anything before this + // position again (if we don't have to). + index = max(index, position); + + // Determine the position that represents the snapshot: if we just + // wrote a diff then we want to use the existing position of the + // snapshot, otherwise we just overwrote the snapshot so we should + // use the returned position (i.e., do nothing). + if (diffs > 0) { + CHECK(snapshots.contains(entry.name())); + position = snapshots.get(entry.name()).get().position; + } Snapshot snapshot(position.get(), entry, diffs); snapshots.put(snapshot.entry.name(), snapshot); - truncate(); - // Update index so we don't bother with this position again. - index = max(index, position); + // And truncate the log if necessary. + truncate(); return true; } http://git-wip-us.apache.org/repos/asf/mesos/blob/29e4afdd/src/tests/state_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp index f37d606..ef2e96f 100644 --- a/src/tests/state_tests.cpp +++ b/src/tests/state_tests.cpp @@ -643,6 +643,15 @@ TEST_F(LogStateTest, Diff) AWAIT_READY(future2); ASSERT_SOME(future2.get()); + // It's possible that we're doing truncation asynchronously which + // will cause the test to fail because we'll end up getting a + // pending position from Log::Reader::ending which will cause + // Log::Reader::read to fail. To remedy this, we pause the clock and + // wait for all executing processe to settle. + Clock::pause(); + Clock::settle(); + Clock::resume(); + Log::Reader reader(log); Future<Log::Position> beginning = reader.beginning();
