This is an automated email from the ASF dual-hosted git repository.
thiru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new f59db4906 AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
f59db4906 is described below
commit f59db4906b40f7bee4e4a7a80824e43c55cecabb
Author: Gang Wu <[email protected]>
AuthorDate: Fri Jan 17 01:01:12 2025 +0800
AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
* AVRO-4111: [C++] Replace boost::iostreams with zlib library
* declare buf as uint8_t
* fix lint
* remove unused cmake variables
---
.github/workflows/test-lang-c++-ARM.yml | 2 +-
.github/workflows/test-lang-c++.yml | 2 +-
lang/c++/CMakeLists.txt | 17 +++-
lang/c++/impl/DataFile.cc | 160 ++++++++++++++++++++------------
lang/c++/include/avro/DataFile.hh | 3 -
5 files changed, 114 insertions(+), 70 deletions(-)
diff --git a/.github/workflows/test-lang-c++-ARM.yml b/.github/workflows/test-lang-c++-ARM.yml
index f101eaeb2..759065b08 100644
--- a/.github/workflows/test-lang-c++-ARM.yml
+++ b/.github/workflows/test-lang-c++-ARM.yml
@@ -44,7 +44,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update -q
- sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev cmake
+ sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
- name: Build
run: |
diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml
index 61afa7ff6..c0c66ceec 100644
--- a/.github/workflows/test-lang-c++.yml
+++ b/.github/workflows/test-lang-c++.yml
@@ -39,7 +39,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install Dependencies
- run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev cmake
+ run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
- name: Print Versions
run: |
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index e6f70bffd..1fc19f90a 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -110,6 +110,13 @@ else (SNAPPY_FOUND)
message("Disabled snappy codec. libsnappy not found.")
endif (SNAPPY_FOUND)
+find_package(ZLIB REQUIRED)
+if (ZLIB_FOUND)
+ message("Enabled zlib codec")
+else (ZLIB_FOUND)
+ message(FATAL_ERROR "ZLIB is not found")
+endif (ZLIB_FOUND)
+
add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
add_definitions (-DAVRO_VERSION="${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}")
@@ -140,8 +147,8 @@ set_property (TARGET avrocpp
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK)
add_library (avrocpp_s STATIC ${AVRO_SOURCE_FILES})
-target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR})
-target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
+target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
+target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
set_property (TARGET avrocpp avrocpp_s
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE)
@@ -152,8 +159,8 @@ set_target_properties (avrocpp PROPERTIES
set_target_properties (avrocpp_s PROPERTIES
VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH})
-target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
-target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
+target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
+target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
target_include_directories(avrocpp PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@@ -209,7 +216,7 @@ if (AVRO_BUILD_TESTS)
macro (unittest name)
add_executable (${name} test/${name}.cc)
- target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
+ target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES})
add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
endmacro (unittest)
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index 63ea7df20..fb0eaa2ac 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -23,15 +23,12 @@
#include <random>
#include <sstream>
-#include <boost/crc.hpp> // for boost::crc_32_type
-#include <boost/iostreams/device/file.hpp>
-#include <boost/iostreams/filter/gzip.hpp>
-#include <boost/iostreams/filter/zlib.hpp>
-
#ifdef SNAPPY_CODEC_AVAILABLE
#include <snappy.h>
#endif
+#include <zlib.h>
+
namespace avro {
using std::copy;
using std::istringstream;
@@ -55,12 +52,8 @@ const string AVRO_SNAPPY_CODEC = "snappy";
const size_t minSyncInterval = 32;
const size_t maxSyncInterval = 1u << 30;
-boost::iostreams::zlib_params get_zlib_params() {
- boost::iostreams::zlib_params ret;
- ret.method = boost::iostreams::zlib::deflated;
- ret.noheader = true;
- return ret;
-}
+// Recommended by https://www.zlib.net/zlib_how.html
+const size_t zlibBufGrowSize = 128 * 1024;
} // namespace
@@ -144,21 +137,45 @@ void DataFileWriterBase::sync() {
std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
copy(*in, *stream_);
} else if (codec_ == DEFLATE_CODEC) {
- std::vector<char> buf;
+ std::vector<uint8_t> buf;
{
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
- os.push(boost::iostreams::back_inserter(buf));
- const uint8_t *data;
- size_t len;
+ z_stream zs;
+ zs.zalloc = Z_NULL;
+ zs.zfree = Z_NULL;
+ zs.opaque = Z_NULL;
+
+ int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
+ if (ret != Z_OK) {
+ throw Exception("Failed to initialize deflate, error: {}", ret);
+ }
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
- while (input->next(&data, &len)) {
- boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
+ const uint8_t *data;
+ size_t len;
+ while (ret != Z_STREAM_END && input->next(&data, &len)) {
+ zs.avail_in = static_cast<uInt>(len);
+ zs.next_in = const_cast<Bytef *>(data);
+ bool flush = (zs.total_in + len) >= buffer_->byteCount();
+ do {
+ if (zs.total_out == buf.size()) {
+ buf.resize(buf.size() + zlibBufGrowSize);
+ }
+ zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
+ zs.next_out = buf.data() + zs.total_out;
+ ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
+ if (ret == Z_STREAM_END) {
+ break;
+ }
+ if (ret != Z_OK) {
+ throw Exception("Failed to deflate, error: {}", ret);
+ }
+ } while (zs.avail_out == 0);
}
+
+ buf.resize(zs.total_out);
+ (void) deflateEnd(&zs);
} // make sure all is flushed
- std::unique_ptr<InputStream> in = memoryInputStream(
- reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
+ std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
int64_t byteCount = buf.size();
avro::encode(*encoderPtr_, byteCount);
encoderPtr_->flush();
@@ -167,35 +184,28 @@ void DataFileWriterBase::sync() {
} else if (codec_ == SNAPPY_CODEC) {
std::vector<char> temp;
std::string compressed;
- boost::crc_32_type crc;
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::back_inserter(temp));
- const uint8_t *data;
- size_t len;
- std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
- while (input->next(&data, &len)) {
- boost::iostreams::write(os, reinterpret_cast<const char *>(data),
- len);
- }
- } // make sure all is flushed
+ const uint8_t *data;
+ size_t len;
+ std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
+ while (input->next(&data, &len)) {
+ temp.insert(temp.end(), reinterpret_cast<const char *>(data),
+ reinterpret_cast<const char *>(data) + len);
+ }
- crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
- temp.size());
// For Snappy, add the CRC32 checksum
- auto checksum = crc();
+ auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
+ static_cast<uInt>(temp.size()));
// Now compress
size_t compressed_size = snappy::Compress(
reinterpret_cast<const char *>(temp.data()), temp.size(),
&compressed);
+
temp.clear();
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::back_inserter(temp));
- boost::iostreams::write(os, compressed.c_str(), compressed_size);
- }
+ temp.insert(temp.end(), compressed.c_str(),
+ compressed.c_str() + compressed_size);
+
temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
@@ -285,8 +295,7 @@ void DataFileReaderBase::init(const ValidSchema &readerSchema) {
static void drain(InputStream &in) {
const uint8_t *p = nullptr;
size_t n = 0;
- while (in.next(&p, &n))
- ;
+ while (in.next(&p, &n));
}
char hex(unsigned int x) {
@@ -384,7 +393,6 @@ void DataFileReaderBase::readDataBlock() {
dataStream_ = std::move(st);
#ifdef SNAPPY_CODEC_AVAILABLE
} else if (codec_ == SNAPPY_CODEC) {
- boost::crc_32_type crc;
uint32_t checksum = 0;
compressed_.clear();
uncompressed.clear();
@@ -408,35 +416,67 @@ void DataFileReaderBase::readDataBlock() {
throw Exception(
"Snappy Compression reported an error when decompressing");
}
- crc.process_bytes(uncompressed.c_str(), uncompressed.size());
- auto c = crc();
+ auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
+ static_cast<uInt>(uncompressed.size()));
if (checksum != c) {
throw Exception(
"Checksum did not match for Snappy compression: Expected: {}, computed: {}",
checksum, c);
}
- os_.reset(new boost::iostreams::filtering_istream());
- os_->push(
- boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
- uncompressed.size()));
- std::unique_ptr<InputStream> in = istreamInputStream(*os_);
+
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
+ uncompressed.size());
dataDecoder_->init(*in);
dataStream_ = std::move(in);
#endif
} else {
compressed_.clear();
- const uint8_t *data;
- size_t len;
- while (st->next(&data, &len)) {
- compressed_.insert(compressed_.end(), data, data + len);
+ uncompressed.clear();
+
+ {
+ z_stream zs;
+ zs.zalloc = Z_NULL;
+ zs.zfree = Z_NULL;
+ zs.opaque = Z_NULL;
+ zs.avail_in = 0;
+ zs.next_in = Z_NULL;
+
+ int ret = inflateInit2(&zs, /*windowBits=*/-15);
+ if (ret != Z_OK) {
+ throw Exception("Failed to initialize inflate, error: {}", ret);
+ }
+
+ const uint8_t *data;
+ size_t len;
+ while (ret != Z_STREAM_END && st->next(&data, &len)) {
+ zs.avail_in = static_cast<uInt>(len);
+ zs.next_in = const_cast<Bytef *>(data);
+ do {
+ if (zs.total_out == uncompressed.size()) {
+ uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
+ }
+ zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
+ zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
+ ret = inflate(&zs, Z_NO_FLUSH);
+ if (ret == Z_STREAM_END) {
+ break;
+ }
+ if (ret != Z_OK) {
+ throw Exception("Failed to inflate, error: {}", ret);
+ }
+ } while (zs.avail_out == 0);
+ }
+
+ uncompressed.resize(zs.total_out);
+ (void) inflateEnd(&zs);
}
- os_.reset(new boost::iostreams::filtering_istream());
- os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
- os_->push(boost::iostreams::basic_array_source<char>(
- compressed_.data(), compressed_.size()));
- std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
+ std::unique_ptr<InputStream> in = memoryInputStream(
+ reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
+ uncompressed.size());
+
dataDecoder_->init(*in);
dataStream_ = std::move(in);
}
diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh
index dcfddf774..6b2cdab5c 100644
--- a/lang/c++/include/avro/DataFile.hh
+++ b/lang/c++/include/avro/DataFile.hh
@@ -31,8 +31,6 @@
#include <string>
#include <vector>
-#include <boost/iostreams/filtering_stream.hpp>
-
namespace avro {
/** Specify type of compression to use when writing data files. */
@@ -216,7 +214,6 @@ class AVRO_DECL DataFileReaderBase {
DataFileSync sync_{};
// for compressed buffer
- std::unique_ptr<boost::iostreams::filtering_istream> os_;
std::vector<char> compressed_;
std::string uncompressed;
void readHeader();