Repository: incubator-impala
Updated Branches:
  refs/heads/master d861db44a -> 536a0612a


IMPALA-5500: Reduce catalog update topic size

Problem: IMPALA-4029 introduced the use of the flatbuffers serialization
library for storing file and block metadata. That change reduced the
effectiveness of the Thrift compact protocol (when
--compact_catalog_topic is used), thereby causing a 2X increase in
catalog update topic size when the compact protocol is used.

Fix: LZ4-compress the catalog topic updates before they are sent to the
statestore when --compact_catalog_topic is set to true.

Results: ~4X reduction in catalog update topic size

Change-Id: I2f725cd8596205e6101d5b56abf08125faa30b0a
Reviewed-on: http://gerrit.cloudera.org:8080/7268
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d2430ea0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d2430ea0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d2430ea0

Branch: refs/heads/master
Commit: d2430ea0ab284a560366b1172e8e73e149505ce4
Parents: d861db4
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Mon Jun 19 10:16:07 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 29 01:20:47 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 10 +++-
 be/src/catalog/catalog-util.cc                  | 38 ++++++++++++++
 be/src/catalog/catalog-util.h                   | 13 +++++
 be/src/service/impala-server.cc                 | 26 ++++++++--
 .../test_compact_catalog_updates.py             | 54 ++++++++++++++++++++
 5 files changed, 136 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 08015b6..bf2892f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -21,6 +21,7 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "catalog/catalog-util.h"
+#include "exec/read-write-util.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
@@ -299,7 +300,6 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
 void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& 
catalog_objects) {
   unordered_set<string> current_entry_keys;
-
   // Add any new/updated catalog objects to the topic.
   for (const TCatalogObject& catalog_object: catalog_objects) {
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
@@ -327,6 +327,14 @@ void CatalogServer::BuildTopicUpdates(const 
vector<TCatalogObject>& catalog_obje
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
       pending_topic_updates_.pop_back();
+      continue;
+    }
+    if (FLAGS_compact_catalog_topic) {
+      status = CompressCatalogObject(&item.value);
+      if (!status.ok()) {
+        LOG(ERROR) << "Error compressing catalog object: " << 
status.GetDetail();
+        pending_topic_updates_.pop_back();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 7648cac..57d21b3 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -21,6 +21,8 @@
 
 #include "catalog/catalog-util.h"
 #include "common/status.h"
+#include "exec/read-write-util.h"
+#include "util/compress.h"
 #include "util/debug-util.h"
 
 #include "common/names.h"
@@ -187,5 +189,41 @@ string TCatalogObjectToEntryKey(const TCatalogObject& 
catalog_object) {
   return entry_key.str();
 }
 
+/// Replaces the serialized catalog object in 'catalog_object' with a 4-byte
+/// uncompressed-length header followed by its LZ4-compressed bytes. On any error
+/// 'catalog_object' is left untouched and a non-OK status is returned.
+Status CompressCatalogObject(string* catalog_object) {
+  // The uncompressed length is stored in a 4-byte header, so reject objects whose
+  // size cannot be represented in a uint32_t instead of silently truncating it.
+  const size_t uncompressed_len = catalog_object->size();
+  if (static_cast<size_t>(static_cast<uint32_t>(uncompressed_len)) != uncompressed_len) {
+    return Status("Catalog object is too large to compress: " +
+        std::to_string(uncompressed_len) + " bytes");
+  }
+  scoped_ptr<Codec> compressor;
+  RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, THdfsCompression::LZ4,
+      &compressor));
+  // Upper bound on the compressed size; used both to size the buffer and as the
+  // in/out length argument of ProcessBlock().
+  int64_t compressed_data_len = compressor->MaxOutputLen(uncompressed_len);
+  if (compressed_data_len <= 0) {
+    return Status("Unable to bound the compressed size of a catalog object of " +
+        std::to_string(uncompressed_len) + " bytes");
+  }
+  string output_buffer;
+  output_buffer.resize(compressed_data_len + sizeof(uint32_t));
+  uint8_t* output_buffer_ptr = reinterpret_cast<uint8_t*>(&output_buffer[0]);
+  ReadWriteUtil::PutInt(output_buffer_ptr, static_cast<uint32_t>(uncompressed_len));
+  output_buffer_ptr += sizeof(uint32_t);
+  // Propagate compression failures instead of publishing a garbage payload.
+  RETURN_IF_ERROR(compressor->ProcessBlock(true, uncompressed_len,
+      reinterpret_cast<const uint8_t*>(catalog_object->data()), &compressed_data_len,
+      &output_buffer_ptr));
+  // Trim the buffer to the header plus the actual compressed size.
+  output_buffer.resize(compressed_data_len + sizeof(uint32_t));
+  *catalog_object = move(output_buffer);
+  return Status::OK();
+}
+
+/// Decompresses a catalog object produced by CompressCatalogObject() into
+/// 'output_buffer'. The first sizeof(uint32_t) bytes of 'compressed_catalog_object'
+/// hold the uncompressed size; the rest is the LZ4-compressed payload. Returns a
+/// non-OK status if the input is malformed or decompression fails.
+Status DecompressCatalogObject(const string& compressed_catalog_object,
+    vector<uint8_t>* output_buffer) {
+  // Anything shorter than the length header would be read out of bounds (and the
+  // payload length below would underflow).
+  if (compressed_catalog_object.size() < sizeof(uint32_t)) {
+    return Status("Compressed catalog object is too short: " +
+        std::to_string(compressed_catalog_object.size()) + " bytes");
+  }
+  scoped_ptr<Codec> decompressor;
+  RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false, THdfsCompression::LZ4,
+      &decompressor));
+  const uint8_t* input_data_ptr =
+      reinterpret_cast<const uint8_t*>(compressed_catalog_object.data());
+  const int64_t expected_len = ReadWriteUtil::GetInt<uint32_t>(input_data_ptr);
+  input_data_ptr += sizeof(uint32_t);
+  const int64_t compressed_data_len =
+      compressed_catalog_object.size() - sizeof(uint32_t);
+  output_buffer->resize(expected_len);
+  int64_t decompressed_len = expected_len;
+  uint8_t* decompressed_data_ptr = output_buffer->data();
+  RETURN_IF_ERROR(decompressor->ProcessBlock(true, compressed_data_len,
+      input_data_ptr, &decompressed_len, &decompressed_data_ptr));
+  // A mismatch means the header or payload is corrupt; never hand a partially
+  // filled buffer to the deserializer.
+  if (decompressed_len != expected_len) {
+    return Status("Decompressed catalog object size mismatch: expected " +
+        std::to_string(expected_len) + " bytes, got " +
+        std::to_string(decompressed_len) + " bytes");
+  }
+  return Status::OK();
+}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index 0ee024e..ddc2c21 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_CATALOG_CATALOG_UTIL_H
 #define IMPALA_CATALOG_CATALOG_UTIL_H
 
