This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 64ebea2  [Feature] Support gzip compression for http response (#4533)
64ebea2 is described below

commit 64ebea2e4333d9feee2a22395f117a01acd05f31
Author: Yingchun Lai <[email protected]>
AuthorDate: Sun Sep 6 20:30:12 2020 +0800

    [Feature] Support gzip compression for http response (#4533)
    
    Since tablet-level metrics were added, the HTTP metrics API may return
    a very large response body when a BE holds a large number of tablets,
    causing heavy network traffic.
    This patch introduces HTTP response compression (gzip) to reduce that traffic.
---
 be/src/http/http_channel.cpp       |  38 ++++++++++-
 be/src/http/http_channel.h         |   2 +
 be/src/http/http_request.cpp       |   2 -
 be/src/util/CMakeLists.txt         |   1 +
 be/src/util/zlib.cpp               | 128 +++++++++++++++++++++++++++++++++++++
 be/src/util/zlib.h                 |  43 +++++++++++++
 be/test/util/CMakeLists.txt        |   1 +
 be/test/util/http_channel_test.cpp |  58 +++++++++++++++++
 8 files changed, 270 insertions(+), 3 deletions(-)

diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 82d7f6c..7b68370 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -17,17 +17,20 @@
 
 #include "http/http_channel.h"
 
+#include <mutex>
 #include <sstream>
 #include <string>
 
 #include <event2/buffer.h>
 #include <event2/http.h>
 
+#include "gutil/strings/split.h"
 #include "http/http_request.h"
 #include "http/http_response.h"
 #include "http/http_headers.h"
 #include "http/http_status.h"
 #include "common/logging.h"
+#include "util/zlib.h"
 
 namespace doris {
 
@@ -52,7 +55,13 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {
 void HttpChannel::send_reply(
         HttpRequest* request, HttpStatus status, const std::string& content) {
     auto evb = evbuffer_new();
-    evbuffer_add(evb, content.c_str(), content.size());
+    // Send the body gzip-compressed when the client's Accept-Encoding allows it,
+    // otherwise send it unchanged.
+    std::string compressed_content;
+    if (compress_content(request->header(HttpHeaders::ACCEPT_ENCODING), content, &compressed_content)) {
+        request->add_output_header(HttpHeaders::CONTENT_ENCODING, "gzip");
+        // The body now depends on the request's Accept-Encoding; without Vary a shared
+        // cache could hand this gzip body to a client that cannot decode it.
+        request->add_output_header("Vary", "Accept-Encoding");
+        evbuffer_add(evb, compressed_content.c_str(), compressed_content.size());
+    } else {
+        evbuffer_add(evb, content.c_str(), content.size());
+    }
     evhttp_send_reply(request->get_evhttp_request(), status, defalut_reason(status).c_str(), evb);
     evbuffer_free(evb);
 }
@@ -66,4 +75,31 @@ void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t siz
     evbuffer_free(evb);
 }
 
+namespace {
+
+// Lower-cases an ASCII letter; any other byte is returned unchanged.
+char to_ascii_lower(char c) {
+    return (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
+}
+
+// Compares two strings ignoring ASCII case. Content-coding names and their
+// parameter names are case-insensitive in HTTP.
+bool equals_ignore_ascii_case(const std::string& lhs, const std::string& rhs) {
+    if (lhs.size() != rhs.size()) {
+        return false;
+    }
+    for (size_t i = 0; i < lhs.size(); ++i) {
+        if (to_ascii_lower(lhs[i]) != to_ascii_lower(rhs[i])) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Returns true if the ';'-separated parameter list of one Accept-Encoding
+// element carries a zero quality value ("q=0", "q=0.000", ...), which marks
+// that coding as not acceptable.
+bool has_zero_qvalue(const std::string& params) {
+    std::vector<std::string> pieces = strings::Split(params, ";");
+    for (const std::string& piece : pieces) {
+        const size_t eq = piece.find('=');
+        if (eq == std::string::npos) {
+            continue;
+        }
+        std::string name = piece.substr(0, eq);
+        std::string value = piece.substr(eq + 1);
+        StripWhiteSpace(&name);
+        StripWhiteSpace(&value);
+        if (equals_ignore_ascii_case(name, "q")) {
+            // A qvalue never exceeds "1.000"; it is zero exactly when it starts
+            // with '0' and holds nothing but '0' and '.'.
+            return !value.empty() && value[0] == '0' &&
+                   value.find_first_not_of("0.") == std::string::npos;
+        }
+    }
+    return false;
+}
+
+// Returns true if the comma-separated 'accept_encoding' header value lists
+// gzip (case-insensitively, parameters allowed) and does not refuse it via q=0.
+bool accepts_gzip(const std::string& accept_encoding) {
+    std::vector<std::string> elements = strings::Split(accept_encoding, ",");
+    for (const std::string& element : elements) {
+        const size_t semicolon = element.find(';');
+        std::string coding = element.substr(0, semicolon);
+        StripWhiteSpace(&coding);
+        if (equals_ignore_ascii_case(coding, "gzip")) {
+            // The first gzip element decides.
+            return semicolon == std::string::npos ||
+                   !has_zero_qvalue(element.substr(semicolon + 1));
+        }
+    }
+    return false;
+}
+
+} // namespace
+
+// Gzip-compresses 'input' into '*output' when 'accept_encoding' (the request's
+// Accept-Encoding value) accepts gzip. Returns true only if '*output' was
+// replaced with compressed data. Returns false, leaving 'output' untouched, for
+// empty input, when gzip is not acceptable, or if compression fails.
+bool HttpChannel::compress_content(const std::string& accept_encoding, const std::string& input, std::string* output) {
+    // Don't bother compressing empty content.
+    if (input.empty()) {
+        return false;
+    }
+    if (!accepts_gzip(accept_encoding)) {
+        return false;
+    }
+
+    // Level 1: fastest compression, keeping the CPU cost per response low.
+    std::ostringstream oss;
+    Status s = zlib::CompressLevel(Slice(input), 1, &oss);
+    if (!s.ok()) {
+        LOG(WARNING) << "Could not compress output: " << s.to_string();
+        return false;
+    }
+    *output = oss.str();
+    return true;
+}
+
 }
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index 307dd81..9c7546b 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -46,6 +46,8 @@ public:
     static void send_reply(HttpRequest* request, HttpStatus status, const 
std::string& content);
 
     static void send_file(HttpRequest* request, int fd, size_t off, size_t 
size);
+
+    // Gzip-compresses 'input' into '*output' if 'accept_encoding' (the value of
+    // the request's Accept-Encoding header) lists gzip. Returns true only when
+    // '*output' was filled with compressed data. Returns false without touching
+    // 'output' for empty input, when gzip is not accepted, or if compression fails.
+    static bool compress_content(const std::string& accept_encoding, const 
std::string& input, std::string* output);
 };
 
 }
diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp
index cdf1bb4..b6205a6 100644
--- a/be/src/http/http_request.cpp
+++ b/be/src/http/http_request.cpp
@@ -122,9 +122,7 @@ const std::string& HttpRequest::param(const std::string& key) const {
 }
 
 void HttpRequest::add_output_header(const char* key, const char* value) {
+    // Adds 'key: value' to the output headers of the wrapped evhttp request.
+    // NOTE(review): with the BE_TEST guard removed this also runs in unit tests,
+    // so presumably _ev_req must be a valid request there -- confirm.
-// #ifndef BE_TEST
     evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value);
-// #endif
 }
 
 std::string HttpRequest::get_request_body() {
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index d54a4d2..4c44ddb 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -100,6 +100,7 @@ set(UTIL_FILES
   easy_json.cc
   mustache/mustache.cc
   brpc_stub_cache.cpp
+  zlib.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/util/zlib.cpp b/be/src/util/zlib.cpp
new file mode 100644
index 0000000..fe0c325
--- /dev/null
+++ b/be/src/util/zlib.cpp
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/zlib.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include <zconf.h>
+#include <zlib.h>
+
+#include "gutil/macros.h"
+#include "gutil/strings/substitute.h"
+
+using std::ostream;
+using std::string;
+using std::unique_ptr;
+
+#define ZRETURN_NOT_OK(call) \
+  RETURN_IF_ERROR(ZlibResultToStatus(call))
+
+namespace doris {
+namespace zlib {
+
+namespace {
+
+// Maps a zlib return code onto a doris::Status. Z_OK becomes OK and
+// Z_STREAM_END becomes EndOfFile, so callers can tell "stream finished" apart
+// from real failures. Every other code is reported as an error status.
+Status ZlibResultToStatus(int rc) {
+  if (rc == Z_OK) {
+    return Status::OK();
+  }
+  if (rc == Z_STREAM_END) {
+    return Status::EndOfFile("zlib EOF");
+  }
+  if (rc == Z_NEED_DICT) {
+    return Status::Corruption("zlib error: NEED_DICT");
+  }
+  if (rc == Z_ERRNO) {
+    return Status::IOError("zlib error: Z_ERRNO");
+  }
+  if (rc == Z_STREAM_ERROR) {
+    return Status::Corruption("zlib error: STREAM_ERROR");
+  }
+  if (rc == Z_DATA_ERROR) {
+    return Status::Corruption("zlib error: DATA_ERROR");
+  }
+  if (rc == Z_MEM_ERROR) {
+    return Status::RuntimeError("zlib error: MEM_ERROR");
+  }
+  if (rc == Z_BUF_ERROR) {
+    return Status::RuntimeError("zlib error: BUF_ERROR");
+  }
+  if (rc == Z_VERSION_ERROR) {
+    return Status::RuntimeError("zlib error: VERSION_ERROR");
+  }
+  // Anything else is not a documented zlib code; keep the raw value.
+  return Status::RuntimeError(
+      strings::Substitute("zlib error: unknown error $0", rc));
+}
+
+} // anonymous namespace
+
+// Gzip-compresses 'input' at zlib's default compression level (see
+// CompressLevel()), appending the result to 'out'.
+Status Compress(Slice input, ostream* out) {
+  const int default_level = Z_DEFAULT_COMPRESSION;
+  return CompressLevel(input, default_level, out);
+}
+
+// Runs deflate() over the input already attached to 'zs' until the gzip
+// trailer has been emitted, writing the compressed bytes to 'out'. The caller
+// owns 'zs' and must call deflateEnd() on it afterwards, whatever this returns.
+static Status DeflateAll(z_stream* zs, ostream* out) {
+  const int kChunkSize = 256 * 1024;
+  unique_ptr<unsigned char[]> chunk(new unsigned char[kChunkSize]);
+  int rc = Z_OK;
+  while (rc != Z_STREAM_END) {
+    zs->avail_out = kChunkSize;
+    zs->next_out = chunk.get();
+    // Feed input with Z_NO_FLUSH. Once it is all consumed, switch to Z_FINISH
+    // and keep calling deflate() until Z_STREAM_END, because the compressed
+    // tail may need more than one output chunk.
+    const int flush = (zs->avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
+    rc = deflate(zs, flush);
+    if (rc != Z_OK && rc != Z_STREAM_END) {
+      return ZlibResultToStatus(rc);
+    }
+    const auto out_size = zs->next_out - chunk.get();
+    if (out_size > 0) {
+      out->write(reinterpret_cast<char*>(chunk.get()), out_size);
+    }
+  }
+  return Status::OK();
+}
+
+// Gzip-compresses 'input' with zlib compression 'level' (passed straight to
+// deflateInit2()), appending the result to 'out'.
+Status CompressLevel(Slice input, int level, ostream* out) {
+  // z_stream counts input bytes in a 32-bit uInt; reject larger inputs rather
+  // than silently compressing only a truncated prefix.
+  if (static_cast<uInt>(input.get_size()) != input.get_size()) {
+    return Status::InvalidArgument("zlib error: input too large");
+  }
+  z_stream zs;
+  memset(&zs, 0, sizeof(zs));
+  ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
+                              15 + 16 /* 15 window bits, enable gzip */,
+                              8 /* memory level, max is 9 */,
+                              Z_DEFAULT_STRATEGY));
+  zs.avail_in = static_cast<uInt>(input.get_size());
+  // next_in is non-const in zlib's API; deflate() only reads through it.
+  zs.next_in = reinterpret_cast<Bytef*>(input.mutable_data());
+  // deflateInit2() allocated internal state. Release it on every path,
+  // including errors, so a failed compression does not leak it.
+  Status s = DeflateAll(&zs, out);
+  const int end_rc = deflateEnd(&zs);
+  RETURN_IF_ERROR(s);
+  return ZlibResultToStatus(end_rc);
+}
+
+// Runs inflate() until the end of the gzip stream that 'zs' was initialised
+// for, writing the decompressed bytes to 'out'. The caller owns 'zs' and must
+// call inflateEnd() on it afterwards, whatever this returns.
+static Status InflateAll(z_stream* zs, ostream* out) {
+  unsigned char buf[4096];
+  int rc = Z_OK;
+  // Loop on the return code, not on avail_in. After Z_STREAM_END, inflate()
+  // consumes no more input, so bytes following the gzip trailer would otherwise
+  // keep the loop spinning forever. Such trailing bytes are left unread.
+  while (rc != Z_STREAM_END) {
+    zs->next_out = buf;
+    zs->avail_out = arraysize(buf);
+    rc = inflate(zs, Z_NO_FLUSH);
+    if (rc != Z_OK && rc != Z_STREAM_END) {
+      // Z_BUF_ERROR here means no progress was possible: the input ended
+      // before the gzip trailer, i.e. the data is truncated.
+      return ZlibResultToStatus(rc);
+    }
+    out->write(reinterpret_cast<char*>(buf), zs->next_out - buf);
+  }
+  return Status::OK();
+}
+
+// Decompresses the gzip-format data in 'compressed', appending the result to
+// 'out'.
+Status Uncompress(Slice compressed, std::ostream* out) {
+  // z_stream counts input bytes in a 32-bit uInt; reject larger inputs rather
+  // than silently decoding only a truncated prefix.
+  if (static_cast<uInt>(compressed.get_size()) != compressed.get_size()) {
+    return Status::InvalidArgument("zlib error: input too large");
+  }
+  z_stream zs;
+  memset(&zs, 0, sizeof(zs));
+  // next_in is non-const in zlib's API; inflate() only reads through it.
+  zs.next_in = reinterpret_cast<Bytef*>(compressed.mutable_data());
+  zs.avail_in = static_cast<uInt>(compressed.get_size());
+  // 15 window bits + 16: accept gzip-wrapped data only (what CompressLevel()
+  // produces), not raw zlib streams.
+  ZRETURN_NOT_OK(inflateInit2(&zs, 15 + 16));
+  // inflateInit2() allocated internal state: release it on every path.
+  Status s = InflateAll(&zs, out);
+  const int end_rc = inflateEnd(&zs);
+  RETURN_IF_ERROR(s);
+  return ZlibResultToStatus(end_rc);
+}
+
+} // namespace zlib
+} // namespace doris
diff --git a/be/src/util/zlib.h b/be/src/util/zlib.h
new file mode 100644
index 0000000..b062c40
--- /dev/null
+++ b/be/src/util/zlib.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <iosfwd>
+
+#include "util/slice.h"
+#include "common/status.h"
+
+namespace doris {
+namespace zlib {
+
+// Gzip-compress the data in 'input' (zlib deflate wrapped in a gzip header and
+// trailer) at zlib's default compression level, appending the result to 'out'.
+//
+// In case of an error, some data may still be appended to 'out'.
+Status Compress(Slice input, std::ostream* out);
+
+// The same as the above, but with a custom level (1-9, where 1 is fastest
+// and 9 is best compression). 'level' is passed straight to deflateInit2().
+Status CompressLevel(Slice input, int level, std::ostream* out);
+
+// Uncompress the gzip-format data in 'compressed' (as produced by Compress()
+// or CompressLevel(); raw zlib streams are not accepted), appending the result
+// to 'out'.
+//
+// In case of an error, some data may still be appended to 'out'.
+Status Uncompress(Slice compressed, std::ostream* out);
+
+} // namespace zlib
+} // namespace doris
diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt
index 9d419bb..df3c9f8 100644
--- a/be/test/util/CMakeLists.txt
+++ b/be/test/util/CMakeLists.txt
@@ -65,3 +65,4 @@ ADD_BE_TEST(thread_test)
 ADD_BE_TEST(threadpool_test)
 ADD_BE_TEST(trace_test)
 ADD_BE_TEST(easy_json-test)
+ADD_BE_TEST(http_channel_test)
diff --git a/be/test/util/http_channel_test.cpp b/be/test/util/http_channel_test.cpp
new file mode 100644
index 0000000..c7b0421
--- /dev/null
+++ b/be/test/util/http_channel_test.cpp
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/http_channel.h"
+
+#include <gtest/gtest.h>
+
+#include "util/zlib.h"
+#include "util/logging.h"
+
+namespace doris {
+
+class HttpChannelTest : public testing::Test {
+public:
+    // Gunzips 'output' and asserts that it decodes to exactly 'expected'.
+    void check_data_eq(const std::string& output, const std::string& expected) {
+        std::ostringstream decompressed;
+        const Status status = zlib::Uncompress(Slice(output), &decompressed);
+        ASSERT_TRUE(status.ok());
+        ASSERT_EQ(expected, decompressed.str());
+    }
+};
+
+TEST_F(HttpChannelTest, CompressContent) {
+    // Empty content, or gzip not accepted: nothing is produced, so a null output
+    // pointer must be tolerated.
+    ASSERT_FALSE(HttpChannel::compress_content("gzip", "", nullptr));
+    ASSERT_FALSE(HttpChannel::compress_content("", "test", nullptr));
+    ASSERT_FALSE(HttpChannel::compress_content("Gzip", "", nullptr));
+    ASSERT_FALSE(HttpChannel::compress_content("deflate, br", "test", nullptr));
+    // "q=0" means the client refuses gzip.
+    ASSERT_FALSE(HttpChannel::compress_content("gzip;q=0", "test", nullptr));
+
+    const std::string input("test_data_0123456789abcdefg");
+    std::string output;
+
+    ASSERT_TRUE(HttpChannel::compress_content("gzip", input, &output));
+    ASSERT_NO_FATAL_FAILURE(check_data_eq(output, input));
+
+    // gzip anywhere in the list is accepted.
+    output.clear();
+    ASSERT_TRUE(HttpChannel::compress_content("123,gzip,321", input, &output));
+    ASSERT_NO_FATAL_FAILURE(check_data_eq(output, input));
+
+    // Whitespace around list elements is ignored.
+    output.clear();
+    ASSERT_TRUE(HttpChannel::compress_content("deflate, gzip ", input, &output));
+    ASSERT_NO_FATAL_FAILURE(check_data_eq(output, input));
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    // Logging must be ready before gtest starts running the suite.
+    doris::init_glog("be-test");
+    ::testing::InitGoogleTest(&argc, argv);
+    const int result = RUN_ALL_TESTS();
+    return result;
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to