This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b20440  KUDU-3201: [hms] Support gzipped HMS notifications (GzipJSONMessageEncoder)
6b20440 is described below

commit 6b20440f4c51a6b69c1382db51139bf8d3467b05
Author: Grant Henke <[email protected]>
AuthorDate: Tue Oct 6 22:22:47 2020 -0500

    KUDU-3201: [hms] Support gzipped HMS notifications (GzipJSONMessageEncoder)
    
    This patch adds support for HMS notification messages encoded
    via the GzipJSONMessageEncoder that was added in
    HIVE-20679. It does this by decoding the base64 encoding
    and decompressing the messages sent with the `gzip(json-2.0)`
    format.
    
    Because HIVE-20679 is only in Hive 4 which is unreleased,
    I created a static encoded message to test the gzip functionality.
    I also manually tested this functionality on a real cluster that has
    HIVE-20679.
    
    While testing this patch I found and fixed a bug in our zlib.cc
    Uncompress method. Without the fix zlib would
    return a BUF_ERROR when the decompressed data fits
    in the output buffer on the first call to `inflate`.
    
    For reference I used the following post to help understand
    the expected usage of `inflate`:
    https://zlib.net/zlib_how.html
    
    Change-Id: I0f94e2446255e1fafb1dac9bf9ce23b81d6c0960
    Reviewed-on: http://gerrit.cloudera.org:8080/16565
    Tested-by: Grant Henke <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 .../master/hms_notification_log_listener-test.cc   | 24 ++++++++
 src/kudu/master/hms_notification_log_listener.cc   | 67 ++++++++++++++--------
 src/kudu/master/hms_notification_log_listener.h    | 24 ++++++++
 src/kudu/util/zlib.cc                              | 22 ++++---
 4 files changed, 107 insertions(+), 30 deletions(-)

diff --git a/src/kudu/master/hms_notification_log_listener-test.cc b/src/kudu/master/hms_notification_log_listener-test.cc
index 415c4c6..fc65f92 100644
--- a/src/kudu/master/hms_notification_log_listener-test.cc
+++ b/src/kudu/master/hms_notification_log_listener-test.cc
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <thread>
+#include <string>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -32,12 +33,18 @@
 DECLARE_uint32(hive_metastore_notification_log_poll_period_seconds);
 DECLARE_uint32(hive_metastore_notification_log_poll_inject_latency_ms);
 
