Repository: incubator-impala Updated Branches: refs/heads/master 922ee7031 -> ae7200874
IMPALA-1891: Statestore won't send deletions in initial non-delta topic Currently, when a subscriber connects to the statestore, its first update contains the entire topic including deletions, which are useless because the subscriber has no base to apply those deletions to. This patch ensures that the deletions are not included in that first update. Testing: Modified an existing test case to include this check Change-Id: I3fc525e1f3d960d642fc6356abb75f744cab7c33 Reviewed-on: http://gerrit.cloudera.org:8080/7527 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/229f12c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/229f12c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/229f12c1 Branch: refs/heads/master Commit: 229f12c1f64c17cd029291125a8ab540259438b7 Parents: 922ee70 Author: Bikramjeet Vig <[email protected]> Authored: Thu Jul 27 13:03:36 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Aug 1 00:00:01 2017 +0000 ---------------------------------------------------------------------- be/src/statestore/statestore.cc | 23 ++++++++++++++--------- tests/statestore/test_statestore.py | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/229f12c1/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index bd6361f..9f6097c 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -519,23 +519,19 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber, topic_delta.is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION; 
topic_delta.__set_from_version(last_processed_version); - if (!topic_delta.is_delta && - topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) { - int64_t topic_size = - topic.total_key_size_bytes() + topic.total_value_size_bytes(); - VLOG_QUERY << "Preparing initial " << topic_delta.topic_name - << " topic update for " << subscriber.id() << ". Size = " - << PrettyPrinter::Print(topic_size, TUnit::BYTES); - } - TopicUpdateLog::const_iterator next_update = topic.topic_update_log().upper_bound(last_processed_version); + int64_t deleted_key_size_bytes = 0; for (; next_update != topic.topic_update_log().end(); ++next_update) { TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second); DCHECK(itr != topic.entries().end()); const TopicEntry& topic_entry = itr->second; if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) { + if (!topic_delta.is_delta) { + deleted_key_size_bytes += itr->first.size(); + continue; + } topic_delta.topic_deletions.push_back(itr->first); } else { topic_delta.topic_entries.push_back(TTopicItem()); @@ -546,6 +542,15 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber, } } + if (!topic_delta.is_delta && + topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) { + int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes + + topic.total_value_size_bytes(); + VLOG_QUERY << "Preparing initial " << topic_delta.topic_name + << " topic update for " << subscriber.id() << ". Size = " + << PrettyPrinter::Print(topic_size, TUnit::BYTES); + } + if (topic.topic_update_log().size() > 0) { // The largest version for this topic will be the last item in the version history // map. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/229f12c1/tests/statestore/test_statestore.py ---------------------------------------------------------------------- diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py index 31ae8c4..e2b1715 100644 --- a/tests/statestore/test_statestore.py +++ b/tests/statestore/test_statestore.py @@ -461,7 +461,7 @@ class TestStatestore(): assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1 # Statestore should not send deletions when the update is not a delta, see # IMPALA-1891 - # assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0 + assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0 return DEFAULT_UPDATE_STATE_RESPONSE reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
