KUDU-686 (part 1/2): decompose guts of DMSIterator into DeltaPreparer

To address KUDU-686, we're going to repurpose DMSIterator's PrepareBatch()
machinery and associated in-memory state for use in the DeltaFileIterator.
Doing so obviates the need for a "multi-pass" approach to ApplyUpdates()
because DeltaFileIterator will no longer be performing any decoding there,
having done all of it in PrepareBatch().

This patch lays the groundwork by refactoring the guts of DMSIterator into
the new DeltaPreparer class. DMSIterator will continue to concern itself
with CBTree iteration, but will delegate the delta preparation and service
to DeltaPreparer.

In performing this refactor, I tried to be as faithful as possible to the
original code. The one exception is that I replaced prepared_idx_ and
prepared_count_ with state that I found easier to understand.

No new tests; I figured there was enough test coverage of DMSIterator, and
testing DeltaPreparer directly seemed like it'd be low bang for the buck.

Change-Id: I26c92dfa69b5323e7fbb18d02010ed99cc29c3e5
Reviewed-on: http://gerrit.cloudera.org:8080/11394
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <a...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bd8d747e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bd8d747e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bd8d747e

Branch: refs/heads/master
Commit: bd8d747ed57b3cfa53c0c1b4465589b7ec5f1663
Parents: 04468d0
Author: Adar Dembo <a...@cloudera.com>
Authored: Tue Sep 4 22:38:57 2018 -0700
Committer: Adar Dembo <a...@cloudera.com>
Committed: Wed Oct 31 23:58:08 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_iterator_merger.cc |  10 +-
 src/kudu/tablet/delta_iterator_merger.h  |  34 +++--
 src/kudu/tablet/delta_store.cc           | 165 ++++++++++++++++++++
 src/kudu/tablet/delta_store.h            | 209 ++++++++++++++++++++------
 src/kudu/tablet/deltafile.cc             |  33 ++--
 src/kudu/tablet/deltafile.h              |  28 ++--
 src/kudu/tablet/deltamemstore.cc         | 151 +++----------------
 src/kudu/tablet/deltamemstore.h          |  64 ++------
 8 files changed, 420 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_iterator_merger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.cc 
b/src/kudu/tablet/delta_iterator_merger.cc
index 23e6fff..4230c1c 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -43,7 +43,7 @@ DeltaIteratorMerger::DeltaIteratorMerger(
     vector<unique_ptr<DeltaIterator> > iters)
     : iters_(std::move(iters)) {}
 
-Status DeltaIteratorMerger::Init(ScanSpec *spec) {
+Status DeltaIteratorMerger::Init(ScanSpec* spec) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->Init(spec));
   }
@@ -64,21 +64,21 @@ Status DeltaIteratorMerger::PrepareBatch(size_t nrows, 
PrepareFlag flag) {
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock 
*dst) {
+Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock* 
dst) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->ApplyUpdates(col_to_apply, dst));
   }
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::ApplyDeletes(SelectionVector *sel_vec) {
+Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->ApplyDeletes(sel_vec));
   }
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::CollectMutations(vector<Mutation *> *dst, Arena 
*arena) {
+Status DeltaIteratorMerger::CollectMutations(vector<Mutation*>* dst, Arena* 
arena) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->CollectMutations(dst, arena));
   }
@@ -117,7 +117,7 @@ bool DeltaIteratorMerger::HasNext() {
   return false;
 }
 
-bool DeltaIteratorMerger::MayHaveDeltas() {
+bool DeltaIteratorMerger::MayHaveDeltas() const {
   for (const unique_ptr<DeltaIterator>& iter : iters_) {
     if (iter->MayHaveDeltas()) {
       return true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_iterator_merger.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.h 
b/src/kudu/tablet/delta_iterator_merger.h
index d1bd9b4..cbfb685 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -23,7 +23,6 @@
 #include <vector>
 
 #include "kudu/common/rowid.h"
-#include "kudu/gutil/port.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/status.h"
 
@@ -57,18 +56,27 @@ class DeltaIteratorMerger : public DeltaIterator {
   ////////////////////////////////////////////////////////////
   // Implementations of DeltaIterator
   ////////////////////////////////////////////////////////////
-  virtual Status Init(ScanSpec *spec) OVERRIDE;
-  virtual Status SeekToOrdinal(rowid_t idx) OVERRIDE;
-  virtual Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
-  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
-  virtual Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
-  virtual Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) 
OVERRIDE;
-  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& 
col_ids,
-                                                 
std::vector<DeltaKeyAndUpdate>* out,
-                                                 Arena* arena) OVERRIDE;
-  virtual bool HasNext() OVERRIDE;
-  bool MayHaveDeltas() override;
-  virtual std::string ToString() const OVERRIDE;
+  Status Init(ScanSpec* spec) override;
+
+  Status SeekToOrdinal(rowid_t idx) override;
+
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
+
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
+
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
+
+  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
+                                         std::vector<DeltaKeyAndUpdate>* out,
+                                         Arena* arena) override;
+
+  bool HasNext() override;
+
+  bool MayHaveDeltas() const override;
+
+  std::string ToString() const override;
 
  private:
   explicit DeltaIteratorMerger(std::vector<std::unique_ptr<DeltaIterator> > 
iters);

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 379a2fa..e327a7c 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -19,18 +19,28 @@
 
 #include <algorithm>
 #include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <ostream>
 
 #include <glog/logging.h>
 
+#include "kudu/common/columnblock.h"
+#include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/types.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/deltafile.h"
+#include "kudu/tablet/mutation.h"
+#include "kudu/tablet/mvcc.h"
+#include "kudu/util/debug-util.h"
 #include "kudu/util/memory/arena.h"
 
 namespace kudu {
@@ -51,6 +61,161 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const 
Schema& schema, bool p
                                                  
key.timestamp().ToString()))));
 }
 
