This is an automated email from the ASF dual-hosted git repository.

iilyak pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/3.x by this push:
     new 5ed8d54  Add smoosh queue persistence
     new c9a95b0  Merge pull request #3766 from noahshaw11/add-smoosh-queue-persistence
5ed8d54 is described below

commit 5ed8d54bd834a43a41eba11524171217f141e0cb
Author: ncshaw <[email protected]>
AuthorDate: Mon Nov 29 18:04:59 2021 -0500

    Add smoosh queue persistence
---
 configure                                       |   1 +
 dev/run                                         |   3 +
 rel/overlay/etc/default.ini                     |   3 +
 rel/plugins/eunit_plugin.erl                    |   4 +-
 setup_eunit.template                            |   3 +-
 src/couch/src/couch_bt_engine.erl               |  10 +
 src/couch/src/couch_db.erl                      |   4 +
 src/couch/src/couch_db_engine.erl               |   4 +
 src/couch/src/couch_server.erl                  |   9 +
 src/smoosh/operator_guide.md                    |  20 ++
 src/smoosh/src/smoosh.app.src                   |  33 ++-
 src/smoosh/src/smoosh_channel.erl               | 254 ++++++++++++++++++++----
 src/smoosh/src/smoosh_priority_queue.erl        | 122 ++++++++++--
 src/smoosh/src/smoosh_server.erl                |  24 ++-
 src/smoosh/src/smoosh_utils.erl                 |  31 ++-
 src/smoosh/test/smoosh_priority_queue_tests.erl | 167 ++++++++++++++++
 src/smoosh/test/smoosh_tests.erl                | 130 ++++++++++++
 17 files changed, 733 insertions(+), 89 deletions(-)

diff --git a/configure b/configure
index 0bcbfae..d8e592b 100755
--- a/configure
+++ b/configure
@@ -250,6 +250,7 @@ cat > rel/couchdb.config << EOF
 {prefix, "."}.
 {data_dir, "./data"}.
 {view_index_dir, "./data"}.
+{state_dir, "./data"}.
 {log_file, "$LOG_FILE"}.
 {fauxton_root, "./share/www"}.
 {user, "$COUCHDB_USER"}.
diff --git a/dev/run b/dev/run
index 3ca67a1..05ed16a 100755
--- a/dev/run
+++ b/dev/run
@@ -299,6 +299,9 @@ def setup_configs(ctx):
             "view_index_dir": toposixpath(
                 ensure_dir_exists(ctx["devdir"], "lib", node, "data")
             ),
+            "state_dir": toposixpath(
+                ensure_dir_exists(ctx["devdir"], "lib", node, "data")
+            ),
             "node_name": "-name %[email protected]" % node,
             "cluster_port": cluster_port,
             "backend_port": backend_port,
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 6b64c6d..5def66c 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -643,6 +643,9 @@ partitioned||* = true
 ;[smoosh.slack_views]
 ;priority = slack
 ;min_priority = 536870912
+;
+; Directory to store the state of smoosh
+state_dir = {{state_dir}}
 
 [ioq]
 ; The maximum number of concurrent in-flight IO requests that
diff --git a/rel/plugins/eunit_plugin.erl b/rel/plugins/eunit_plugin.erl
index 69003ab..8f298db 100644
--- a/rel/plugins/eunit_plugin.erl
+++ b/rel/plugins/eunit_plugin.erl
@@ -29,12 +29,14 @@ build_eunit_config(Config0, AppFile) ->
     Cwd = filename:absname(rebar_utils:get_cwd()),
     DataDir = Cwd ++ "/tmp/data",
     ViewIndexDir = Cwd ++ "/tmp/data",
+    StateDir = Cwd ++ "/tmp/data",
     TmpDataDir = Cwd ++ "/tmp/tmp_data",
     cleanup_dirs([DataDir, TmpDataDir]),
     Config1 = rebar_config:set_global(Config0, template, "setup_eunit"),
     Config2 = rebar_config:set_global(Config1, prefix, Cwd),
     Config3 = rebar_config:set_global(Config2, data_dir, DataDir),
-    Config = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir),
+    Config4 = rebar_config:set_global(Config3, view_index_dir, ViewIndexDir),
+    Config = rebar_config:set_global(Config4, state_dir, StateDir),
     rebar_templater:create(Config, AppFile).
 
 cleanup_dirs(Dirs) ->
diff --git a/setup_eunit.template b/setup_eunit.template
index 3625441..ceef60d 100644
--- a/setup_eunit.template
+++ b/setup_eunit.template
@@ -7,7 +7,8 @@
 
     {data_dir, "/tmp"},
     {prefix, "/tmp"},
-    {view_index_dir, "/tmp"}
+    {view_index_dir, "/tmp"},
+    {state_dir, "/tmp"}
 ]}.
 {dir, "tmp"}.
 {dir, "tmp/etc"}.
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 7d23905..42dbed2 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -19,6 +19,8 @@
     delete/3,
     delete_compaction_files/3,
 
+    is_compacting/1,
+
     init/2,
     terminate/2,
     handle_db_updater_call/2,
@@ -139,6 +141,14 @@ delete_compaction_files(RootDir, FilePath, DelOpts) ->
         [".compact", ".compact.data", ".compact.meta"]
     ).
 
+is_compacting(DbName) ->
+    lists:any(
+        fun(Ext) ->
+            filelib:is_regular(DbName ++ Ext)
+        end,
+        [".compact", ".compact.data", ".compact.meta"]
+    ).
+
 init(FilePath, Options) ->
     {ok, Fd} = open_db_file(FilePath, Options),
     Header =
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index fa003e1..cafbb0d 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -120,6 +120,7 @@
     cancel_compact/1,
     wait_for_compaction/1,
     wait_for_compaction/2,
+    is_compacting/1,
 
     dbname_suffix/1,
     normalize_dbname/1,
