Repository: incubator-impala Updated Branches: refs/heads/master d861db44a -> 536a0612a
IMPALA-5500: Reduce catalog update topic size Problem: IMPALA-4029 introduced the use of the flatbuffers serialization library for storing file and block metadata. That change reduced the effectiveness of the Thrift compact protocol (when --compact_catalog_topic is used), thereby causing a 2X increase in catalog update topic size when the compact protocol is used. Fix: LZ4 compress the catalog topic updates before they are sent to the statestore when --compact_catalog_topic is set to true. Results: ~4X reduction in catalog update topic size Change-Id: I2f725cd8596205e6101d5b56abf08125faa30b0a Reviewed-on: http://gerrit.cloudera.org:8080/7268 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d2430ea0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d2430ea0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d2430ea0 Branch: refs/heads/master Commit: d2430ea0ab284a560366b1172e8e73e149505ce4 Parents: d861db4 Author: Dimitris Tsirogiannis <[email protected]> Authored: Mon Jun 19 10:16:07 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jun 29 01:20:47 2017 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog-server.cc | 10 +++- be/src/catalog/catalog-util.cc | 38 ++++++++++++++ be/src/catalog/catalog-util.h | 13 +++++ be/src/service/impala-server.cc | 26 ++++++++-- .../test_compact_catalog_updates.py | 54 ++++++++++++++++++++ 5 files changed, 136 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-server.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-server.cc 
b/be/src/catalog/catalog-server.cc index 08015b6..bf2892f 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -21,6 +21,7 @@ #include <thrift/protocol/TDebugProtocol.h> #include "catalog/catalog-util.h" +#include "exec/read-write-util.h" #include "statestore/statestore-subscriber.h" #include "util/debug-util.h" #include "util/logging-support.h" @@ -299,7 +300,6 @@ void CatalogServer::UpdateCatalogTopicCallback( void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) { unordered_set<string> current_entry_keys; - // Add any new/updated catalog objects to the topic. for (const TCatalogObject& catalog_object: catalog_objects) { const string& entry_key = TCatalogObjectToEntryKey(catalog_object); @@ -327,6 +327,14 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje if (!status.ok()) { LOG(ERROR) << "Error serializing topic value: " << status.GetDetail(); pending_topic_updates_.pop_back(); + continue; + } + if (FLAGS_compact_catalog_topic) { + status = CompressCatalogObject(&item.value); + if (!status.ok()) { + LOG(ERROR) << "Error compressing catalog object: " << status.GetDetail(); + pending_topic_updates_.pop_back(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-util.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc index 7648cac..57d21b3 100644 --- a/be/src/catalog/catalog-util.cc +++ b/be/src/catalog/catalog-util.cc @@ -21,6 +21,8 @@ #include "catalog/catalog-util.h" #include "common/status.h" +#include "exec/read-write-util.h" +#include "util/compress.h" #include "util/debug-util.h" #include "common/names.h" @@ -187,5 +189,41 @@ string TCatalogObjectToEntryKey(const TCatalogObject& catalog_object) { return entry_key.str(); } +Status CompressCatalogObject(string* catalog_object) { + scoped_ptr<Codec> compressor; + 
RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, THdfsCompression::LZ4, + &compressor)); + string output_buffer; + int64_t compressed_data_len = compressor->MaxOutputLen(catalog_object->size()); + int64_t output_buffer_len = compressed_data_len + sizeof(uint32_t); + output_buffer.resize(output_buffer_len); + uint8_t* output_buffer_ptr = + const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(output_buffer.data())); + ReadWriteUtil::PutInt(output_buffer_ptr, static_cast<uint32_t>(catalog_object->size())); + output_buffer_ptr += sizeof(uint32_t); + compressor->ProcessBlock(true, catalog_object->size(), + reinterpret_cast<const uint8_t*>(catalog_object->data()), &compressed_data_len, + &output_buffer_ptr); + output_buffer.resize(compressed_data_len + sizeof(uint32_t)); + *catalog_object = move(output_buffer); + return Status::OK(); +} + +Status DecompressCatalogObject(const string& compressed_catalog_object, + vector<uint8_t>* output_buffer) { + scoped_ptr<Codec> decompressor; + RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false, THdfsCompression::LZ4, + &decompressor)); + const uint8_t* input_data_ptr = + reinterpret_cast<const uint8_t*>(compressed_catalog_object.data()); + int64_t decompressed_len = ReadWriteUtil::GetInt<uint32_t>(input_data_ptr); + output_buffer->resize(decompressed_len); + input_data_ptr += sizeof(uint32_t); + uint8_t* decompressed_data_ptr = output_buffer->data(); + int64_t compressed_data_len = compressed_catalog_object.size() - sizeof(uint32_t); + RETURN_IF_ERROR(decompressor->ProcessBlock(true, compressed_data_len, + input_data_ptr, &decompressed_len, &decompressed_data_ptr)); + return Status::OK(); +} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-util.h ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h index 0ee024e..ddc2c21 100644 --- a/be/src/catalog/catalog-util.h +++ 
b/be/src/catalog/catalog-util.h @@ -19,6 +19,7 @@ #ifndef IMPALA_CATALOG_CATALOG_UTIL_H #define IMPALA_CATALOG_CATALOG_UTIL_H +#include "common/status.h" #include "gen-cpp/CatalogObjects_types.h" namespace impala { @@ -51,6 +52,18 @@ Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type, /// Returns an empty string if there were any problem building the key. std::string TCatalogObjectToEntryKey(const TCatalogObject& catalog_object); +/// Compresses a serialized catalog object using LZ4 and stores it back in +/// 'catalog_object'. Stores the size of the uncompressed catalog object in the +/// first sizeof(uint32_t) bytes of 'catalog_object'. +Status CompressCatalogObject(std::string* catalog_object) WARN_UNUSED_RESULT; + +/// Decompress an LZ4-compressed catalog object. The decompressed object +/// is stored in 'output_buffer'. The first sizeof(uint32_t) bytes of +/// 'compressed_catalog_object' store the size of the uncompressed catalog +/// object. +Status DecompressCatalogObject(const std::string& compressed_catalog_object, + std::vector<uint8_t>* output_buffer) WARN_UNUSED_RESULT; + } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index f4b26a2..37e06ac 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1262,6 +1262,7 @@ void ImpalaServer::CatalogUpdateCallback( if (topic == incoming_topic_deltas.end()) return; const TTopicDelta& delta = topic->second; + // Update catalog cache in frontend. An update is split into batches of size // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. 
IMPALA-3499 if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0) { @@ -1274,12 +1275,29 @@ void ImpalaServer::CatalogUpdateCallback( int64_t new_catalog_version = catalog_update_info_.catalog_version; uint64_t batch_size_bytes = 0; for (const TTopicItem& item: delta.topic_entries) { - uint32_t len = item.value.size(); TCatalogObject catalog_object; - Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>( - item.value.data()), &len, FLAGS_compact_catalog_topic, &catalog_object); + Status status; + vector<uint8_t> data_buffer; + const uint8_t* data_buffer_ptr = nullptr; + uint32_t len = 0; + if (FLAGS_compact_catalog_topic) { + status = DecompressCatalogObject(item.value, &data_buffer); + if (!status.ok()) { + LOG(ERROR) << "Error decompressing catalog object " << item.key + << ": " << status.GetDetail(); + continue; + } + data_buffer_ptr = data_buffer.data(); + len = data_buffer.size(); + } else { + data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data()); + len = item.value.size(); + } + status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic, + &catalog_object); if (!status.ok()) { - LOG(ERROR) << "Error deserializing item: " << status.GetDetail(); + LOG(ERROR) << "Error deserializing item " << item.key + << ": " << status.GetDetail(); continue; } if (len > 100 * 1024 * 1024 /* 100MB */) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/tests/custom_cluster/test_compact_catalog_updates.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_compact_catalog_updates.py b/tests/custom_cluster/test_compact_catalog_updates.py new file mode 100644 index 0000000..c824ad5 --- /dev/null +++ b/tests/custom_cluster/test_compact_catalog_updates.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Test Catalog behavior when --compact_catalog_topic is true. + +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + +class TestCompactCatalogUpdates(CustomClusterTestSuite): + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(impalad_args="--compact_catalog_topic=true", + catalogd_args="--compact_catalog_topic=true") + def test_compact_catalog_topic_updates(self): + """Start Impala cluster with compact catalog update topics enabled and run a set + of smoke tests to verify that catalog updates are received properly.""" + + try: + # Check that initial catalog update topic has been received + impalad1 = self.cluster.impalads[0] + assert impalad1.service.get_metric_value("catalog.num-tables") > 0 + impalad2 = self.cluster.impalads[1] + assert impalad2.service.get_metric_value("catalog.num-tables") > 0 + + client1 = impalad1.service.create_beeswax_client() + client2 = impalad2.service.create_beeswax_client() + query_options = {"sync_ddl" : 1} + self.execute_query_expect_success(client1, "refresh functional.alltypes", + query_options) + result = client2.execute("select count(*) from functional.alltypes") + assert result.data[0] == "7300" + + self.execute_query_expect_success(client1, "invalidate metadata", query_options) + 
self.execute_query_expect_success(client2, "show databases") + assert impalad1.service.get_metric_value("catalog.num-databases") > 0 + assert impalad2.service.get_metric_value("catalog.num-databases") > 0 + finally: + client1.close() + client2.close() +