+DeltaPreparer::DeltaPreparer(RowIteratorOptions opts)
+    : opts_(std::move(opts)),
+      cur_prepared_idx_(0),
+      prev_prepared_idx_(0),
+      prepared_for_(NOT_PREPARED) {
+}
+
+void DeltaPreparer::Seek(rowid_t row_idx) {
+  prev_prepared_idx_ = row_idx;
+  cur_prepared_idx_ = row_idx;
+  prepared_for_ = NOT_PREPARED;
+}
+
+void DeltaPreparer::Start(DeltaIterator::PrepareFlag flag) {
+  if (updates_by_col_.empty()) {
+    updates_by_col_.resize(opts_.projection->num_columns());
+  }
+  for (UpdatesForColumn& ufc : updates_by_col_) {
+    ufc.clear();
+  }
+  deleted_.clear();
+  prepared_deltas_.clear();
+  switch (flag) {
+    case DeltaIterator::PREPARE_FOR_APPLY:
+      prepared_for_ = PREPARED_FOR_APPLY;
+      break;
+    case DeltaIterator::PREPARE_FOR_COLLECT:
+      prepared_for_ = PREPARED_FOR_COLLECT;
+      break;
+    default:
+      LOG(FATAL) << "Unknown prepare flag: " << flag;
+  }
+}
+
+void DeltaPreparer::Finish(size_t nrows) {
+  prev_prepared_idx_ = cur_prepared_idx_;
+  cur_prepared_idx_ += nrows;
+}
+
+Status DeltaPreparer::AddDelta(const DeltaKey& key, Slice val) {
+  if (!opts_.snap_to_include.IsCommitted(key.timestamp())) {
+    return Status::OK();
+  }
+
+  if (prepared_for_ == PREPARED_FOR_APPLY) {
+    RowChangeListDecoder decoder((RowChangeList(val)));
+    decoder.InitNoSafetyChecks();
+    DCHECK(!decoder.is_reinsert()) << "Reinserts are not supported in the 
DeltaMemStore.";
+    if (decoder.is_delete()) {
+      deleted_.emplace_back(key.row_idx());
+    } else {
+      DCHECK(decoder.is_update());
+      while (decoder.HasNext()) {
+        RowChangeListDecoder::DecodedUpdate dec;
+        RETURN_NOT_OK(decoder.DecodeNext(&dec));
+        int col_idx;
+        const void* col_val;
+        RETURN_NOT_OK(dec.Validate(*opts_.projection, &col_idx, &col_val));
+        if (col_idx == -1) {
+          // This column isn't being projected.
+          continue;
+        }
+        int col_size = opts_.projection->column(col_idx).type_info()->size();
+
+        // If we already have an earlier update for the same column, we can
+        // just overwrite that one.
+        if (updates_by_col_[col_idx].empty() ||
+            updates_by_col_[col_idx].back().row_id != key.row_idx()) {
+          updates_by_col_[col_idx].emplace_back();
+        }
+
+        ColumnUpdate& cu = updates_by_col_[col_idx].back();
+        cu.row_id = key.row_idx();
+        if (col_val == nullptr) {
+          cu.new_val_ptr = nullptr;
+        } else {
+          memcpy(cu.new_val_buf, col_val, col_size);
+          // NOTE: we're constructing a pointer here to an element inside the 
deque.
+          // This is safe because deques never invalidate pointers to their 
elements.
+          cu.new_val_ptr = cu.new_val_buf;
+        }
+      }
+    }
+  } else {
+    DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
+    PreparedDelta d;
+    d.key = key;
+    d.val = val;
+    prepared_deltas_.emplace_back(d);
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());
+
+  const ColumnSchema* col_schema = &opts_.projection->column(col_to_apply);
+  for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
+    int32_t idx_in_block = cu.row_id - prev_prepared_idx_;
+    DCHECK_GE(idx_in_block, 0);
+    SimpleConstCell src(col_schema, cu.new_val_ptr);
+    ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
+    RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::ApplyDeletes(SelectionVector* sel_vec) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  DCHECK_EQ(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
+
+  for (const auto& row_id : deleted_) {
+    uint32_t idx_in_block = row_id - prev_prepared_idx_;
+    sel_vec->SetRowUnselected(idx_in_block);
+  }
+
+  return Status::OK();
+}
+
+Status DeltaPreparer::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
+  for (const PreparedDelta& src : prepared_deltas_) {
+    DeltaKey key = src.key;
+    RowChangeList changelist(src.val);
+    uint32_t rel_idx = key.row_idx() - prev_prepared_idx_;
+
+    Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), 
changelist);
+    mutation->PrependToList(&dst->at(rel_idx));
+  }
+  return Status::OK();
+}
+
+Status DeltaPreparer::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& 
/*col_ids*/,
+                                                      
vector<DeltaKeyAndUpdate>* /*out*/,
+                                                      Arena* /*arena*/) {
+  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS" << 
GetStackTrace();
+  return Status::InvalidArgument("FilterColumsAndAppend() is not supported by 
DMSIterator");
+}
+
+bool DeltaPreparer::MayHaveDeltas() const {
+  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
+  if (!deleted_.empty()) {
+    return true;
+  }
+  for (auto& col : updates_by_col_) {
+    if (!col.empty()) {
+      return true;
+    }
+  }
+  return false;
+}
+
 Status DebugDumpDeltaIterator(DeltaType type,
                               DeltaIterator* iter,
                               const Schema& schema,

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 6358df2..3f1c160 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -19,12 +19,15 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "kudu/common/rowid.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -47,8 +50,6 @@ class DeltaFileWriter;
 class DeltaIterator;
 class DeltaStats;
 class Mutation;
-class MvccSnapshot;
-struct RowIteratorOptions;
 
 // Interface for the pieces of the system that track deltas/updates.
 // This is implemented by DeltaMemStore and by DeltaFileReader.
@@ -126,7 +127,60 @@ struct DeltaKeyAndUpdate {
   std::string Stringify(DeltaType type, const Schema& schema, bool pad_key = 
false) const;
 };
 
-class DeltaIterator {
+// Representation of deltas that have been "prepared" by an iterator. That is,
+// they have been consistently read (at a snapshot) from their backing store
+// into an in-memory format suitable for efficient retrieval.
+class PreparedDeltas {
+ public:
+  // Applies the snapshotted updates to one of the columns.
+  //
+  // 'dst' must be the same length as was previously passed to PrepareBatch()
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) = 0;
+
+  // Applies any deletes to the given selection vector.
+  //
+  // Rows which have been deleted in the associated MVCC snapshot are set to 0
+  // in the selection vector so that they don't show up in the output.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
+
+  // Collects the mutations associated with each row in the current prepared 
batch.
+  //
+  // Each entry in the vector will be treated as a singly linked list of 
Mutation
+  // objects. If there are no mutations for that row, the entry will be 
unmodified.
+  // If there are mutations, they will be prepended at the head of the linked 
list
+  // (i.e the resulting list will be in descending timestamp order)
+  //
+  // The Mutation objects will be allocated out of the provided Arena, which 
must be non-NULL.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
+  virtual Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) = 
0;
+
+  // Iterates through all deltas, adding deltas for columns not specified in
+  // 'col_ids' to 'out'.
+  //
+  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
+  // deltas are considered relevant.
+  //
+  // The delta objects will be allocated out the provided Arena, which must be 
non-NULL.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_COLLECT.
+  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& 
col_ids,
+                                                 
std::vector<DeltaKeyAndUpdate>* out,
+                                                 Arena* arena) = 0;
+
+  // Returns true if there might exist deltas to be applied. It is safe to
+  // conservatively return true, but this would force a skip over decoder-level
+  // evaluation.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
+  virtual bool MayHaveDeltas() const = 0;
+};
+
+class DeltaIterator : public PreparedDeltas {
  public:
   // Initialize the iterator. This must be called once before any other
   // call.
@@ -143,8 +197,8 @@ class DeltaIterator {
   };
 
   // Prepare to apply deltas to a block of rows. This takes a consistent 
snapshot
-  // of all updates to the next 'nrows' rows, so that subsequent calls to
-  // ApplyUpdates() will not cause any "tearing"/non-atomicity.
+  // of all updates to the next 'nrows' rows, so that subsequent calls to a
+  // PreparedDeltas method will not cause any "tearing"/non-atomicity.
   //
   // 'flag' denotes whether the batch will be used for collecting mutations or
   // for applying them. Some implementations may choose to prepare differently.
@@ -153,62 +207,125 @@ class DeltaIterator {
   // of the previously prepared block.
   virtual Status PrepareBatch(size_t nrows, PrepareFlag flag) = 0;
 
-  // Apply the snapshotted updates to one of the columns.
-  // 'dst' must be the same length as was previously passed to PrepareBatch()
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) = 0;
+  // Returns true if there are any more rows left in this iterator.
+  virtual bool HasNext() = 0;
+
+  // Return a string representation suitable for debug printouts.
+  virtual std::string ToString() const = 0;
+
+  virtual ~DeltaIterator() {}
+};
 