+using std::string;
+
 namespace kudu {
 namespace master {
 
 class HmsNotificationLogListenerTest : public KuduTest {
  public:
   uint32_t poll_period_ = FLAGS_hive_metastore_notification_log_poll_period_seconds;
+
+  static Status DecodeGzipMessage(const string& encoded, string* decoded) {
+    return HmsNotificationLogListenerTask::DecodeGzipMessage(encoded, decoded);
+  }
 };
 
 // Test that an immediate shutdown will short-circuit the poll period.
@@ -100,5 +107,22 @@ TEST_F(HmsNotificationLogListenerTest, TestWaitAndShutdown) {
   listener.Shutdown();
   waiter.join();
 }
+
+TEST_F(HmsNotificationLogListenerTest, TestGzipEventDecoding) {
+  // A message encoded via the following Hive code:
+  //    JSONDropTableMessage message =
+  //       new JSONDropTableMessage("server", "principal", "db", "table", 0L);
+  //    String serialized =
+  //       GzipJSONMessageEncoder.getInstance().getSerializer().serialize(message);
+  string encoded = "H4sIAAAAAAAAAKtWKk4tKkstUrKCMXTAjMzk1ICizLzkzILEHKBcAZyto5SSBBQAE"
+                   "jpKJYlJOalADoSG8kMqC4BieaU5OVAB/6Qsr+L8PLhYZm5qcUliboGSlUEtAOPIspV/AAAA";
+  // The JSON content of the encoded string and the expected result after decoding.
+  string json = "{\"server\":\"server\",\"servicePrincipal\":\"principal\",\"db\":\"db\","
+                "\"table\":\"table\",\"tableType\":null,\"tableObjJson\":null,\"timestamp\":0}";
+  string decoded;
+  ASSERT_OK(DecodeGzipMessage(encoded, &decoded));
+  ASSERT_EQ(json, decoded);
+}
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index b2845a4..82f906e 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -29,12 +29,12 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <rapidjson/document.h>
 #include <rapidjson/error/en.h>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_catalog.h"
 #include "kudu/hms/hms_client.h"
@@ -45,6 +45,8 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status_callback.h"
 #include "kudu/util/thread.h"
+#include "kudu/util/url-coding.h"
+#include "kudu/util/zlib.h"
 
 DEFINE_uint32(hive_metastore_notification_log_poll_period_seconds, 15,
               "Amount of time the notification log listener waits between attempts to poll "
@@ -176,28 +178,6 @@ string EventDebugString(const hive::NotificationEvent& event) {
   return Substitute("$0 $1 $2.$3", event.eventId, event.eventType, event.dbName, event.tableName);
 }
 
-// Parses the event message from a notification event. See
-// org.apache.hadoop.hive.metastore.messaging.MessageFactory for more info.
-//
-// Since JSONMessageFactory is currently the only concrete implementation of
-// MessageFactory, this method is specialized to return the Document type. If
-// another MessageFactory instance becomes used in the future this method should
-// be updated to handle it accordingly.
-// Also because 'messageFormat' is an optional field introduced in HIVE-10562.
-// We consider message without this field valid, to be compatible with HIVE
-// distributions that do not include HIVE-10562 but still have the proper
-// JSONMessageFactory.
-Status ParseMessage(const hive::NotificationEvent& event, Document* message) {
-  if (!event.messageFormat.empty() && event.messageFormat != "json-0.2") {
-    return Status::NotSupported("unknown message format", event.messageFormat);
-  }
-  if (message->Parse<0>(event.message.c_str()).HasParseError()) {
-    return Status::Corruption("failed to parse message",
-                              rapidjson::GetParseError_En(message->GetParseError()));
-  }
-  return Status::OK();
-}
-
 // Deserializes an HMS table object from a JSON notification log message.
 Status DeserializeTable(const hive::NotificationEvent(event),
                         const Document& message,
@@ -462,5 +442,46 @@ Status HmsNotificationLogListenerTask::HandleDropTableEvent(const hive::Notifica
   return Status::OK();
 }
 
+Status HmsNotificationLogListenerTask::ParseMessage(const hive::NotificationEvent& event,
+                                                    Document* message) {
+  string format = event.messageFormat;
+  // Default to the json-0.2 format for backwards compatibility.
+  if (format.empty()) {
+    format = "json-0.2";
+  }
+
+  // See Hive's JSONMessageEncoder and GzipJSONMessageEncoder for the format definitions.
+  if (event.messageFormat != "json-0.2" && event.messageFormat != "gzip(json-2.0)") {
+    return Status::NotSupported("unknown message format", event.messageFormat);
+  }
+
+  string content = event.message;
+  if (HasPrefixString(format, "gzip")) {
+    string decoded;
+    KUDU_RETURN_NOT_OK(DecodeGzipMessage(content, &decoded));
+    content = decoded;
+  }
+
+  if (message->Parse<0>(content.c_str()).HasParseError()) {
+    return Status::Corruption("failed to parse message",
+                              rapidjson::GetParseError_En(message->GetParseError()));
+  }
+
+  return Status::OK();
+}
+
+Status HmsNotificationLogListenerTask::DecodeGzipMessage(const string& encoded,
+                                                         string* decoded) {
+  string result;
+  bool success = Base64Decode(encoded, &result);
+  if (!success) {
+    return Status::Corruption("failed to decode message");
+  }
+  std::ostringstream oss;
+  RETURN_NOT_OK_PREPEND(zlib::Uncompress(Slice(result), &oss), "failed decompress message");
+  *decoded = oss.str();
+  return Status::OK();
+}
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/hms_notification_log_listener.h b/src/kudu/master/hms_notification_log_listener.h
index c4e152d..d7bdd36 100644
--- a/src/kudu/master/hms_notification_log_listener.h
+++ b/src/kudu/master/hms_notification_log_listener.h
@@ -18,8 +18,11 @@
 #pragma once
 
 #include <cstdint>
+#include <string>
 #include <vector>
 
+#include <rapidjson/document.h>
+
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/condition_variable.h"
@@ -98,6 +101,7 @@ class HmsNotificationLogListenerTask {
   Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT;
 
  private:
+  friend class HmsNotificationLogListenerTest;
 
   // Runs the main loop of the listening thread.
   void RunLoop();
@@ -119,6 +123,26 @@ class HmsNotificationLogListenerTask {
   Status HandleDropTableEvent(const hive::NotificationEvent& event,
                               int64_t* durable_event_id) WARN_UNUSED_RESULT;
 
+  // Parses the event message from a notification event. See
+  // org.apache.hadoop.hive.metastore.messaging.MessageFactory and
+  // org.apache.hadoop.hive.metastore.messaging.MessageEncoder for more info.
+  //
+  // Since JSON formats are currently the only concrete implementation,
+  // this method is specialized to return the Document type.
+  // If another MessageFactory instance becomes used in the future this
+  // method should be updated to handle it accordingly. Also because
+  // 'messageFormat' is an optional field introduced in HIVE-10562,
+  // we consider messages without this field to be `json-0.2` to be
+  // compatible with Hive distributions that do not include HIVE-10562
+  // but still have the proper JSON message.
+  static Status ParseMessage(const hive::NotificationEvent& event,
+                             rapidjson::Document* message) WARN_UNUSED_RESULT;
+
+  // Decodes an event that was Gzip encoded by Hive.
+  // Hive also Base64 encodes the content after compressing.
+  static Status DecodeGzipMessage(const std::string& encoded,
+                                  std::string* decoded) WARN_UNUSED_RESULT;
+
   // The associated catalog manager.
   //
   // May be initialized to nullptr in the constructor to facilitate unit
diff --git a/src/kudu/util/zlib.cc b/src/kudu/util/zlib.cc
index c025953..4bda79a 100644
--- a/src/kudu/util/zlib.cc
+++ b/src/kudu/util/zlib.cc
@@ -72,11 +72,12 @@ Status Compress(Slice input, ostream* out) {
   return CompressLevel(input, Z_DEFAULT_COMPRESSION, out);
 }
 
+// See https://zlib.net/zlib_how.html for context on using zlib.
 Status CompressLevel(Slice input, int level, ostream* out) {
   z_stream zs;
   memset(&zs, 0, sizeof(zs));
   ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
-                              15 + 16 /* 15 window bits, enable gzip */,
+                              MAX_WBITS + 16 /* enable gzip */,
                               8 /* memory level, max is 9 */,
                               Z_DEFAULT_STRATEGY));
 
@@ -102,27 +103,34 @@ Status CompressLevel(Slice input, int level, ostream* out) {
   return Status::OK();
 }
 
+// See https://zlib.net/zlib_how.html for context on using zlib.
 Status Uncompress(Slice compressed, std::ostream* out) {
+  // Initialize the z_stream at the start of the data with the
+  // data size as the available input.
   z_stream zs;
   memset(&zs, 0, sizeof(zs));
   zs.next_in = const_cast<uint8_t*>(compressed.data());
   zs.avail_in = compressed.size();
-  ZRETURN_NOT_OK(inflateInit2(&zs, 15 + 16 /* 15 window bits, enable zlib */));
-  int flush;
+  // Initialize inflation with the windowBits set to be GZIP compatible.
+  // The documentation (https://www.zlib.net/manual.html#Advanced) describes that
+  // Adding 16 configures inflate to decode the gzip format.
+  ZRETURN_NOT_OK(inflateInit2(&zs, MAX_WBITS + 16 /* enable gzip */));
+  // Continue calling inflate, decompressing data into the buffer in `zs.next_out` and writing
+  // the buffer content to `out`, until an error is received or there is no more data
+  // to decompress.
   Status s;
   do {
     unsigned char buf[4096];
     zs.next_out = buf;
     zs.avail_out = arraysize(buf);
-    flush = zs.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH;
-    s = ZlibResultToStatus(inflate(&zs, flush));
+    s = ZlibResultToStatus(inflate(&zs, Z_NO_FLUSH));
     if (!s.ok() && !s.IsEndOfFile()) {
       return s;
     }
     out->write(reinterpret_cast<char *>(buf), zs.next_out - buf);
-  } while (flush == Z_NO_FLUSH);
+  } while (zs.avail_out == 0);
+  // If we haven't returned early with a bad status, finalize inflation.
   ZRETURN_NOT_OK(inflateEnd(&zs));
-
   return Status::OK();
 }
 

Reply via email to