This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 64ebea2 [Feature] Support gzip compression for http response (#4533)
64ebea2 is described below
commit 64ebea2e4333d9feee2a22395f117a01acd05f31
Author: Yingchun Lai <[email protected]>
AuthorDate: Sun Sep 6 20:30:12 2020 +0800
[Feature] Support gzip compression for http response (#4533)
After tablet-level metrics were supported, the HTTP metrics API may return
a very large response body when a BE holds a large number of tablets, causing
heavy network traffic.
This patch introduces HTTP content compression (gzip) to reduce network traffic.
---
be/src/http/http_channel.cpp | 38 ++++++++++-
be/src/http/http_channel.h | 2 +
be/src/http/http_request.cpp | 2 -
be/src/util/CMakeLists.txt | 1 +
be/src/util/zlib.cpp | 128 +++++++++++++++++++++++++++++++++++++
be/src/util/zlib.h | 43 +++++++++++++
be/test/util/CMakeLists.txt | 1 +
be/test/util/http_channel_test.cpp | 58 +++++++++++++++++
8 files changed, 270 insertions(+), 3 deletions(-)
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 82d7f6c..7b68370 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -17,17 +17,20 @@
#include "http/http_channel.h"
+#include <mutex>
#include <sstream>
#include <string>
#include <event2/buffer.h>
#include <event2/http.h>
+#include "gutil/strings/split.h"
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/http_headers.h"
#include "http/http_status.h"
#include "common/logging.h"
+#include "util/zlib.h"
namespace doris {
@@ -52,7 +55,13 @@ void HttpChannel::send_reply(HttpRequest* request,
HttpStatus status) {
// Sends 'content' as the reply body with the given status. When the client's
// Accept-Encoding header allows gzip, the body is gzip-compressed and a
// "Content-Encoding: gzip" header is added; otherwise it is sent verbatim.
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) {
    std::string compressed_content;
    const bool use_gzip = compress_content(request->header(HttpHeaders::ACCEPT_ENCODING),
                                           content, &compressed_content);
    if (use_gzip) {
        request->add_output_header(HttpHeaders::CONTENT_ENCODING, "gzip");
    }
    // Pick whichever representation is actually going on the wire.
    const std::string& body = use_gzip ? compressed_content : content;

    auto evb = evbuffer_new();
    evbuffer_add(evb, body.c_str(), body.size());
    evhttp_send_reply(request->get_evhttp_request(), status,
                      defalut_reason(status).c_str(), evb);
    evbuffer_free(evb);
}
@@ -66,4 +75,31 @@ void HttpChannel::send_file(HttpRequest* request, int fd,
size_t off, size_t siz
evbuffer_free(evb);
}
+bool HttpChannel::compress_content(const std::string& accept_encoding, const
std::string& input, std::string* output) {
+ // Don't bother compressing empty content.
+ if (input.empty()) {
+ return false;
+ }
+
+ // Check if gzip compression is accepted by the caller. If so, compress the
+ // content and replace the prerendered output.
+ bool is_compressed = false;
+ std::vector<string> encodings = strings::Split(accept_encoding, ",");
+ for (string& encoding : encodings) {
+ StripWhiteSpace(&encoding);
+ if (encoding == "gzip") {
+ std::ostringstream oss;
+ Status s = zlib::CompressLevel(Slice(input), 1, &oss);
+ if (s.ok()) {
+ *output = oss.str();
+ is_compressed = true;
+ } else {
+ LOG(WARNING) << "Could not compress output: " << s.to_string();
+ }
+ break;
+ }
+ }
+ return is_compressed;
+}
+
}
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index 307dd81..9c7546b 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -46,6 +46,8 @@ public:
static void send_reply(HttpRequest* request, HttpStatus status, const
std::string& content);
static void send_file(HttpRequest* request, int fd, size_t off, size_t
size);
+
+ static bool compress_content(const std::string& accept_encoding, const
std::string& input, std::string* output);
};
}
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index cdf1bb4..b6205a6 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -122,9 +122,7 @@ const std::string& HttpRequest::param(const std::string&
key) const {
}
// Appends the header 'key: value' to the output headers of the underlying
// libevent request.
void HttpRequest::add_output_header(const char* key, const char* value) {
    auto* output_headers = evhttp_request_get_output_headers(_ev_req);
    evhttp_add_header(output_headers, key, value);
}
std::string HttpRequest::get_request_body() {
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index d54a4d2..4c44ddb 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -100,6 +100,7 @@ set(UTIL_FILES
easy_json.cc
mustache/mustache.cc
brpc_stub_cache.cpp
+ zlib.cpp
)
if (WITH_MYSQL)
diff --git a/be/src/util/zlib.cpp b/be/src/util/zlib.cpp
new file mode 100644
index 0000000..fe0c325
--- /dev/null
+++ b/be/src/util/zlib.cpp
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/zlib.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include <zconf.h>
+#include <zlib.h>
+
+#include "gutil/macros.h"
+#include "gutil/strings/substitute.h"
+
+using std::ostream;
+using std::string;
+using std::unique_ptr;
+
+#define ZRETURN_NOT_OK(call) \
+ RETURN_IF_ERROR(ZlibResultToStatus(call))
+
+namespace doris {
+namespace zlib {
+
+namespace {
+Status ZlibResultToStatus(int rc) {
+ switch (rc) {
+ case Z_OK:
+ return Status::OK();
+ case Z_STREAM_END:
+ return Status::EndOfFile("zlib EOF");
+ case Z_NEED_DICT:
+ return Status::Corruption("zlib error: NEED_DICT");
+ case Z_ERRNO:
+ return Status::IOError("zlib error: Z_ERRNO");
+ case Z_STREAM_ERROR:
+ return Status::Corruption("zlib error: STREAM_ERROR");
+ case Z_DATA_ERROR:
+ return Status::Corruption("zlib error: DATA_ERROR");
+ case Z_MEM_ERROR:
+ return Status::RuntimeError("zlib error: MEM_ERROR");
+ case Z_BUF_ERROR:
+ return Status::RuntimeError("zlib error: BUF_ERROR");
+ case Z_VERSION_ERROR:
+ return Status::RuntimeError("zlib error: VERSION_ERROR");
+ default:
+ return Status::RuntimeError(
+ strings::Substitute("zlib error: unknown error $0", rc));
+ }
+}
+} // anonymous namespace
+
+Status Compress(Slice input, ostream* out) {
+ return CompressLevel(input, Z_DEFAULT_COMPRESSION, out);
+}
+
+Status CompressLevel(Slice input, int level, ostream* out) {
+ z_stream zs;
+ memset(&zs, 0, sizeof(zs));
+ ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
+ 15 + 16 /* 15 window bits, enable gzip */,
+ 8 /* memory level, max is 9 */,
+ Z_DEFAULT_STRATEGY));
+
+ zs.avail_in = input.get_size();
+ zs.next_in = (unsigned char*)(input.mutable_data());
+ const int kChunkSize = 256 * 1024;
+ unique_ptr<unsigned char[]> chunk(new unsigned char[kChunkSize]);
+ int flush;
+ do {
+ zs.avail_out = kChunkSize;
+ zs.next_out = chunk.get();
+ flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
+ Status s = ZlibResultToStatus(deflate(&zs, flush));
+ if (!s.ok() && !s.is_end_of_file()) {
+ return s;
+ }
+ int out_size = zs.next_out - chunk.get();
+ if (out_size > 0) {
+ out->write(reinterpret_cast<char *>(chunk.get()), out_size);
+ }
+ } while (flush != Z_FINISH);
+ ZRETURN_NOT_OK(deflateEnd(&zs));
+ return Status::OK();
+}
+
+Status Uncompress(Slice compressed, std::ostream* out) {
+ z_stream zs;
+ memset(&zs, 0, sizeof(zs));
+ zs.next_in = (unsigned char*)(compressed.mutable_data());
+ zs.avail_in = compressed.get_size();
+ ZRETURN_NOT_OK(inflateInit2(&zs, 15 + 16 /* 15 window bits, enable zlib */));
+ int flush;
+ Status s;
+ do {
+ unsigned char buf[4096];
+ zs.next_out = buf;
+ zs.avail_out = arraysize(buf);
+ flush = zs.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH;
+ s = ZlibResultToStatus(inflate(&zs, flush));
+ if (!s.ok() && !s.is_end_of_file()) {
+ return s;
+ }
+ out->write(reinterpret_cast<char *>(buf), zs.next_out - buf);
+ } while (flush == Z_NO_FLUSH);
+ ZRETURN_NOT_OK(inflateEnd(&zs));
+
+ return Status::OK();
+}
+
+} // namespace zlib
+} // namespace doris
diff --git a/be/src/util/zlib.h b/be/src/util/zlib.h
new file mode 100644
index 0000000..b062c40
--- /dev/null
+++ b/be/src/util/zlib.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <iosfwd>
+
+#include "util/slice.h"
+#include "common/status.h"
+
+namespace doris {
+namespace zlib {
+
+// Zlib-compress the data in 'input', appending the result to 'out'.
+//
+// In case of an error, some data may still be appended to 'out'.
+Status Compress(Slice input, std::ostream* out);
+
+// The same as the above, but with a custom level (1-9, where 1 is fastest
+// and 9 is best compression).
+Status CompressLevel(Slice input, int level, std::ostream* out);
+
+// Uncompress the zlib-compressed data in 'compressed', appending the result
+// to 'out'.
+//
+// In case of an error, some data may still be appended to 'out'.
+Status Uncompress(Slice compressed, std::ostream* out);
+
+} // namespace zlib
+} // namespace doris
diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt
index 9d419bb..df3c9f8 100644
--- a/be/test/util/CMakeLists.txt
+++ b/be/test/util/CMakeLists.txt
@@ -65,3 +65,4 @@ ADD_BE_TEST(thread_test)
ADD_BE_TEST(threadpool_test)
ADD_BE_TEST(trace_test)
ADD_BE_TEST(easy_json-test)
+ADD_BE_TEST(http_channel_test)
diff --git a/be/test/util/http_channel_test.cpp
b/be/test/util/http_channel_test.cpp
new file mode 100644
index 0000000..c7b0421
--- /dev/null
+++ b/be/test/util/http_channel_test.cpp
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/http_channel.h"
+
+#include <gtest/gtest.h>
+
+#include "util/zlib.h"
+#include "util/logging.h"
+
+namespace doris {
+
+class HttpChannelTest : public testing::Test {
+public:
+ void check_data_eq(const std::string& output, const std::string& expected)
{
+ std::ostringstream oss;
+ ASSERT_TRUE(zlib::Uncompress(Slice(output), &oss).ok());
+ ASSERT_EQ(expected, oss.str());
+ }
+};
+
+TEST_F(HttpChannelTest, CompressContent) {
+ ASSERT_FALSE(HttpChannel::compress_content("gzip", "", nullptr));
+ ASSERT_FALSE(HttpChannel::compress_content("", "test", nullptr));
+ ASSERT_FALSE(HttpChannel::compress_content("Gzip", "", nullptr));
+
+ const std::string& intput("test_data_0123456789abcdefg");
+ std::string output;
+
+ ASSERT_TRUE(HttpChannel::compress_content("gzip", intput, &output));
+ ASSERT_NO_FATAL_FAILURE(check_data_eq(output, intput));
+
+ ASSERT_TRUE(HttpChannel::compress_content("123,gzip,321", intput,
&output));
+ ASSERT_NO_FATAL_FAILURE(check_data_eq(output, intput));
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+ doris::init_glog("be-test");
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]