-  // Apply any deletes to the given selection vector.
-  // Rows which have been deleted in the associated MVCC snapshot are set to
-  // 0 in the selection vector so that they don't show up in the output.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual Status ApplyDeletes(SelectionVector *sel_vec) = 0;
+// Encapsulates all logic and responsibility related to "delta preparation";
+// that is, the transformation of encoded deltas into an in-memory
+// representation more suitable for efficient service during iteration.
+//
+// This class is intended to be composed inside a DeltaIterator. The iterator
+// is responsible for loading encoded deltas from a backing store, passing them
+// to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
+// to serve the deltas.
+class DeltaPreparer : public PreparedDeltas {
+ public:
+  explicit DeltaPreparer(RowIteratorOptions opts);
 
-  // Collect the mutations associated with each row in the current prepared 
batch.
+  // Updates internal state to reflect a seek performed by a DeltaIterator.
   //
-  // Each entry in the vector will be treated as a singly linked list of 
Mutation
-  // objects. If there are no mutations for that row, the entry will be 
unmodified.
-  // If there are mutations, they will be prepended at the head of the linked 
list
-  // (i.e the resulting list will be in descending timestamp order)
+  // Call upon completion of DeltaIterator::SeekToOrdinal.
+  void Seek(rowid_t row_idx);
+
+  // Updates internal state to reflect the beginning of delta batch preparation
+  // on the part of a DeltaIterator.
   //
-  // The Mutation objects will be allocated out of the provided Arena, which 
must be non-NULL.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.
-  virtual Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) 
= 0;
+  // Call at the beginning of DeltaIterator::PrepareBatch.
+  void Start(DeltaIterator::PrepareFlag flag);
 
