This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6b20440 KUDU-3201: [hms] Support gzipped HMS notifications
(GzipJSONMessageEncoder)
6b20440 is described below
commit 6b20440f4c51a6b69c1382db51139bf8d3467b05
Author: Grant Henke <[email protected]>
AuthorDate: Tue Oct 6 22:22:47 2020 -0500
KUDU-3201: [hms] Support gzipped HMS notifications (GzipJSONMessageEncoder)
This patch adds support for HMS notification messages encoded
via the GzipJSONMessageEncoder that was added in
HIVE-20679. Messages sent with the `gzip(json-2.0)` format are
Base64-decoded and then decompressed before their JSON content
is parsed.
Because HIVE-20679 is only in Hive 4, which is unreleased,
I created a static encoded message to test the gzip functionality.
I also manually tested this functionality on a real cluster that has
HIVE-20679.
While testing this patch, I found and fixed a bug in the Uncompress
method in our zlib.cc. Without the fix, zlib would
return a BUF_ERROR when the decompressed data fits
in the output buffer on the first call to `inflate`.
For reference I used the following post to help understand
the expected usage of `inflate`:
https://zlib.net/zlib_how.html
Change-Id: I0f94e2446255e1fafb1dac9bf9ce23b81d6c0960
Reviewed-on: http://gerrit.cloudera.org:8080/16565
Tested-by: Grant Henke <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
---
.../master/hms_notification_log_listener-test.cc | 24 ++++++++
src/kudu/master/hms_notification_log_listener.cc | 67 ++++++++++++++--------
src/kudu/master/hms_notification_log_listener.h | 24 ++++++++
src/kudu/util/zlib.cc | 22 ++++---
4 files changed, 107 insertions(+), 30 deletions(-)
diff --git a/src/kudu/master/hms_notification_log_listener-test.cc
b/src/kudu/master/hms_notification_log_listener-test.cc
index 415c4c6..fc65f92 100644
--- a/src/kudu/master/hms_notification_log_listener-test.cc
+++ b/src/kudu/master/hms_notification_log_listener-test.cc
@@ -19,6 +19,7 @@
#include <cstdint>
#include <thread>
+#include <string>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
@@ -32,12 +33,18 @@
DECLARE_uint32(hive_metastore_notification_log_poll_period_seconds);
DECLARE_uint32(hive_metastore_notification_log_poll_inject_latency_ms);
+using std::string;
+
namespace kudu {
namespace master {
class HmsNotificationLogListenerTest : public KuduTest {
public:
uint32_t poll_period_ =
FLAGS_hive_metastore_notification_log_poll_period_seconds;
+
+ // Test-only forwarder to HmsNotificationLogListenerTask::DecodeGzipMessage().
+ // That method is declared in the task's private section; this fixture is
+ // declared a friend of the task, so the wrapper can reach it on behalf of
+ // the tests.
+ static Status DecodeGzipMessage(const string& encoded, string* decoded) {
+ return HmsNotificationLogListenerTask::DecodeGzipMessage(encoded, decoded);
+ }
};
// Test that an immediate shutdown will short-circuit the poll period.
@@ -100,5 +107,22 @@ TEST_F(HmsNotificationLogListenerTest,
TestWaitAndShutdown) {
listener.Shutdown();
waiter.join();
}
+
+// Verifies that a Base64-encoded, gzip-compressed HMS notification message is
+// decoded back to its original JSON content.
+TEST_F(HmsNotificationLogListenerTest, TestGzipEventDecoding) {
+  // A message encoded via the following Hive code:
+  //   JSONDropTableMessage message =
+  //       new JSONDropTableMessage("server", "principal", "db", "table", 0L);
+  //   String serialized =
+  //       GzipJSONMessageEncoder.getInstance().getSerializer().serialize(message);
+  string encoded = "H4sIAAAAAAAAAKtWKk4tKkstUrKCMXTAjMzk1ICizLzkzILEHKBcAZyto5SSBBQAE"
+      "jpKJYlJOalADoSG8kMqC4BieaU5OVAB/6Qsr+L8PLhYZm5qcUliboGSlUEtAOPIspV/AAAA";
+  // The JSON content of the encoded string and the expected result after decoding.
+  string json = "{\"server\":\"server\",\"servicePrincipal\":\"principal\",\"db\":\"db\","
+      "\"table\":\"table\",\"tableType\":null,\"tableObjJson\":null,\"timestamp\":0}";
+  string decoded;
+  ASSERT_OK(DecodeGzipMessage(encoded, &decoded));
+  ASSERT_EQ(json, decoded);
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/hms_notification_log_listener.cc
b/src/kudu/master/hms_notification_log_listener.cc
index b2845a4..82f906e 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -29,12 +29,12 @@
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
-#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
#include "kudu/hms/hive_metastore_types.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/hms/hms_client.h"
@@ -45,6 +45,8 @@
#include "kudu/util/slice.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/thread.h"
+#include "kudu/util/url-coding.h"
+#include "kudu/util/zlib.h"
DEFINE_uint32(hive_metastore_notification_log_poll_period_seconds, 15,
"Amount of time the notification log listener waits between
attempts to poll "
@@ -176,28 +178,6 @@ string EventDebugString(const hive::NotificationEvent&
event) {
return Substitute("$0 $1 $2.$3", event.eventId, event.eventType,
event.dbName, event.tableName);
}
-// Parses the event message from a notification event. See
-// org.apache.hadoop.hive.metastore.messaging.MessageFactory for more info.
-//
-// Since JSONMessageFactory is currently the only concrete implementation of
-// MessageFactory, this method is specialized to return the Document type. If
-// another MessageFactory instance becomes used in the future this method
should
-// be updated to handle it accordingly.
-// Also because 'messageFormat' is an optional field introduced in HIVE-10562.
-// We consider message without this field valid, to be compatible with HIVE
-// distributions that do not include HIVE-10562 but still have the proper
-// JSONMessageFactory.
-Status ParseMessage(const hive::NotificationEvent& event, Document* message) {
- if (!event.messageFormat.empty() && event.messageFormat != "json-0.2") {
- return Status::NotSupported("unknown message format", event.messageFormat);
- }
- if (message->Parse<0>(event.message.c_str()).HasParseError()) {
- return Status::Corruption("failed to parse message",
-
rapidjson::GetParseError_En(message->GetParseError()));
- }
- return Status::OK();
-}
-
// Deserializes an HMS table object from a JSON notification log message.
Status DeserializeTable(const hive::NotificationEvent(event),
const Document& message,
@@ -462,5 +442,46 @@ Status
HmsNotificationLogListenerTask::HandleDropTableEvent(const hive::Notifica
return Status::OK();
}
+// Parses the JSON payload of 'event' into 'message'.
+//
+// Returns NotSupported if the event's message format is neither "json-0.2"
+// nor "gzip(json-2.0)", propagates any error from DecodeGzipMessage() for
+// gzip payloads, and returns Corruption if the content is not valid JSON.
+Status HmsNotificationLogListenerTask::ParseMessage(const hive::NotificationEvent& event,
+                                                    Document* message) {
+  string format = event.messageFormat;
+  // Default to the json-0.2 format for backwards compatibility.
+  if (format.empty()) {
+    format = "json-0.2";
+  }
+
+  // See Hive's JSONMessageEncoder and GzipJSONMessageEncoder for the format definitions.
+  // Validate the defaulted 'format' rather than the raw field: checking
+  // 'event.messageFormat' directly would reject events that omit the field,
+  // defeating the default applied above.
+  if (format != "json-0.2" && format != "gzip(json-2.0)") {
+    return Status::NotSupported("unknown message format", format);
+  }
+
+  // Gzip payloads are decoded straight into 'content'; plain JSON is used as is.
+  string content;
+  if (HasPrefixString(format, "gzip")) {
+    KUDU_RETURN_NOT_OK(DecodeGzipMessage(event.message, &content));
+  } else {
+    content = event.message;
+  }
+
+  if (message->Parse<0>(content.c_str()).HasParseError()) {
+    return Status::Corruption("failed to parse message",
+                              rapidjson::GetParseError_En(message->GetParseError()));
+  }
+
+  return Status::OK();
+}
+
+// Decodes 'encoded' (Base64 text wrapping gzip-compressed data) into 'decoded'.
+//
+// Returns Corruption if the Base64 decoding fails, or the zlib error (with a
+// prepended message) if decompression fails. 'decoded' is only assigned on
+// success.
+Status HmsNotificationLogListenerTask::DecodeGzipMessage(const string& encoded,
+                                                         string* decoded) {
+  // Undo the Base64 layer first to recover the compressed bytes.
+  string compressed;
+  if (!Base64Decode(encoded, &compressed)) {
+    return Status::Corruption("failed to decode message");
+  }
+  // Then inflate the compressed bytes into an in-memory stream.
+  std::ostringstream oss;
+  RETURN_NOT_OK_PREPEND(zlib::Uncompress(Slice(compressed), &oss),
+                        "failed to decompress message");
+  *decoded = oss.str();
+  return Status::OK();
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/hms_notification_log_listener.h
b/src/kudu/master/hms_notification_log_listener.h
index c4e152d..d7bdd36 100644
--- a/src/kudu/master/hms_notification_log_listener.h
+++ b/src/kudu/master/hms_notification_log_listener.h
@@ -18,8 +18,11 @@
#pragma once
#include <cstdint>
+#include <string>
#include <vector>
+#include <rapidjson/document.h>
+
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
@@ -98,6 +101,7 @@ class HmsNotificationLogListenerTask {
Status WaitForCatchUp(const MonoTime& deadline) WARN_UNUSED_RESULT;
private:
+ friend class HmsNotificationLogListenerTest;
// Runs the main loop of the listening thread.
void RunLoop();
@@ -119,6 +123,26 @@ class HmsNotificationLogListenerTask {
Status HandleDropTableEvent(const hive::NotificationEvent& event,
int64_t* durable_event_id) WARN_UNUSED_RESULT;
+ // Parses the event message from a notification event. See
+ // org.apache.hadoop.hive.metastore.messaging.MessageFactory and
+ // org.apache.hadoop.hive.metastore.messaging.MessageEncoder for more info.
+ //
+ // Since JSON formats are currently the only concrete implementation,
+ // this method is specialized to return the Document type.
+ // If another MessageFactory instance becomes used in the future this
+ // method should be updated to handle it accordingly. Also because
+ // 'messageFormat' is an optional field introduced in HIVE-10562,
+ // we consider messages without this field to be `json-0.2` to be
+ // compatible with Hive distributions that do not include HIVE-10562
+ // but still have the proper JSON message.
+ //
+ // The recognized formats are `json-0.2` and `gzip(json-2.0)`; any other
+ // format results in Status::NotSupported. A payload that cannot be parsed
+ // as JSON results in Status::Corruption. On success the parsed content is
+ // stored in 'message'.
+ static Status ParseMessage(const hive::NotificationEvent& event,
+ rapidjson::Document* message) WARN_UNUSED_RESULT;
+
+ // Decodes an event that was Gzip encoded by Hive.
+ // Hive also Base64 encodes the content after compressing, so 'encoded' is
+ // first Base64 decoded and the result is then decompressed.
+ //
+ // On success the decompressed content is stored in 'decoded'. Returns
+ // Status::Corruption if the Base64 decoding fails, or the decompression
+ // error otherwise.
+ static Status DecodeGzipMessage(const std::string& encoded,
+ std::string* decoded) WARN_UNUSED_RESULT;
+
// The associated catalog manager.
//
// May be initialized to nullptr in the constructor to facilitate unit
diff --git a/src/kudu/util/zlib.cc b/src/kudu/util/zlib.cc
index c025953..4bda79a 100644
--- a/src/kudu/util/zlib.cc
+++ b/src/kudu/util/zlib.cc
@@ -72,11 +72,12 @@ Status Compress(Slice input, ostream* out) {
return CompressLevel(input, Z_DEFAULT_COMPRESSION, out);
}
+// See https://zlib.net/zlib_how.html for context on using zlib.
Status CompressLevel(Slice input, int level, ostream* out) {
z_stream zs;
memset(&zs, 0, sizeof(zs));
ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
- 15 + 16 /* 15 window bits, enable gzip */,
+ MAX_WBITS + 16 /* enable gzip */,
8 /* memory level, max is 9 */,
Z_DEFAULT_STRATEGY));
@@ -102,27 +103,34 @@ Status CompressLevel(Slice input, int level, ostream*
out) {
return Status::OK();
}
+// Decompresses the gzip-format data in 'compressed' and writes the result to 'out'.
+// If inflate reports an error, that status is returned; output produced by
+// earlier successful inflate calls may already have been written to 'out'.
+//
+// See https://zlib.net/zlib_how.html for context on using zlib.
Status Uncompress(Slice compressed, std::ostream* out) {
+ // Initialize the z_stream at the start of the data with the
+ // data size as the available input.
z_stream zs;
memset(&zs, 0, sizeof(zs));
zs.next_in = const_cast<uint8_t*>(compressed.data());
zs.avail_in = compressed.size();
- ZRETURN_NOT_OK(inflateInit2(&zs, 15 + 16 /* 15 window bits, enable zlib */));
- int flush;
+ // Initialize inflation with the windowBits set to be GZIP compatible.
+ // The documentation (https://www.zlib.net/manual.html#Advanced) describes
+ // that adding 16 configures inflate to decode the gzip format.
+ ZRETURN_NOT_OK(inflateInit2(&zs, MAX_WBITS + 16 /* enable gzip */));
+ // Continue calling inflate, decompressing data into the buffer in `zs.next_out`
+ // and writing the buffer content to `out`, until an error is received or there
+ // is no more data to decompress.
Status s;
do {
unsigned char buf[4096];
zs.next_out = buf;
zs.avail_out = arraysize(buf);
- flush = zs.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH;
- s = ZlibResultToStatus(inflate(&zs, flush));
+ s = ZlibResultToStatus(inflate(&zs, Z_NO_FLUSH));
if (!s.ok() && !s.IsEndOfFile()) {
+ // inflateInit2 succeeded above, so release zlib's internal state before
+ // bailing out; otherwise it would be leaked on this error path.
+ inflateEnd(&zs);
return s;
}
out->write(reinterpret_cast<char *>(buf), zs.next_out - buf);
- } while (flush == Z_NO_FLUSH);
+ } while (zs.avail_out == 0);
+ // If we haven't returned early with a bad status, finalize inflation.
ZRETURN_NOT_OK(inflateEnd(&zs));
-
return Status::OK();
}