Repository: incubator-impala
Updated Branches:
  refs/heads/master 922ee7031 -> ae7200874


IMPALA-1891: Statestore won't send deletions in initial non-delta topic

Currently, when a subscriber connects to the statestore, its first
update contains the entire topic, including deletions. Those deletions
are useless because the subscriber has no base state to apply them to.
This patch ensures that deletions are not included in that first
(non-delta) update, and that the logged size of the initial update
excludes the keys of the skipped deletions.

Testing:
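Re-enabled the previously commented-out assertion in the existing
statestore test case, which checks that a non-delta update contains no
topic deletions.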

Change-Id: I3fc525e1f3d960d642fc6356abb75f744cab7c33
Reviewed-on: http://gerrit.cloudera.org:8080/7527
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/229f12c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/229f12c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/229f12c1

Branch: refs/heads/master
Commit: 229f12c1f64c17cd029291125a8ab540259438b7
Parents: 922ee70
Author: Bikramjeet Vig <[email protected]>
Authored: Thu Jul 27 13:03:36 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Aug 1 00:00:01 2017 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore.cc     | 23 ++++++++++++++---------
 tests/statestore/test_statestore.py |  2 +-
 2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/229f12c1/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index bd6361f..9f6097c 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -519,23 +519,19 @@ void Statestore::GatherTopicUpdates(const Subscriber& 
subscriber,
       topic_delta.is_delta = last_processed_version > 
Subscriber::TOPIC_INITIAL_VERSION;
       topic_delta.__set_from_version(last_processed_version);
 
-      if (!topic_delta.is_delta &&
-          topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        int64_t topic_size =
-            topic.total_key_size_bytes() + topic.total_value_size_bytes();
-        VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
-                   << " topic update for " << subscriber.id() << ". Size = "
-                   << PrettyPrinter::Print(topic_size, TUnit::BYTES);
-      }
-
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
+      int64_t deleted_key_size_bytes = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = 
topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
         if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
+          if (!topic_delta.is_delta) {
+            deleted_key_size_bytes += itr->first.size();
+            continue;
+          }
           topic_delta.topic_deletions.push_back(itr->first);
         } else {
           topic_delta.topic_entries.push_back(TTopicItem());
@@ -546,6 +542,15 @@ void Statestore::GatherTopicUpdates(const Subscriber& 
subscriber,
         }
       }
 
+      if (!topic_delta.is_delta &&
+          topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
+        int64_t topic_size = topic.total_key_size_bytes() - 
deleted_key_size_bytes
+            + topic.total_value_size_bytes();
+        VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
+                   << " topic update for " << subscriber.id() << ". Size = "
+                   << PrettyPrinter::Print(topic_size, TUnit::BYTES);
+      }
+
       if (topic.topic_update_log().size() > 0) {
         // The largest version for this topic will be the last item in the 
version history
         // map.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/229f12c1/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py 
b/tests/statestore/test_statestore.py
index 31ae8c4..e2b1715 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -461,7 +461,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, 
see
         # IMPALA-1891
-        # assert len(args.topic_deltas[transient_topic_name].topic_deletions) 
== 0
+        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, 
is_transient=False),

Reply via email to