-  // Iterate through all deltas, adding deltas for columns not
-  // specified in 'col_ids' to 'out'.
+  // Updates internal state to reflect the end of delta batch preparation on 
the
+  // part of a DeltaIterator.
   //
-  // Unlike CollectMutations, the iterator's MVCC snapshots are ignored; all
-  // deltas are considered relevant.
-  // The delta objects will be allocated out of the provided Arena, which must 
be non-Null.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_COLLECT.
-  virtual Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& 
col_ids,
-                                                 
std::vector<DeltaKeyAndUpdate>* out,
-                                                 Arena* arena) = 0;
+  // Call at the end of DeltaIterator::PrepareBatch.
+  void Finish(size_t nrows);
 
-  // Returns true if there are any more rows left in this iterator.
-  virtual bool HasNext() = 0;
+  // Prepares the delta given by 'key' whose encoded changes are pointed to by 
'val'.
+  //
+  // Upon completion, it is safe for the memory behind 'val' to be destroyed.
+  //
+  // Call when a new delta becomes available in DeltaIterator::PrepareBatch.
+  Status AddDelta(const DeltaKey& key, Slice val);
 
-  // Returns true if there might exist deltas to be applied. It is safe to
-  // conservatively return true, but this would force a skip over decoder-level
-  // evaluation.
-  // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY.
-  virtual bool MayHaveDeltas() = 0;
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
 
-  // Return a string representation suitable for debug printouts.
-  virtual std::string ToString() const = 0;
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  virtual ~DeltaIterator() {}
-};
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
+
+  Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
+                                         std::vector<DeltaKeyAndUpdate>* out,
+                                         Arena* arena) override;
+
+  bool MayHaveDeltas() const override;
+
+  rowid_t cur_prepared_idx() const { return cur_prepared_idx_; }
+
+ private:
+  // Options with which the DeltaPreparer was constructed.
+  const RowIteratorOptions opts_;
 