@@ -277,6 +278,9 @@ wait_for_compaction(#db{main_pid = Pid} = Db, Timeout) ->
             ok
     end.
 
+is_compacting(DbName) ->
+    couch_server:is_compacting(DbName).
+
 delete_doc(Db, Id, Revisions) ->
     DeletedDocs = [#doc{id = Id, revs = [Rev], deleted = true} || Rev <- 
Revisions],
     {ok, [Result]} = update_docs(Db, DeletedDocs, []),
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index de4a424..9e46b81 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -662,6 +662,7 @@
     exists/2,
     delete/4,
     delete_compaction_files/4,
+    is_compacting/2,
 
     init/3,
     terminate/2,
@@ -736,6 +737,9 @@ delete_compaction_files(Engine, RootDir, DbPath, DelOpts) when
 ->
     Engine:delete_compaction_files(RootDir, DbPath, DelOpts).
 
+is_compacting(Engine, DbName) ->
+    Engine:is_compacting(DbName).
+
 init(Engine, DbPath, Options) ->
     case Engine:init(DbPath, Options) of
         {ok, EngineState} ->
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 5e29538..7421789 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -22,6 +22,7 @@
 -export([dev_start/0, is_admin/2, has_admins/0, get_stats/0]).
 -export([close_db_if_idle/1]).
 -export([delete_compaction_files/1]).
+-export([is_compacting/1]).
 -export([exists/1]).
 -export([get_engine_extensions/0]).
 -export([get_engine_path/2]).
@@ -183,6 +184,14 @@ delete_compaction_files(DbName, DelOpts) when is_list(DbName) ->
 delete_compaction_files(DbName, DelOpts) when is_binary(DbName) ->
     delete_compaction_files(?b2l(DbName), DelOpts).
 
+is_compacting(DbName) ->
+    lists:any(
+        fun({_, Engine}) ->
+            couch_db_engine:is_compacting(Engine, DbName)
+        end,
+        get_configured_engines()
+    ).
+
 maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
     maybe_add_sys_db_callbacks(?b2l(DbName), Options);
 maybe_add_sys_db_callbacks(DbName, Options) ->
diff --git a/src/smoosh/operator_guide.md b/src/smoosh/operator_guide.md
index afe08f8..3dd8cdc 100644
--- a/src/smoosh/operator_guide.md
+++ b/src/smoosh/operator_guide.md
@@ -107,7 +107,27 @@ However, it's the best measure we currently have.
 
 [Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value).
 
+#### State diagram
 
+```
+stateDiagram
+    [*] --> init
+    init --> start_recovery: send_after(?START_DELAY_IN_MSEC, self(), start_recovery)
+    note right of start_recovery
+        activated = false
+        paused = true
+    end note
+    start_recovery --> activate: send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate)
+    note right of activate
+        state has been recovered
+        activated = true
+        paused = true
+    end note
+    activate --> schedule_unpause
+    schedule_unpause --> [*]: after 30 sec, paused = false and compaction of new jobs begin
+```
+
+See 
[here](https://mermaid.ink/img/pako:eNqNUtFKwzAU_ZVLnjbpQPCtoFK2PgzUB1sEsVKy5LaNtklJ08kY-3eTtKVsTDBPybnnnntOkiNhiiMJSWeowY2gpaZNJsGuj5tPWK0eQEhhBsTtPGTJ2uQamdqjPoTQoeQ5LQzqxWOSRq9pvomfovd8-5I_J_E6sIS6WCyDi8blICuVQdCirAyo4oIyMNyizIi99cjhHgpadziXWtp3Hje6H2FryOsOp3NNH2GSOzcfrdPtW5TGf_mfuq46n4qzMX-pUNEOdogSRgPIr6ea3f8r1NQ6vAirkPc15r30jWPuC9RT7buG4PPC3a1NxoJ5lr9YoHYOU03rJijpokn8gS-1czlKIUlAGtQNFdx-nKMblRFTYYMZCe2WU_2dkUyeLK9vubUYc2GUJqGXDwjtjUoOkpHQZZtI4-cbWadfC0TavA)
 for a diagram of smo [...]
 ### Defining a channel
 
 Defining a channel is done via normal dbcore configuration, with some
diff --git a/src/smoosh/src/smoosh.app.src b/src/smoosh/src/smoosh.app.src
index a6cdb7f..4549c66 100644
--- a/src/smoosh/src/smoosh.app.src
+++ b/src/smoosh/src/smoosh.app.src
@@ -10,20 +10,19 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
-{application, smoosh,
- [
-  {description, "Auto-compaction daemon"},
-  {vsn, git},
-  {registered, [smoosh_server]},
-  {applications, [
-                  kernel,
-                  stdlib,
-                  couch_log,
-                  config,
-                  couch_event,
-                  couch,
-                  mem3
-                 ]},
-  {mod, { smoosh_app, []}},
-  {env, []}
- ]}.
+{application, smoosh, [
+    {description, "Auto-compaction daemon"},
+    {vsn, git},
+    {registered, [smoosh_server]},
+    {applications, [
+        kernel,
+        stdlib,
+        couch_log,
+        config,
+        couch_event,
+        couch,
+        mem3
+    ]},
+    {mod, {smoosh_app, []}},
+    {env, []}
+]}.
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index 06849ac..064f18a 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -16,8 +16,8 @@
 -include_lib("couch/include/couch_db.hrl").
 
 % public api.
--export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
--export([enqueue/3, last_updated/2, flush/1]).
+-export([start_link/1, close/1, suspend/1, resume/1, activate/1, 
get_status/1]).
+-export([enqueue/3, last_updated/2, flush/1, is_key/2, is_activated/1, 
persist/1]).
 
 % gen_server api.
 -export([
@@ -25,18 +25,38 @@
     handle_call/3,
     handle_cast/2,
     handle_info/2,
-    code_change/3,
     terminate/2
 ]).
 
+-define(VSN, 1).
+-define(CHECKPOINT_INTERVAL_IN_MSEC, 180000).
+
+-ifndef(TEST).
+-define(START_DELAY_IN_MSEC, 60000).
+-define(ACTIVATE_DELAY_IN_MSEC, 30000).
+-else.
+-define(START_DELAY_IN_MSEC, 0).
+-define(ACTIVATE_DELAY_IN_MSEC, 0).
+-endif.
+
 % records.
 
+% When the state is set to activated = true, the channel has completed the state
+% recovery process that occurs on (re)start and is accepting new compaction jobs.
+% Note: if activated = false and a request for a new compaction job is received,
+% smoosh will enqueue this new job after the state recovery process has finished.
+% When the state is set to paused = false, the channel is actively compacting any
+% compaction jobs that are scheduled.
+% See operator_guide.md --> State diagram.
+
 -record(state, {
     active = [],
     name,
-    waiting = smoosh_priority_queue:new(),
+    waiting,
     paused = true,
-    starting = []
+    starting = [],
+    activated = false,
+    requests = []
 }).
 
 % public functions.
@@ -48,7 +68,10 @@ suspend(ServerRef) ->
     gen_server:call(ServerRef, suspend).
 
 resume(ServerRef) ->
-    gen_server:call(ServerRef, resume).
+    gen_server:call(ServerRef, resume_and_activate).
+
+activate(ServerRef) ->
+    gen_server:call(ServerRef, activate).
 
 enqueue(ServerRef, Object, Priority) ->
     gen_server:cast(ServerRef, {enqueue, Object, Priority}).
@@ -65,32 +88,42 @@ close(ServerRef) ->
 flush(ServerRef) ->
     gen_server:call(ServerRef, flush).
 
+is_key(ServerRef, Key) ->
+    gen_server:call(ServerRef, {is_key, Key}).
+
+is_activated(ServerRef) ->
+    gen_server:call(ServerRef, is_activated).
+
+persist(ServerRef) ->
+    gen_server:call(ServerRef, persist).
+
 % gen_server functions.
 
 init(Name) ->
-    schedule_unpause(),
     erlang:send_after(60 * 1000, self(), check_window),
-    {ok, #state{name = Name}}.
+    process_flag(trap_exit, true),
+    Waiting = smoosh_priority_queue:new(Name),
+    State = #state{name = Name, waiting = Waiting, paused = true, activated = 
false},
+    erlang:send_after(?START_DELAY_IN_MSEC, self(), start_recovery),
+    {ok, State}.
 
-handle_call({last_updated, Object}, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_call({last_updated, Object}, _From, State) ->
     LastUpdated = smoosh_priority_queue:last_updated(Object, 
State#state.waiting),
     {reply, LastUpdated, State};
-handle_call(suspend, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_call(suspend, _From, State) ->
     #state{active = Active} = State,
     [
         catch erlang:suspend_process(Pid, [unless_suspending])
      || {_, Pid} <- Active
     ],
     {reply, ok, State#state{paused = true}};
-handle_call(resume, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_call(resume_and_activate, _From, State) ->
     #state{active = Active} = State,
     [catch erlang:resume_process(Pid) || {_, Pid} <- Active],
-    {reply, ok, State#state{paused = false}};
-handle_call(status, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+    {reply, ok, State#state{paused = false, activated = true}};
+handle_call(activate, _From, State) ->
+    {reply, ok, State#state{activated = true}};
+handle_call(status, _From, State) ->
     {reply,
         {ok, [
             {active, length(State#state.active)},
@@ -98,27 +131,38 @@ handle_call(status, _From, State0) ->
             {waiting, smoosh_priority_queue:info(State#state.waiting)}
         ]},
         State};
-handle_call(close, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_call(close, _From, State) ->
     {stop, normal, ok, State};
-handle_call(flush, _From, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
-    {reply, ok, State#state{waiting = smoosh_priority_queue:new()}}.
+handle_call(flush, _From, #state{waiting = Q} = State) ->
+    {reply, ok, State#state{waiting = smoosh_priority_queue:flush(Q)}};
+handle_call({is_key, Key}, _From, #state{waiting = Waiting} = State) ->
+    {reply, smoosh_priority_queue:is_key(Key, Waiting), State};
+handle_call(is_activated, _From, #state{activated = Activated} = State0) ->
+    {reply, Activated, State0};
+handle_call(persist, _From, State) ->
+    persist_queue(State),
+    {reply, ok, State}.
 
-handle_cast({enqueue, _Object, 0}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_cast({enqueue, _Object, 0}, #state{} = State) ->
     {noreply, State};
-handle_cast({enqueue, Object, Priority}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
-    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
+handle_cast({enqueue, Object, Priority}, #state{activated = true} = State) ->
+    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))};
+handle_cast({enqueue, Object, Priority}, #state{activated = false, requests = 
Requests} = State0) ->
+    couch_log:notice(
+        "~p Channel is not activated yet. Adding ~p to requests with priority 
~p.", [
+            ?MODULE,
+            Object,
+            Priority
+        ]
+    ),
+    {noreply, State0#state{requests = [{Object, Priority} | Requests]}}.
 
 % We accept noproc here due to possibly having monitored a restarted compaction
 % pid after it finished.
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
+handle_info({'DOWN', Ref, _, Job, Reason}, State) when
     Reason == normal;
     Reason == noproc
 ->
-    {ok, State} = code_change(nil, State0, nil),
     #state{active = Active, starting = Starting} = State,
     {noreply,
         maybe_start_compaction(
@@ -127,8 +171,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) when
                 starting = lists:keydelete(Ref, 1, Starting)
             }
         )};
-handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_info({'DOWN', Ref, _, Job, Reason}, State) ->
     #state{active = Active0, starting = Starting0} = State,
     case lists:keytake(Job, 2, Active0) of
         {value, {Key, _Pid}, Active1} ->
@@ -142,7 +185,8 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
             case lists:keytake(Ref, 1, Starting0) of
                 {value, {_, Key}, Starting1} ->
                     couch_log:warning("failed to start compaction of ~p: ~p", [
-                        smoosh_utils:stringify(Key), Reason
+                        smoosh_utils:stringify(Key),
+                        Reason
                     ]),
                     {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, 
[Key]),
                     {noreply, maybe_start_compaction(State#state{starting = 
Starting1})};
@@ -150,8 +194,7 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
                     {noreply, State}
             end
     end;
-handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_info({Ref, {ok, Pid}}, State) when is_reference(Ref) ->
     case lists:keytake(Ref, 1, State#state.starting) of
         {value, {_, Key}, Starting1} ->
             couch_log:notice(
@@ -167,8 +210,7 @@ handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
         false ->
             {noreply, State}
     end;
-handle_info(check_window, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_info(check_window, State) ->
     #state{paused = Paused, name = Name} = State,
     StrictWindow = smoosh_utils:get(Name, "strict_window", "false"),
     FinalState =
@@ -194,18 +236,113 @@ handle_info(check_window, State0) ->
         end,
     erlang:send_after(60 * 1000, self(), check_window),
     {noreply, FinalState};
-handle_info(pause, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_info(start_recovery, #state{name = Name, waiting = Waiting0} = State0) 
->
+    RecActive = recover(active_file_name(Name)),
+    Waiting1 = lists:foldl(
+        fun(DbName, Acc) ->
+            case couch_db:is_compacting(DbName) of
+                true ->
+                    Priority = smoosh_server:get_priority(Name, DbName),
+                    smoosh_priority_queue:in(DbName, Priority, Priority, Acc);
+                false ->
+                    Acc
+            end
+        end,
+        Waiting0,
+        RecActive
+    ),
+    State1 = maybe_start_compaction(State0#state{paused = false, waiting = 
Waiting1}),
+    couch_log:notice(
+        "~p Previously active compaction jobs (if any) have been successfully 
recovered and restarted.",
+        [?MODULE]
+    ),
+    erlang:send_after(?ACTIVATE_DELAY_IN_MSEC, self(), activate),
+    {noreply, State1#state{paused = true}};
+handle_info(activate, State) ->
+    {noreply, activate_channel(State)};
+handle_info(persist, State) ->
+    persist_queue(State),
+    erlang:send_after(?CHECKPOINT_INTERVAL_IN_MSEC, self(), persist),
+    {noreply, State};
+handle_info(pause, State) ->
     {noreply, State#state{paused = true}};
-handle_info(unpause, State0) ->
-    {ok, State} = code_change(nil, State0, nil),
+handle_info(unpause, State) ->
     {noreply, maybe_start_compaction(State#state{paused = false})}.
 
 terminate(_Reason, _State) ->
     ok.
 
-code_change(_OldVsn, #state{} = State, _Extra) ->
-    {ok, State}.
+persist_queue(State) ->
+    write_state_to_file(State).
+
+recover(FilePath) ->
+    case do_recover(FilePath) of
+        {ok, List} ->
+            List;
+        error ->
+            []
+    end.
+
+do_recover(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, Content} ->
+            <<Vsn, Binary/binary>> = Content,
+            try parse_state(Vsn, ?VSN, Binary) of
+                Term ->
+                    couch_log:notice(
+                        "~p Successfully restored state file ~s", [?MODULE, 
FilePath]
+                    ),
+                    {ok, Term}
+            catch
+                error:Reason ->
+                    couch_log:error(
+                        "~p Invalid state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
+                    ),
+                    file:delete(FilePath),
+                    error
+            end;
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) State file ~s does not exist. Not restoring.", 
[?MODULE, enoent, FilePath]
+            ),
+            error;
+        {error, Reason} ->
+            couch_log:error(
+                "~p Cannot read the state file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
+            ),
+            file:delete(FilePath),
+            error
+    end.
+
+parse_state(1, ?VSN, Binary) ->
+    erlang:binary_to_term(Binary, [safe]);
+parse_state(Vsn, ?VSN, _) ->
+    error({unsupported_version, Vsn}).
+
+write_state_to_file(#state{name = Name, active = Active, starting = Starting, 
waiting = Waiting}) ->
+    Active1 = lists:foldl(
+        fun({DbName, _}, Acc) ->
+            [DbName | Acc]
+        end,
+        [],
+        Active
+    ),
+    Starting1 = lists:foldl(
+        fun({_, DbName}, Acc) ->
+            [DbName | Acc]
+        end,
+        [],
+        Starting
+    ),
+    smoosh_utils:write_to_file(Active1, active_file_name(Name), ?VSN),
+    smoosh_utils:write_to_file(Starting1, starting_file_name(Name), ?VSN),
+    smoosh_priority_queue:write_to_file(Waiting).
+
+active_file_name(Name) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".active").
+
+starting_file_name(Name) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".starting").
 
 % private functions.
 
@@ -225,6 +362,37 @@ add_to_queue(Key, Priority, State) ->
             }
     end.
 
+maybe_activate(#state{activated = true} = State) ->
+    State;
+maybe_activate(State) ->
+    activate_channel(State).
+
+activate_channel(#state{name = Name, waiting = Waiting0, requests = Requests0} 
= State0) ->
+    RecStarting = recover(starting_file_name(Name)),
+    Starting = lists:foldl(
+        fun(DbName, Acc) ->
+            Priority = smoosh_server:get_priority(Name, DbName),
+            smoosh_priority_queue:in(DbName, Priority, Priority, Acc)
+        end,
+        Waiting0,
+        RecStarting
+    ),
+    Waiting1 = smoosh_priority_queue:recover(Starting),
+    Requests1 = lists:reverse(Requests0),
+    Waiting2 = lists:foldl(
+        fun({DbName, Priority}, Acc) ->
+            smoosh_priority_queue:in(DbName, Priority, Priority, Acc)
+        end,
+        Waiting1,
+        Requests1
+    ),
+    State1 = maybe_start_compaction(State0#state{
+        waiting = Waiting2, paused = false, activated = true, requests = []
+    }),
+    handle_info(persist, State1),
+    schedule_unpause(),
+    State1#state{paused = true}.
+
 maybe_start_compaction(#state{paused = true} = State) ->
     State;
 maybe_start_compaction(State) ->
@@ -239,7 +407,7 @@ maybe_start_compaction(State) ->
         length(State#state.active) + length(State#state.starting) < 
Concurrency ->
             case smoosh_priority_queue:out(State#state.waiting) of
                 false ->
-                    State;
+                    maybe_activate(State);
                 {Key, Priority, Q} ->
                     try
                         State2 =
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index b6f4b6d..83c9366 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,33 +12,68 @@
 
 -module(smoosh_priority_queue).
 
--export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+-export([new/1, recover/1]).
+
+-export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-export([flush/1]).
+
+-export([from_list/2, to_list/1]).
+
+-export([is_empty/1]).
+
+-export([file_name/1, write_to_file/1]).
+
+-define(VSN, 1).
 
 -record(priority_queue, {
-    dict = dict:new(),
-    tree = gb_trees:empty()
+    name,
+    map,
+    tree
 }).
 
-new() ->
-    #priority_queue{}.
+new(Name) ->
+    #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
+
+recover(#priority_queue{name = Name, map = Map0} = Q) ->
+    case do_recover(file_name(Q)) of
+        {ok, Terms} ->
+            Map = maps:merge(Map0, Terms),
+            Tree = maps:fold(
+                fun(Key, {TreeKey, Value}, TreeAcc) ->
+                    gb_trees:enter(TreeKey, {Key, Value}, TreeAcc)
+                end,
+                gb_trees:empty(),
+                Map
+            ),
+            #priority_queue{name = Name, map = Map, tree = Tree};
+        error ->
+            Q
+    end.
+
+write_to_file(#priority_queue{map = Map} = Q) ->
+    smoosh_utils:write_to_file(Map, file_name(Q), ?VSN).
+
+flush(#priority_queue{name = Name} = Q) ->
+    Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
 
-last_updated(Key, #priority_queue{dict = Dict}) ->
-    case dict:find(Key, Dict) of
+last_updated(Key, #priority_queue{map = Map}) ->
+    case maps:find(Key, Map) of
         {ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
             LastUpdatedMTime;
         error ->
             false
     end.
 
-is_key(Key, #priority_queue{dict = Dict}) ->
-    dict:is_key(Key, Dict).
+is_key(Key, #priority_queue{map = Map}) ->
+    maps:is_key(Key, Map).
 
 in(Key, Value, Priority, Q) ->
     in(Key, Value, Priority, infinity, Q).
 
-in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) 
->
+in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, 
tree = Tree}) ->
     Tree1 =
-        case dict:find(Key, Dict) of
+        case maps:find(Key, Map) of
             {ok, TreeKey} ->
                 gb_trees:delete_any(TreeKey, Tree);
             error ->
@@ -47,17 +82,18 @@ in(Key, Value, Priority, Capacity, #priority_queue{dict = Dict, tree = Tree}) ->
     Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
     TreeKey1 = {Priority, Now},
     Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
-    Dict1 = dict:store(Key, TreeKey1, Dict),
-    truncate(Capacity, #priority_queue{dict = Dict1, tree = Tree2}).
+    Map1 = maps:put(Key, TreeKey1, Map),
+    truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}).
 
-out(#priority_queue{dict = Dict, tree = Tree}) ->
+out(#priority_queue{name = Name, map = Map, tree = Tree}) ->
     case gb_trees:is_empty(Tree) of
         true ->
             false;
         false ->
             {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
-            Dict1 = dict:erase(Key, Dict),
-            {Key, Value, #priority_queue{dict = Dict1, tree = Tree1}}
+            Map1 = maps:remove(Key, Map),
+            Q = #priority_queue{name = Name, map = Map1, tree = Tree1},
+            {Key, Value, Q}
     end.
 
 size(#priority_queue{tree = Tree}) ->
@@ -76,6 +112,20 @@ info(#priority_queue{tree = Tree} = Q) ->
         end
     ].
 
+from_list(Orddict, #priority_queue{name = Name}) ->
+    Map = maps:from_list(Orddict),
+    Tree = gb_trees:from_orddict(Orddict),
+    #priority_queue{name = Name, map = Map, tree = Tree}.
+
+to_list(#priority_queue{tree = Tree}) ->
+    gb_trees:to_list(Tree).
+
+is_empty(#priority_queue{tree = Tree}) ->
+    gb_trees:is_empty(Tree).
+
+file_name(#priority_queue{name = Name}) ->
+    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting").
+
 truncate(infinity, Q) ->
     Q;
 truncate(Capacity, Q) when Capacity > 0 ->
@@ -83,7 +133,43 @@ truncate(Capacity, Q) when Capacity > 0 ->
 
 truncate(Capacity, Size, Q) when Size =< Capacity ->
     Q;
-truncate(Capacity, Size, #priority_queue{dict = Dict, tree = Tree}) when Size 
> 0 ->
+truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) 
when Size > 0 ->
     {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
-    Q1 = #priority_queue{dict = dict:erase(Key, Dict), tree = Tree1},
+    Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = 
Tree1},
     truncate(Capacity, ?MODULE:size(Q1), Q1).
+
+do_recover(FilePath) ->
+    case file:read_file(FilePath) of
+        {ok, Content} ->
+            <<Vsn, Binary/binary>> = Content,
+            try parse_queue(Vsn, ?VSN, Binary) of
+                Bin ->
+                    couch_log:notice(
+                        "~p Successfully restored state file ~s", [?MODULE, 
FilePath]
+                    ),
+                    {ok, Bin}
+            catch
+                error:Reason ->
+                    couch_log:error(
+                        "~p Invalid queue file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
+                    ),
+                    file:delete(FilePath),
+                    error
+            end;
+        {error, enoent} ->
+            couch_log:notice(
+                "~p (~p) Queue file ~s does not exist. Not restoring.", 
[?MODULE, enoent, FilePath]
+            ),
+            error;
+        {error, Reason} ->
+            couch_log:error(
+                "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, 
Reason, FilePath]
+            ),
+            file:delete(FilePath),
+            error
+    end.
+
+parse_queue(1, ?VSN, Binary) ->
+    erlang:binary_to_term(Binary, [safe]);
+parse_queue(Vsn, ?VSN, _) ->
+    error({unsupported_version, Vsn}).
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 5529e93..e9823c3 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -45,6 +45,10 @@
 
 % exported but for internal use.
 -export([enqueue_request/2]).
+-export([get_priority/2]).
+
+% exported for testing and debugging
+-export([get_channel/1]).
 
 -ifdef(TEST).
 -define(RELISTEN_DELAY, 50).
@@ -60,7 +64,7 @@
     schema_channels = [],
     tab,
     event_listener,
-    waiting = dict:new()
+    waiting = maps:new()
 }).
 
 -record(channel, {
@@ -109,6 +113,10 @@ handle_db_event(DbName, {schema_updated, DDocId}, St) ->
 handle_db_event(_DbName, _Event, St) ->
     {ok, St}.
 
+% for testing and debugging only
+get_channel(ChannelName) ->
+    gen_server:call(?MODULE, {get_channel, ChannelName}).
+
 % gen_server functions.
 
 init([]) ->
@@ -181,7 +189,9 @@ handle_call(resume, _From, State) ->
         0,
         State#state.tab
     ),
-    {reply, ok, State}.
+    {reply, ok, State};
+handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) ->
+    {reply, {ok, channel_pid(Tab, ChannelName)}, State}.
 
 handle_cast({new_db_channels, Channels}, State) ->
     [
@@ -203,12 +213,12 @@ handle_cast({new_schema_channels, Channels}, State) ->
     {noreply, create_missing_channels(State#state{view_channels = Channels})};
 handle_cast({enqueue, Object}, State) ->
     #state{waiting = Waiting} = State,
-    case dict:is_key(Object, Waiting) of
+    case maps:is_key(Object, Waiting) of
         true ->
             {noreply, State};
         false ->
             {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, 
Object]),
-            {noreply, State#state{waiting = dict:store(Object, Ref, Waiting)}}
+            {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
     end.
 
 handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -225,7 +235,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
     end,
     {noreply, create_missing_channels(State)};
 handle_info({'DOWN', Ref, _, _, _}, State) ->
-    Waiting = dict:filter(
+    Waiting = maps:filter(
         fun(_Key, Value) -> Value =/= Ref end,
         State#state.waiting
     ),
@@ -306,7 +316,9 @@ find_channel(Tab, [Channel | Rest], Object) ->
             ?SECONDS_PER_MINUTE,
     Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native),
     Now = erlang:monotonic_time(),
-    case LastUpdated =:= false orelse Now - LastUpdated > Staleness of
+    Activated = smoosh_channel:is_activated(Pid),
+    StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness,
+    case Activated andalso StaleEnough of
         true ->
             case smoosh_utils:ignore_db(Object) of
                 true ->
diff --git a/src/smoosh/src/smoosh_utils.erl b/src/smoosh/src/smoosh_utils.erl
index 882d3ec..625a9e4 100644
--- a/src/smoosh/src/smoosh_utils.erl
+++ b/src/smoosh/src/smoosh_utils.erl
@@ -14,9 +14,7 @@
 -include_lib("couch/include/couch_db.hrl").
 
 -export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]).
--export([
-    in_allowed_window/1
-]).
+-export([in_allowed_window/1, write_to_file/3]).
 
 group_pid({Shard, GroupId}) ->
     case couch_view_group:open_db_group(Shard, GroupId) of
@@ -75,6 +73,33 @@ in_allowed_window(From, To) ->
             ({HH, MM} >= From) orelse ({HH, MM} < To)
     end.
 
+%% Removes Path from disk. A file that is already absent ({error, enoent})
+%% counts as success; any other result of file:delete/1 is returned as is.
+file_delete(Path) ->
+    case file:delete(Path) of
+        ok -> ok;
+        {error, enoent} -> ok;
+        Failure -> Failure
+    end.
+
+%% Turns an {error, Reason} result into a thrown {error, {Reason, Context}},
+%% where Context identifies what was being operated on. An ok result is
+%% passed through untouched.
+throw_on_error(Context, {error, Reason}) ->
+    throw({error, {Reason, Context}});
+throw_on_error(_Context, ok) ->
+    ok.
+
+%% Serialises Content with term_to_binary/2 and stores it in FileName,
+%% prefixed with VSN as a single byte. The data is first written to
+%% FileName ++ ".tmp"; the old FileName is then deleted and the temporary
+%% file renamed to FileName.
+%% Returns ok, or the {error, {Reason, Args}} term thrown by
+%% throw_on_error/2 for the first step that fails.
+write_to_file(Content, FileName, VSN) ->
+    couch_log:notice("~p Writing state to state file ~s", [?MODULE, FileName]),
+    Payload = erlang:term_to_binary(Content, [compressed, {minor_version, 1}]),
+    OnDisk = <<VSN, Payload/binary>>,
+    TmpFileName = FileName ++ ".tmp",
+    try
+        %% start from a clean temporary file
+        throw_on_error(TmpFileName, file_delete(TmpFileName)),
+        throw_on_error(TmpFileName, file:write_file(TmpFileName, OnDisk, [sync])),
+        %% drop the previous file, then move the new one into place
+        throw_on_error(FileName, file_delete(FileName)),
+        throw_on_error([TmpFileName, FileName], file:rename(TmpFileName, FileName))
+    catch
+        throw:Failure ->
+            Failure
+    end.
+
 parse_time(undefined, Default) ->
     Default;
 parse_time(String, Default) ->
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl 
b/src/smoosh/test/smoosh_priority_queue_tests.erl
new file mode 100644
index 0000000..289804c
--- /dev/null
+++ b/src/smoosh/test/smoosh_priority_queue_tests.erl
@@ -0,0 +1,167 @@
+-module(smoosh_priority_queue_tests).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(PROP_PREFIX, "prop_").
+
+-define(CAPACITY, 3).
+
+-define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", 
[erlang:timestamp()]))).
+
+%% Fixture setup: calls test_util:start_couch/0 and returns its result, which
+%% the fixture later passes to teardown/1.
+setup() ->
+    test_util:start_couch().
+
+%% Fixture cleanup: hands the context produced by setup/0 to
+%% test_util:stop_couch/1.
+teardown(Context) ->
+    test_util:stop_couch(Context).
+
+%% EUnit generator: wraps the listed tests in a setup/teardown fixture so
+%% that setup/0 runs before them and teardown/1 afterwards.
+%%
+%% NOTE(review): the list holds `fun Name/0` references to functions that
+%% themselves return ?_test(...) objects. EUnit runs a fun/0 as a simple test
+%% and ignores its return value, so the wrapped bodies are presumably never
+%% executed from this fixture - confirm, and consider calling the functions
+%% (Name()) here instead of referencing them.
+smoosh_priority_queue_test_() ->
+    {
+        "smoosh priority queue test",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun prop_inverse_test_/0,
+                fun no_halt_on_corrupted_file_test/0,
+                fun no_halt_on_missing_file_test/0
+            ]
+        }
+    }.
+
+%% ==========
+%% Tests
+%% ----------
+
+%% define all tests to be able to run them individually
+%% Returns a test object whose body calls test_property(prop_inverse).
+%% NOTE(review): test_property/1 only builds a {Name, {timeout, 60, Fun}}
+%% tuple, so this body presumably does not run the property itself - confirm.
+prop_inverse_test_() ->
+    ?_test(test_property(prop_inverse)).
+
+%% Writes garbage into the file named by file_name/1 for a fresh queue and
+%% asserts that recover/1 hands back the queue unchanged.
+no_halt_on_corrupted_file_test() ->
+    ?_test(begin
+        Queue = smoosh_priority_queue:new(?RANDOM_CHANNEL),
+        StateFile = smoosh_priority_queue:file_name(Queue),
+        ok = file:write_file(StateFile, <<"garbage">>),
+        ?assertEqual(Queue, smoosh_priority_queue:recover(Queue)),
+        ok
+    end).
+
+%% Deletes the file named by file_name/1 for a fresh queue and asserts that
+%% recover/1 hands back the queue unchanged.
+%% NOTE(review): the delete is matched against ok, so this presumably relies
+%% on the file existing at that point - confirm against new/1.
+no_halt_on_missing_file_test() ->
+    ?_test(begin
+        Queue = smoosh_priority_queue:new(?RANDOM_CHANNEL),
+        StateFile = smoosh_priority_queue:file_name(Queue),
+        ok = file:delete(StateFile),
+        ?assertEqual(Queue, smoosh_priority_queue:recover(Queue)),
+        ok
+    end).
+
+%% ==========
+%% Properties
+%% ----------
+
+%% Property: feeding a queue's to_list/1 output back through from_list/2
+%% gives a queue that equal/2 considers the same as the original.
+prop_inverse() ->
+    ?FORALL(
+        Queue,
+        queue(),
+        equal(
+            Queue,
+            smoosh_priority_queue:from_list(smoosh_priority_queue:to_list(Queue), Queue)
+        )
+    ).
+
+%% ==========
+%% Generators
+%% ----------
+
+%% Generator for queue keys: either a binary or a pair of binaries.
+key() ->
+    Bin = proper_types:binary(),
+    proper_types:oneof([Bin, {Bin, Bin}]).
+%% Generator for queue values: either a binary or a pair of binaries.
+value() ->
+    Bin = proper_types:binary(),
+    proper_types:oneof([Bin, {Bin, Bin}]).
+%% Generator for item priorities: any integer.
+priority() -> integer().
+%% Generator for one queue item as a {Key, Value, Priority} triple.
+item() -> {key(), value(), priority()}.
+
+%% Generator for a list of item() triples.
+items_list() ->
+    ?LET(Items, list(item()), Items).
+
+%% Generator for a queue built by from_list/1 from a generated item list.
+simple_queue() ->
+    ?LET(Items, items_list(), from_list(Items)).
+
+%% Generator for a queue that had one extra in/5 call applied on top of a
+%% simple_queue(). With weight 1 that queue is produced as is; with weight 2
+%% the third element of out/1 on it is produced instead, i.e. the queue after
+%% one item has been taken out.
+with_deleted() ->
+    Inserted = ?LET(
+        {{K0, V0, P0}, Q0},
+        {item(), simple_queue()},
+        smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)
+    ),
+    ?LET(
+        Q,
+        Inserted,
+        frequency([
+            {1, Q},
+            {2, element(3, smoosh_priority_queue:out(Q))}
+        ])
+    ).
+
+%% Generator used by the properties; currently an alias for with_deleted/0.
+queue() ->
+    with_deleted().
+
+%% ==========================
+%% Proper related boilerplate
+%% --------------------------
+
+%% Builds an EUnit test tuple for a PropEr property. Accepts either the
+%% property's function name (an atom) or an {Id, Property} pair. The test
+%% title is Id with its first length(?PROP_PREFIX) characters dropped, and the
+%% test body runs test_it/2 under a 60 second timeout.
+test_property({Id, Property}) ->
+    Title = lists:nthtail(length(?PROP_PREFIX), Id),
+    Options = [long_result, {numtests, 1000}, {to_file, user}],
+    Runner = fun() -> test_it(Property, Options) end,
+    {Title, {timeout, 60, Runner}};
+test_property(Property) when is_atom(Property) ->
+    test_property({atom_to_list(Property), Property}).
+
+%% Runs proper:quickcheck/2 on ?MODULE:Property() with Opts. Returns true
+%% when quickcheck returns true; any other result is raised as a
+%% {propertyFailed, Details} error carrying that result.
+test_it(Property, Opts) ->
+    Outcome = proper:quickcheck(?MODULE:Property(), Opts),
+    Outcome =:= true orelse
+        erlang:error(
+            {propertyFailed, [
+                {module, ?MODULE},
+                {property, Property},
+                {result, Outcome}
+            ]}
+        ).
+
+%% ================
+%% Helper functions
+%% ----------------
+
+%% Creates the queue named "foo" and passes it through recover/1.
+new() ->
+    smoosh_priority_queue:recover(smoosh_priority_queue:new("foo")).
+
+%% Builds a queue by inserting every {Key, Value, Priority} item of List, in
+%% list order, into new/0 with capacity ?CAPACITY.
+from_list(List) ->
+    Insert = fun({Key, Value, Priority}, Acc) ->
+        smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Acc)
+    end,
+    lists:foldl(Insert, new(), List).
+
+%% Two queues are equal when draining them with out_all/1 gives exactly the
+%% same list of {Key, Value} pairs.
+equal(Q1, Q2) ->
+    Left = out_all(Q1),
+    Right = out_all(Q2),
+    Left =:= Right.
+
+%% Drains Queue with repeated out/1 calls until it returns false and returns
+%% the {Key, Value} pairs in the order they came out.
+out_all(Queue) ->
+    out_all(Queue, []).
+out_all(Queue, Seen) ->
+    case smoosh_priority_queue:out(Queue) of
+        false -> lists:reverse(Seen);
+        {Key, Value, Rest} -> out_all(Rest, [{Key, Value} | Seen])
+    end.
diff --git a/src/smoosh/test/smoosh_tests.erl b/src/smoosh/test/smoosh_tests.erl
new file mode 100644
index 0000000..13855f6
--- /dev/null
+++ b/src/smoosh/test/smoosh_tests.erl
@@ -0,0 +1,130 @@
+-module(smoosh_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-include("couch/src/couch_db_int.hrl").
+
+-define(KILOBYTE, binary:copy(<<"x">>, 1024)).
+
+%% ==========
+%% Setup
+%% ----------
+
+%% Per-test setup for the foreachx fixture: creates and closes a temporary
+%% database, calls smoosh_channel:flush/1 on the ChannelType channel and sets
+%% "min_size" to "200000" in that channel's config section.
+%% Returns the database name, which the fixture passes on to the test and to
+%% teardown/2.
+setup(ChannelType) ->
+    TempDb = ?tempdb(),
+    {ok, Handle} = couch_db:create(TempDb, [?ADMIN_CTX]),
+    couch_db:close(Handle),
+    {ok, Channel} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(Channel),
+    Section = config_section(ChannelType),
+    ok = config:set(Section, "min_size", "200000", false),
+    TempDb.
+
+%% Per-test cleanup for the foreachx fixture: deletes the database created by
+%% setup/1, removes the "min_size" override that setup/1 stored in the
+%% ChannelType config section, and calls smoosh_channel:flush/1 on the
+%% channel again.
+teardown(ChannelType, DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    %% The section is keyed by channel type, exactly as in setup/1; using the
+    %% database name here would leave the override in place for later tests.
+    ok = config:delete(config_section(ChannelType), "min_size", false),
+    {ok, ChannelPid} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:flush(ChannelPid),
+    ok.
+
+%% Name of the config section for a channel: "smoosh." followed by the
+%% channel type string.
+config_section(ChannelType) ->
+    lists:append("smoosh.", ChannelType).
+
+%% ==========
+%% Tests
+%% ----------
+
+%% Top-level EUnit generator: starts CouchDB with the smoosh application
+%% through test_util:start_couch/1 once, runs both test groups, and passes
+%% the start result to test_util:stop/1 afterwards.
+smoosh_test_() ->
+    Start = fun() -> test_util:start_couch([smoosh]) end,
+    Stop = fun test_util:stop/1,
+    Groups = [
+        channels_tests(),
+        persistence_tests()
+    ],
+    {"Testing smoosh", {setup, Start, Stop, Groups}}.
+
+%% Test group for queue persistence, run against the "ratio_dbs" channel.
+persistence_tests() ->
+    Group = make_test_case("ratio_dbs", [fun should_persist_queue/2]),
+    {"Should persist queue state", [Group]}.
+
+%% Test group for general channel behaviour, run against the "ratio_dbs"
+%% channel.
+channels_tests() ->
+    Group = make_test_case("ratio_dbs", [fun should_enqueue/2]),
+    {"Various channels tests", [Group]}.
+
+%% Builds a foreachx fixture that pairs every test fun in Funs with Type, so
+%% that setup/1 and teardown/2 run around each of them.
+make_test_case(Type, Funs) ->
+    Pairs = lists:map(fun(Fun) -> {Type, Fun} end, Funs),
+    {foreachx, fun setup/1, fun teardown/2, Pairs}.
+
+%% Grows the database file by 300 KB, waits until is_enqueued/2 reports the
+%% database for the channel, then asserts it once more.
+should_enqueue(ChannelType, DbName) ->
+    Body = fun() ->
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        ?assert(is_enqueued(ChannelType, DbName)),
+        ok
+    end,
+    ?_test(Body()).
+
+%% Gets the database enqueued, asks the channel to persist, and then checks
+%% that channel_queue/1 gives the same result before and after the smoosh
+%% application is stopped and started again.
+should_persist_queue(ChannelType, DbName) ->
+    ?_test(begin
+        {ok, Channel} = smoosh_server:get_channel(ChannelType),
+        ok = grow_db_file(DbName, 300),
+        ok = wait_enqueue(ChannelType, DbName),
+        ok = smoosh_channel:persist(Channel),
+        Persisted = channel_queue(ChannelType),
+        ok = application:stop(smoosh),
+        ok = application:start(smoosh),
+        Restored = channel_queue(ChannelType),
+        ?assertEqual(Persisted, Restored),
+        ok
+    end).
+
+%% Inflates the file behind DbName by appending SizeInKb copies of the
+%% 1024-byte ?KILOBYTE filler to it, then writes one document with a
+%% generated id through couch_db:update_docs/3. Returns ok.
+grow_db_file(DbName, SizeInKb) ->
+    {ok, #db{filepath = FilePath} = Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+    {ok, Fd} = file:open(FilePath, [append]),
+    Bytes = binary:copy(?KILOBYTE, SizeInKb),
+    %% Fail right here if the padding cannot be written, instead of letting
+    %% the caller time out later while waiting for the enqueue.
+    ok = file:write(Fd, Bytes),
+    ok = file:close(Fd),
+    Doc = couch_doc:from_json_obj(
+        {[
+            {<<"_id">>, ?l2b(?docid())},
+            {<<"value">>, ?l2b(?docid())}
+        ]}
+    ),
+    {ok, _} = couch_db:update_docs(Db, [Doc], []),
+    couch_db:close(Db),
+    ok.
+
+%% Asks the ChannelType channel, via smoosh_channel:is_key/2, whether DbName
+%% is one of its keys.
+is_enqueued(ChannelType, DbName) ->
+    {ok, Channel} = smoosh_server:get_channel(ChannelType),
+    smoosh_channel:is_key(Channel, DbName).
+
+%% Polls is_enqueued/2 through test_util:wait/1; the poll fun answers `wait`
+%% while the database is not enqueued yet and `ok` once it is.
+wait_enqueue(ChannelType, DbName) ->
+    Poll = fun() ->
+        case is_enqueued(ChannelType, DbName) of
+            true -> ok;
+            false -> wait
+        end
+    end,
+    test_util:wait(Poll).
+
+%% Creates a queue named after ChannelType and returns the result of running
+%% smoosh_priority_queue:recover/1 on it.
+channel_queue(ChannelType) ->
+    smoosh_priority_queue:recover(smoosh_priority_queue:new(ChannelType)).

Reply via email to