Ensure valid seq is returned in changes feed

When nodes go down whilst continuous replications are running, sequences with
incomplete ranges are returned, which creates havoc if they get into checkpoints.
This patch makes a final call to is_progress_possible and errors out if an
invalid sequence would be returned.

BugzID: 17240
BugzID: 16415


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/8e72cf76
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/8e72cf76
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/8e72cf76

Branch: refs/heads/import
Commit: 8e72cf7617d4dee3cb676d612c509fc20d463f7d
Parents: 0158038
Author: Bob Dionne <b...@cloudant.com>
Authored: Wed Feb 20 12:51:35 2013 -0500
Committer: Bob Dionne <b...@cloudant.com>
Committed: Thu Feb 21 06:12:49 2013 -0500

----------------------------------------------------------------------
 src/fabric_view.erl         |  3 ++-
 src/fabric_view_changes.erl | 30 ++++++++++++++++++++++++------
 2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8e72cf76/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 63ab3f4..d769f9c 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -31,7 +31,8 @@ remove_down_shards(Collector, BadNode) ->
         {ok, Collector#collector{counters = NewCounters}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
-        Callback({error, Reason}, Acc)
+        Callback({error, Reason}, Acc),
+        {stop, Collector}
     end.
 
 %% @doc looks for a fully covered keyrange in the list of counters

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8e72cf76/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8843562..817bbf7 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -183,16 +183,26 @@ handle_message(#change{key=Seq} = Row0, {Worker, From}, St) ->
     _ ->
         S1 = fabric_dict:store(Worker, Seq, S0),
         S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        Row = Row0#change{key = pack_seqs(S2)},
-        {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-        gen_server:reply(From, Go),
-        {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+        case fabric_view:is_progress_possible(S2) of
+        true ->
+            Row = Row0#change{key = pack_seqs(S2)},
+            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+            gen_server:reply(From, Go),
+            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
+        false ->
+            Reason = {range_not_covered, <<"progress not possible">>},
+            Callback({error, Reason}, AccIn),
+            gen_server:reply(From, stop),
+            {stop, St#collector{counters=S2}}
+        end
     end;
 
 handle_message({complete, EndSeq}, Worker, State) ->
     #collector{
+        callback = Callback,
         counters = S0,
-        total_rows = Completed % override
+        total_rows = Completed, % override
+        user_acc = Acc
     } = State,
     case fabric_dict:lookup_element(Worker, S0) of
     undefined ->
@@ -204,7 +214,15 @@ handle_message({complete, EndSeq}, Worker, State) ->
         NewState = State#collector{counters=S2, total_rows=Completed+1},
         case fabric_dict:size(S2) =:= (Completed+1) of
         true ->
-            {stop, NewState};
+            % final check ranges are covered
+            case fabric_view:is_progress_possible(S2) of
+            true ->
+                {stop, NewState};
+            false ->
+                Reason = {range_not_covered, <<"progress not possible">>},
+                Callback({error, Reason}, Acc),
+                {stop, NewState}
+            end;
         false ->
             {ok, NewState}
         end

Reply via email to