-enum {
-  ITERATE_OVER_ALL_ROWS = 0
+  // The row index at which the most recent batch preparation ended.
+  rowid_t cur_prepared_idx_;
+
+  // The value of 'cur_prepared_idx_' from the previous batch.
+  rowid_t prev_prepared_idx_;
+
+  // Whether there are any prepared blocks.
+  enum PreparedFor {
+    // There are no prepared blocks. Attempts to call a PreparedDeltas function
+    // will fail.
+    NOT_PREPARED,
+
+    // The DeltaPreparer has prepared a batch of deltas for applying. All 
deltas
+    // in the batch have been decoded. UPDATEs and REINSERTs have been 
coalesced
+    // into a column-major data structure suitable for ApplyUpdates. DELETES
+    // have been coalesced into a row-major data structure suitable for 
ApplyDeletes.
+    //
+    // ApplyUpdates and ApplyDeltas are now callable.
+    PREPARED_FOR_APPLY,
+
+    // The DeltaPreparer has prepared a batch of deltas for collecting. Deltas
+    // remain encoded and in the order that they were loaded from the backing 
store.
+    //
+    // CollectMutations and FilterColumnIdsAndCollectDeltas are now callable.
+    PREPARED_FOR_COLLECT
+  };
+  PreparedFor prepared_for_;
+
+  // State when prepared_for_ == PREPARED_FOR_APPLY
+  // ------------------------------------------------------------
+  struct ColumnUpdate {
+    rowid_t row_id;
+    void* new_val_ptr;
+    uint8_t new_val_buf[16];
+  };
+  typedef std::deque<ColumnUpdate> UpdatesForColumn;
+  std::vector<UpdatesForColumn> updates_by_col_;
+  std::deque<rowid_t> deleted_;
+
+  // State when prepared_for_ == PREPARED_FOR_COLLECT
+  // ------------------------------------------------------------
+  struct PreparedDelta {
+    DeltaKey key;
+    Slice val;
+  };
+  std::deque<PreparedDelta> prepared_deltas_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
 };
 
+enum { ITERATE_OVER_ALL_ROWS = 0 };
+
 // Dumps contents of 'iter' to 'out', line-by-line.  Used to unit test
 // minor delta compaction.
 //