+#include "common/status.h"
 #include "gen-cpp/CatalogObjects_types.h"
 
 namespace impala {
@@ -51,6 +52,18 @@ Status TCatalogObjectFromObjectName(const 
TCatalogObjectType::type& object_type,
 /// Returns an empty string if there were any problem building the key.
 std::string TCatalogObjectToEntryKey(const TCatalogObject& catalog_object);
 
+/// Compresses a serialized catalog object using LZ4 and stores it back in
+/// 'catalog_object'. Stores the size of the uncompressed catalog object in the
+/// first sizeof(uint32_t) bytes of 'catalog_object', followed by the
+/// LZ4-compressed bytes. 'catalog_object' is only overwritten when OK is returned.
+Status CompressCatalogObject(std::string* catalog_object) WARN_UNUSED_RESULT;
+
+/// Decompresses an LZ4-compressed catalog object in the format written by
+/// CompressCatalogObject(). The first sizeof(uint32_t) bytes of
+/// 'compressed_catalog_object' store the size of the uncompressed catalog
+/// object; 'output_buffer' is resized to that size and receives the decompressed
+/// bytes. Returns a non-OK status if the decompressor cannot be created or
+/// decompression fails.
+Status DecompressCatalogObject(const std::string& compressed_catalog_object,
+    std::vector<uint8_t>* output_buffer) WARN_UNUSED_RESULT;
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index f4b26a2..37e06ac 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1262,6 +1262,7 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
+
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
   if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
@@ -1274,12 +1275,29 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
-      uint32_t len = item.value.size();
       TCatalogObject catalog_object;
-      Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-          item.value.data()), &len, FLAGS_compact_catalog_topic, 
&catalog_object);
+      Status status;
+      vector<uint8_t> data_buffer;
+      const uint8_t* data_buffer_ptr = nullptr;
+      uint32_t len = 0;
+      if (FLAGS_compact_catalog_topic) {
+        status = DecompressCatalogObject(item.value, &data_buffer);
+        if (!status.ok()) {
+          LOG(ERROR) << "Error decompressing catalog object " << item.key
+                     << ": " << status.GetDetail();
+          continue;
+        }
+        data_buffer_ptr = data_buffer.data();
+        len = data_buffer.size();
+      } else {
+        data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
+        len = item.value.size();
+      }
+      status = DeserializeThriftMsg(data_buffer_ptr, &len, 
FLAGS_compact_catalog_topic,
+          &catalog_object);
       if (!status.ok()) {
-        LOG(ERROR) << "Error deserializing item: " << status.GetDetail();
+        LOG(ERROR) << "Error deserializing item " << item.key
+                   << ": " << status.GetDetail();
         continue;
       }
       if (len > 100 * 1024 * 1024 /* 100MB */) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d2430ea0/tests/custom_cluster/test_compact_catalog_updates.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_compact_catalog_updates.py 
b/tests/custom_cluster/test_compact_catalog_updates.py
new file mode 100644
index 0000000..c824ad5
--- /dev/null
+++ b/tests/custom_cluster/test_compact_catalog_updates.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Test Catalog behavior when --compact_catalog_topic is true.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestCompactCatalogUpdates(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  
@CustomClusterTestSuite.with_args(impalad_args="--compact_catalog_topic=true",
+      catalogd_args="--compact_catalog_topic=true")
+  def test_compact_catalog_topic_updates(self):
+    """Start Impala cluster with compact catalog update topics enabled and run 
a set
+    of smoke tests to verify that catalog updates are received properly."""
+
+    try:
+      # Check that initial catalop update topic has been received
+      impalad1 = self.cluster.impalads[0]
+      assert impalad1.service.get_metric_value("catalog.num-tables") > 0
+      impalad2 = self.cluster.impalads[1]
+      assert impalad2.service.get_metric_value("catalog.num-tables") > 0
+
+      client1 = impalad1.service.create_beeswax_client()
+      client2 = impalad2.service.create_beeswax_client()
+      query_options = {"sync_ddl" : 1}
+      self.execute_query_expect_success(client1, "refresh functional.alltypes",
+          query_options)
+      result = client2.execute("select count(*) from functional.alltypes")
+      assert result.data[0] == "7300"
+
+      self.execute_query_expect_success(client1, "invalidate metadata", 
query_options)
+      self.execute_query_expect_success(client2, "show databases")
+      assert impalad1.service.get_metric_value("catalog.num-databases") > 0
+      assert impalad2.service.get_metric_value("catalog.num-databases") > 0
+    finally:
+      client1.close()
+      client2.close()
+

Reply via email to