-// If nrows is 0, all rows will be dumped.
+// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
 Status DebugDumpDeltaIterator(DeltaType type,
                               DeltaIterator* iter,
                               const Schema& schema,
@@ -218,7 +335,7 @@ Status DebugDumpDeltaIterator(DeltaType type,
 // Writes the contents of 'iter' to 'out', block by block.  Used by
 // minor delta compaction.
 //
-// If nrows is 0, all rows will be dumped.
+// If 'nrows' is ITERATE_OVER_ALL_ROWS, all rows will be dumped.
 template<DeltaType Type>
 Status WriteDeltaIteratorToFile(DeltaIterator* iter,
                                 size_t nrows,

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index bbaad00..9a1be53 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -411,7 +411,7 @@ 
DeltaFileIterator::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
       delta_type_(delta_type),
       cache_blocks_(CFileReader::CACHE_BLOCK) {}
 
-Status DeltaFileIterator::Init(ScanSpec *spec) {
+Status DeltaFileIterator::Init(ScanSpec* spec) {
   DCHECK(!initted_) << "Already initted";
 
   if (spec) {
@@ -734,7 +734,7 @@ inline Status ApplyingVisitor<UNDO>::Visit(const DeltaKey& 
key,
   return Status::OK();
 }
 
-Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
+Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
   DCHECK_LE(prepared_count_, dst->nrows());
 
   if (delta_type_ == REDO) {
@@ -796,8 +796,8 @@ inline Status LivenessVisitor<REDO>::Visit(const DeltaKey& 
key,
 
 template<>
 inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas, bool*
-                                           continue_visit) {
+                                           const Slice& deltas,
+                                           bool* continue_visit) {
   if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), 
continue_visit)) {
     return ApplyDelete(key, deltas);
   }
@@ -805,7 +805,7 @@ inline Status LivenessVisitor<UNDO>::Visit(const DeltaKey& 
key,
 }
 
 
-Status DeltaFileIterator::ApplyDeletes(SelectionVector *sel_vec) {
+Status DeltaFileIterator::ApplyDeletes(SelectionVector* sel_vec) {
   DCHECK_LE(prepared_count_, sel_vec->nrows());
   if (delta_type_ == REDO) {
     DVLOG(3) << "Applying REDO deletes";
@@ -842,8 +842,8 @@ struct CollectingVisitor {
 
 template<>
 inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas,
-                                           bool* continue_visit) {
+                                             const Slice& deltas,
+                                             bool* continue_visit) {
   if (IsRedoRelevant(dfi->opts_.snap_to_include, key.timestamp(), 
continue_visit)) {
     return Collect(key, deltas);
   }
@@ -852,32 +852,31 @@ inline Status CollectingVisitor<REDO>::Visit(const 
DeltaKey& key,
 
 template<>
 inline Status CollectingVisitor<UNDO>::Visit(const DeltaKey& key,
-                                           const Slice& deltas, bool*
-                                           continue_visit) {
+                                             const Slice& deltas,
+                                             bool* continue_visit) {
   if (IsUndoRelevant(dfi->opts_.snap_to_include, key.timestamp(), 
continue_visit)) {
     return Collect(key, deltas);
   }
   return Status::OK();
 }
 
-Status DeltaFileIterator::CollectMutations(vector<Mutation *> *dst, Arena 
*dst_arena) {
+Status DeltaFileIterator::CollectMutations(vector<Mutation*>* dst, Arena* 
arena) {
   DCHECK_LE(prepared_count_, dst->size());
   if (delta_type_ == REDO) {
-    CollectingVisitor<REDO> visitor = {this, dst, dst_arena};
-    return VisitMutations(&visitor);
-  } else {
-    CollectingVisitor<UNDO> visitor = {this, dst, dst_arena};
+    CollectingVisitor<REDO> visitor = {this, dst, arena};
     return VisitMutations(&visitor);
   }
+  CollectingVisitor<UNDO> visitor = {this, dst, arena};
+  return VisitMutations(&visitor);
 }
 
 bool DeltaFileIterator::HasNext() {
   return !exhausted_ || !delta_blocks_.empty();
 }
 
-bool DeltaFileIterator::MayHaveDeltas() {
-  // TODO: change the API to take in the col_to_apply and check for deltas on
-  // that column only.
+bool DeltaFileIterator::MayHaveDeltas() const {
+  // TODO(awong): change the API to take in the col_to_apply and check for
+  // deltas on that column only.
   DCHECK(prepared_) << "must Prepare";
   for (auto& block : delta_blocks_) {
     BinaryPlainBlockDecoder& bpd = *block->decoder_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 811db5b..439aa48 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -230,19 +230,27 @@ class DeltaFileReader : public DeltaStore,
 // See DeltaIterator for details.
 class DeltaFileIterator : public DeltaIterator {
  public:
-  Status Init(ScanSpec *spec) OVERRIDE;
+  Status Init(ScanSpec* spec) override;
+
+  Status SeekToOrdinal(rowid_t idx) override;
+
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
+
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
+
+  Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
 
-  Status SeekToOrdinal(rowid_t idx) OVERRIDE;
-  Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
-  Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
-  Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
-  Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                          std::vector<DeltaKeyAndUpdate>* out,
-                                         Arena* arena) OVERRIDE;
-  std::string ToString() const OVERRIDE;
-  virtual bool HasNext() OVERRIDE;
-  bool MayHaveDeltas() override;
+                                         Arena* arena) override;
+
+  std::string ToString() const override;
+
+  bool HasNext() override;
+
+  bool MayHaveDeltas() const override;
 
  private:
   friend class DeltaFileReader;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index a704600..f68727b 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -17,29 +17,23 @@
 
 #include "kudu/tablet/deltamemstore.h"
 
-#include <cstring>
 #include <ostream>
 #include <utility>
 
 #include <glog/logging.h>
 
-#include "kudu/common/columnblock.h"
-#include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
-#include "kudu/common/rowblock.h"
-#include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
-#include "kudu/common/types.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/deltafile.h"
-#include "kudu/tablet/mutation.h"
-#include "kudu/tablet/mvcc.h"
-#include "kudu/util/debug-util.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memcmpable_varint.h"
 #include "kudu/util/memory/memory.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -201,12 +195,8 @@ void DeltaMemStore::DebugPrint() const {
 DMSIterator::DMSIterator(const shared_ptr<const DeltaMemStore>& dms,
                          RowIteratorOptions opts)
     : dms_(dms),
-      opts_(std::move(opts)),
+      preparer_(std::move(opts)),
       iter_(dms->tree_.NewIterator()),
-      initted_(false),
-      prepared_idx_(0),
-      prepared_count_(0),
-      prepared_for_(NOT_PREPARED),
       seeked_(false) {}
 
 Status DMSIterator::Init(ScanSpec* /*spec*/) {
@@ -221,9 +211,7 @@ Status DMSIterator::SeekToOrdinal(rowid_t row_idx) {
 
   bool exact; /* unused */
   iter_->SeekAtOrAfter(Slice(buf), &exact);
-  prepared_idx_ = row_idx;
-  prepared_count_ = 0;
-  prepared_for_ = NOT_PREPARED;
+  preparer_.Seek(row_idx);
   seeked_ = true;
   return Status::OK();
 }
@@ -242,17 +230,9 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag 
flag) {
   // copy here is instead a single copy of the data, so is likely faster.
   CHECK(seeked_);
   DCHECK(initted_) << "must init";
-  rowid_t start_row = prepared_idx_ + prepared_count_;
+  rowid_t start_row = preparer_.cur_prepared_idx();
   rowid_t stop_row = start_row + nrows - 1;
-
-  if (updates_by_col_.empty()) {
-    updates_by_col_.resize(opts_.projection->num_columns());
-  }
-  for (UpdatesForColumn& ufc : updates_by_col_) {
-    ufc.clear();
-  }
-  deleted_.clear();
-  prepared_deltas_.clear();
+  preparer_.Start(flag);
 
   while (iter_->IsValid()) {
     Slice key_slice, val;
@@ -261,133 +241,38 @@ Status DMSIterator::PrepareBatch(size_t nrows, 
PrepareFlag flag) {
     RETURN_NOT_OK(key.DecodeFrom(&key_slice));
     DCHECK_GE(key.row_idx(), start_row);
     if (key.row_idx() > stop_row) break;
-
-    if (!opts_.snap_to_include.IsCommitted(key.timestamp())) {
-      // The transaction which applied this update is not yet committed
-      // in this iterator's MVCC snapshot. Hence, skip it.
-      iter_->Next();
-      continue;
-    }
-
-    if (flag == PREPARE_FOR_APPLY) {
-      RowChangeListDecoder decoder((RowChangeList(val)));
-      decoder.InitNoSafetyChecks();
-      DCHECK(!decoder.is_reinsert()) << "Reinserts are not supported in the 
DeltaMemStore.";
-      if (decoder.is_delete()) {
-        deleted_.push_back(key.row_idx());
-      } else {
-        DCHECK(decoder.is_update());
-        while (decoder.HasNext()) {
-          RowChangeListDecoder::DecodedUpdate dec;
-          RETURN_NOT_OK(decoder.DecodeNext(&dec));
-          int col_idx;
-          const void* col_val;
-          RETURN_NOT_OK(dec.Validate(*opts_.projection, &col_idx, &col_val));
-          if (col_idx == -1) {
-            // This column isn't being projected.
-            continue;
-          }
-          int col_size = opts_.projection->column(col_idx).type_info()->size();
-
-          // If we already have an earlier update for the same column, we can
-          // just overwrite that one.
-          if (updates_by_col_[col_idx].empty() ||
-              updates_by_col_[col_idx].back().row_id != key.row_idx()) {
-            updates_by_col_[col_idx].emplace_back();
-          }
-
-          ColumnUpdate& cu = updates_by_col_[col_idx].back();
-          cu.row_id = key.row_idx();
-          if (col_val == nullptr) {
-            cu.new_val_ptr = nullptr;
-          } else {
-            memcpy(cu.new_val_buf, col_val, col_size);
-            // NOTE: we're constructing a pointer here to an element inside 
the deque.
-            // This is safe because deques never invalidate pointers to their 
elements.
-            cu.new_val_ptr = cu.new_val_buf;
-          }
-        }
-      }
-    } else {
-      DCHECK_EQ(flag, PREPARE_FOR_COLLECT);
-      PreparedDelta d;
-      d.key = key;
-      d.val = val;
-      prepared_deltas_.push_back(d);
-    }
-
+    RETURN_NOT_OK(preparer_.AddDelta(key, val));
     iter_->Next();
   }
-  prepared_idx_ = start_row;
-  prepared_count_ = nrows;
-  prepared_for_ = flag == PREPARE_FOR_APPLY ? PREPARED_FOR_APPLY : 
PREPARED_FOR_COLLECT;
+  preparer_.Finish(nrows);
   return Status::OK();
 }
 
-Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(prepared_count_, dst->nrows());
-
-  const ColumnSchema* col_schema = &opts_.projection->column(col_to_apply);
-  for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
-    int32_t idx_in_block = cu.row_id - prepared_idx_;
-    DCHECK_GE(idx_in_block, 0);
-    SimpleConstCell src(col_schema, cu.new_val_ptr);
-    ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
-    RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
-  }
-
-  return Status::OK();
+Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
+  return preparer_.ApplyUpdates(col_to_apply, dst);
 }
 
-
-Status DMSIterator::ApplyDeletes(SelectionVector *sel_vec) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
-  DCHECK_EQ(prepared_count_, sel_vec->nrows());
-
-  for (auto& row_id : deleted_) {
-    uint32_t idx_in_block = row_id - prepared_idx_;
-    sel_vec->SetRowUnselected(idx_in_block);
-  }
-
-  return Status::OK();
+Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
+  return preparer_.ApplyDeletes(sel_vec);
 }
 
 
-Status DMSIterator::CollectMutations(vector<Mutation *> *dst, Arena *arena) {
-  DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
-  for (const PreparedDelta& src : prepared_deltas_) {
-    DeltaKey key = src.key;;
-    RowChangeList changelist(src.val);
-    uint32_t rel_idx = key.row_idx() - prepared_idx_;
-
-    Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), 
changelist);
-    mutation->PrependToList(&dst->at(rel_idx));
-  }
-  return Status::OK();
+Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
+  return preparer_.CollectMutations(dst, arena);
 }
 
 Status DMSIterator::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& 
col_ids,
                                                     vector<DeltaKeyAndUpdate>* 
out,
                                                     Arena* arena) {
-  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS" << 
GetStackTrace();
-  return Status::InvalidArgument("FilterColumsAndAppend() is not supported by 
DMSIterator");
+  return preparer_.FilterColumnIdsAndCollectDeltas(col_ids, out, arena);
 }
 
 bool DMSIterator::HasNext() {
   return iter_->IsValid();
 }
 
-bool DMSIterator::MayHaveDeltas() {
-  if (!deleted_.empty()) {
-    return true;
-  }
-  for (auto& col: updates_by_col_) {
-    if (!col.empty()) {
-      return true;
-    }
-  }
-  return false;
+bool DMSIterator::MayHaveDeltas() const {
+  return preparer_.MayHaveDeltas();
 }
 
 string DMSIterator::ToString() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/bd8d747e/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 253a2c9..66e8c62 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -19,7 +19,6 @@
 
 #include <cstddef>
 #include <cstdint>
-#include <deque>
 #include <memory>
 #include <string>
 #include <vector>
@@ -33,13 +32,10 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/tablet/concurrent_btree.h"
-#include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
-#include "kudu/tablet/rowset.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/memory/arena.h"
-#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -55,16 +51,17 @@ struct ColumnId;
 
 namespace consensus {
 class OpId;
-}
+} // namespace consensus
 
 namespace fs {
 struct IOContext;
-}
+} // namespace fs
 
 namespace tablet {
 
 class DeltaFileWriter;
 class Mutation;
+struct RowIteratorOptions;
 
 struct DMSTreeTraits : public btree::BTreeTraits {
   typedef ThreadSafeMemoryTrackingArena ArenaType;
@@ -200,27 +197,27 @@ class DeltaMemStore : public DeltaStore,
 // functions.
 class DMSIterator : public DeltaIterator {
  public:
-  Status Init(ScanSpec *spec) OVERRIDE;
+  Status Init(ScanSpec* spec) override;
 
-  Status SeekToOrdinal(rowid_t row_idx) OVERRIDE;
+  Status SeekToOrdinal(rowid_t row_idx) override;
 
-  Status PrepareBatch(size_t nrows, PrepareFlag flag) OVERRIDE;
+  Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
 
-  Status ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) OVERRIDE;
+  Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
 
-  Status ApplyDeletes(SelectionVector *sel_vec) OVERRIDE;
+  Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status CollectMutations(std::vector<Mutation *> *dst, Arena *arena) OVERRIDE;
+  Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                          std::vector<DeltaKeyAndUpdate>* out,
-                                         Arena* arena) OVERRIDE;
+                                         Arena* arena) override;
 
-  std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
-  virtual bool HasNext() OVERRIDE;
+  bool HasNext() override;
 
-  bool MayHaveDeltas() override;
+  bool MayHaveDeltas() const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(DMSIterator);
@@ -237,47 +234,14 @@ class DMSIterator : public DeltaIterator {
 
   const std::shared_ptr<const DeltaMemStore> dms_;
 
-  const RowIteratorOptions opts_;
+  DeltaPreparer preparer_;
 
   gscoped_ptr<DeltaMemStore::DMSTreeIter> iter_;
 
   bool initted_;
 
-  // The index at which the last PrepareBatch() call was made
-  rowid_t prepared_idx_;
-
-  // The number of rows for which the last PrepareBatch() call was made
-  uint32_t prepared_count_;
-
-  // Whether there are prepared blocks built through PrepareBatch().
-  enum PreparedFor {
-    NOT_PREPARED,
-    PREPARED_FOR_APPLY,
-    PREPARED_FOR_COLLECT
-  };
-  PreparedFor prepared_for_;
-
   // True if SeekToOrdinal() been called at least once.
   bool seeked_;
-
-  // State when prepared_for_ == PREPARED_FOR_APPLY
-  // ------------------------------------------------------------
-  struct ColumnUpdate {
-    rowid_t row_id;
-    void* new_val_ptr;
-    uint8_t new_val_buf[16];
-  };
-  typedef std::deque<ColumnUpdate> UpdatesForColumn;
-  std::vector<UpdatesForColumn> updates_by_col_;
-  std::deque<rowid_t> deleted_;
-
-  // State when prepared_for_ == PREPARED_FOR_COLLECT
-  // ------------------------------------------------------------
-  struct PreparedDelta {
-    DeltaKey key;
-    Slice val;
-  };
-  std::deque<PreparedDelta> prepared_deltas_;
 };
 
 } // namespace tablet

Reply via email to