This is an automated email from the ASF dual-hosted git repository.

thiru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 54b3321615 AVRO-4172: [C++] Fix ZSTD codec compatibility (#3457)
54b3321615 is described below

commit 54b332161524086dcb6cde8afe097097eed7f3ee
Author: Zhang Jiawei <[email protected]>
AuthorDate: Thu Oct 16 00:56:21 2025 +0800

    AVRO-4172: [C++] Fix ZSTD codec compatibility (#3457)
    
    * AVRO-4172: [C++] Fix ZSTD codec compatibility
    
    * AVRO-4172: [C++] add codec compatibility test
    
    * fix
    
    * fix
    
    * fix
    
    * add ZstdCodecWrapper for compression
    
    * fix
    
    * fix
    
    * split zstd compress and decompress wrapper
    
    * fix
---
 lang/c++/CMakeLists.txt                |   2 +
 lang/c++/impl/DataFile.cc              |  50 ++++----------------
 lang/c++/impl/ZstdCompressWrapper.cc   |  62 ++++++++++++++++++++++++
 lang/c++/impl/ZstdCompressWrapper.hh   |  45 ++++++++++++++++++
 lang/c++/impl/ZstdDecompressWrapper.cc |  78 ++++++++++++++++++++++++++++++
 lang/c++/impl/ZstdDecompressWrapper.hh |  46 ++++++++++++++++++
 lang/c++/test/DataFileTests.cc         |  84 +++++++++++++++++++++++++++++++++
 share/test/data/weather-deflate.avro   | Bin 0 -> 319 bytes
 share/test/data/weather-zstd.avro      | Bin 0 -> 333 bytes
 9 files changed, 326 insertions(+), 41 deletions(-)

diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index 07eefe0b26..03f4dfb8ab 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -129,6 +129,8 @@ set (AVRO_SOURCE_FILES
         impl/Stream.cc impl/FileStream.cc
         impl/Generic.cc impl/GenericDatum.cc
         impl/DataFile.cc
+        impl/ZstdCompressWrapper.cc    # whole body is #ifdef ZSTD_CODEC_AVAILABLE; empty TU otherwise
+        impl/ZstdDecompressWrapper.cc  # likewise guarded by ZSTD_CODEC_AVAILABLE
         impl/parsing/Symbol.cc
         impl/parsing/ValidatingCodec.cc
         impl/parsing/JsonCodec.cc
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index 5bf27c1ab7..e605a6f448 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -28,7 +28,8 @@
 #endif
 
 #ifdef ZSTD_CODEC_AVAILABLE
-#include <zstd.h>
+#include "ZstdCompressWrapper.hh"
+#include "ZstdDecompressWrapper.hh"
 #endif
 
 #include <zlib.h>
@@ -244,24 +245,12 @@ void DataFileWriterBase::sync() {
                                 reinterpret_cast<const char *>(data) + len);
         }
 
-        // Pre-allocate buffer for compressed data
-        size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
-        std::vector<char> compressed(max_compressed_size);
+        ZstdCompressWrapper zstdCompressWrapper;
+        std::vector<char> compressed = zstdCompressWrapper.compress(uncompressed);
 
-        // Compress the data using ZSTD block API
-        size_t compressed_size = ZSTD_compress(
-            compressed.data(), max_compressed_size,
-            uncompressed.data(), uncompressed.size(),
-            ZSTD_CLEVEL_DEFAULT);
-
-        if (ZSTD_isError(compressed_size)) {
-            throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
-        }
-
-        compressed.resize(compressed_size);
         std::unique_ptr<InputStream> in = memoryInputStream(
             reinterpret_cast<const uint8_t *>(compressed.data()), compressed.size());
-        avro::encode(*encoderPtr_, static_cast<int64_t>(compressed_size));
+        avro::encode(*encoderPtr_, static_cast<int64_t>(compressed.size()));
         encoderPtr_->flush();
         copy(*in, *stream_);
 #endif
@@ -482,35 +471,15 @@ void DataFileReaderBase::readDataBlock() {
 #ifdef ZSTD_CODEC_AVAILABLE
     } else if (codec_ == ZSTD_CODEC) {
         compressed_.clear();
+        uncompressed.clear();
         const uint8_t *data;
         size_t len;
         while (st->next(&data, &len)) {
             compressed_.insert(compressed_.end(), data, data + len);
         }
 
-        // Get the decompressed size
-        size_t decompressed_size = ZSTD_getFrameContentSize(
-            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
-        if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
-            throw Exception("ZSTD: Not a valid compressed frame");
-        } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
-            throw Exception("ZSTD: Unable to determine decompressed size");
-        }
-
-        // Decompress the data
-        uncompressed.clear();
-        uncompressed.resize(decompressed_size);
-        size_t result = ZSTD_decompress(
-            uncompressed.data(), decompressed_size,
-            reinterpret_cast<const char *>(compressed_.data()), compressed_.size());
-
-        if (ZSTD_isError(result)) {
-            throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
-        }
-        if (result != decompressed_size) {
-            throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
-                            decompressed_size, result);
-        }
+        ZstdDecompressWrapper zstdDecompressWrapper;
+        uncompressed = zstdDecompressWrapper.decompress(compressed_);
 
         std::unique_ptr<InputStream> in = memoryInputStream(
             reinterpret_cast<const uint8_t *>(uncompressed.data()),
@@ -620,8 +589,7 @@ void DataFileReaderBase::readHeader() {
         codec_ = SNAPPY_CODEC;
 #endif
 #ifdef ZSTD_CODEC_AVAILABLE
-    } else if (it != metadata_.end()
-               && toString(it->second) == AVRO_ZSTD_CODEC) {
+    } else if (it != metadata_.end() && toString(it->second) == AVRO_ZSTD_CODEC) {
         codec_ = ZSTD_CODEC;
 #endif
     } else {
diff --git a/lang/c++/impl/ZstdCompressWrapper.cc b/lang/c++/impl/ZstdCompressWrapper.cc
new file mode 100644
index 0000000000..b756e19d8e
--- /dev/null
+++ b/lang/c++/impl/ZstdCompressWrapper.cc
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include "ZstdCompressWrapper.hh"
+#include "Exception.hh"
+
+#include <zstd.h>
+
+namespace avro {
+
+std::vector<char> ZstdCompressWrapper::compress(const std::vector<char> &uncompressed) {
+    // Worst-case size of a single-pass compressed frame; ZSTD_compressBound()
+    // can itself report an error when the input is too large to compress.
+    size_t max_compressed_size = ZSTD_compressBound(uncompressed.size());
+    if (ZSTD_isError(max_compressed_size)) {
+        throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(max_compressed_size));
+    }
+    std::vector<char> compressed(max_compressed_size);
+
+    // Compress into a single frame using the context owned by this wrapper
+    // (ZSTD_compress() would set up a separate context of its own internally).
+    size_t compressed_size = ZSTD_compressCCtx(
+        cctx_,
+        compressed.data(), max_compressed_size,
+        uncompressed.data(), uncompressed.size(),
+        ZSTD_CLEVEL_DEFAULT);
+
+    if (ZSTD_isError(compressed_size)) {
+        throw Exception("ZSTD compression error: {}", ZSTD_getErrorName(compressed_size));
+    }
+    compressed.resize(compressed_size);
+    return compressed;
+}
+
+ZstdCompressWrapper::ZstdCompressWrapper() {
+    cctx_ = ZSTD_createCCtx();
+    if (!cctx_) {
+        throw Exception("ZSTD_createCCtx() failed");
+    }
+}
+
+ZstdCompressWrapper::~ZstdCompressWrapper() {
+    ZSTD_freeCCtx(cctx_);
+}
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
diff --git a/lang/c++/impl/ZstdCompressWrapper.hh b/lang/c++/impl/ZstdCompressWrapper.hh
new file mode 100644
index 0000000000..871dd711a6
--- /dev/null
+++ b/lang/c++/impl/ZstdCompressWrapper.hh
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_ZstdCompressWrapper_hh__
+#define avro_ZstdCompressWrapper_hh__
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include <vector>
+
+#include <zstd.h>
+
+namespace avro {
+
+/// Compresses byte buffers into zstd frames using a compression context
+/// (ZSTD_CCtx) owned by this object.
+///
+/// The wrapper owns a raw zstd handle, so copying is disabled: a copy would
+/// free the same context twice.
+class ZstdCompressWrapper {
+public:
+    /// @throws avro::Exception if the zstd context cannot be allocated.
+    ZstdCompressWrapper();
+    ~ZstdCompressWrapper();
+
+    ZstdCompressWrapper(const ZstdCompressWrapper &) = delete;
+    ZstdCompressWrapper &operator=(const ZstdCompressWrapper &) = delete;
+
+    /// Compresses @p uncompressed and returns the compressed bytes.
+    /// @throws avro::Exception if zstd reports an error.
+    std::vector<char> compress(const std::vector<char> &uncompressed);
+
+private:
+    ZSTD_CCtx *cctx_ = nullptr;
+};
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
+
+#endif // avro_ZstdCompressWrapper_hh__
diff --git a/lang/c++/impl/ZstdDecompressWrapper.cc b/lang/c++/impl/ZstdDecompressWrapper.cc
new file mode 100644
index 0000000000..86d996dd31
--- /dev/null
+++ b/lang/c++/impl/ZstdDecompressWrapper.cc
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include "ZstdDecompressWrapper.hh"
+#include "Exception.hh"
+
+#include <zstd.h>
+
+namespace avro {
+
+std::string ZstdDecompressWrapper::decompress(const std::vector<char> &compressed) {
+    std::string uncompressed;
+    // The decompressed size is an optional field of the frame header; frames
+    // written by streaming encoders may omit it. Keep the full unsigned long
+    // long result so the sentinel comparisons stay correct where size_t is
+    // only 32 bits wide.
+    unsigned long long decompressed_size = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
+    if (decompressed_size == ZSTD_CONTENTSIZE_ERROR) {
+        throw Exception("ZSTD: Not a valid compressed frame");
+    } else if (decompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
+        // Size unknown: stream-decompress chunk by chunk. Looping until all
+        // input is consumed also decodes any concatenated frames.
+        size_t reset = ZSTD_DCtx_reset(dctx_, ZSTD_reset_session_only);
+        if (ZSTD_isError(reset)) {
+            throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(reset));
+        }
+        ZSTD_inBuffer in{compressed.data(), compressed.size(), 0};
+        std::vector<char> tmp(ZSTD_DStreamOutSize());
+        size_t ret = 0;
+        while (in.pos < in.size) {
+            ZSTD_outBuffer out{tmp.data(), tmp.size(), 0};
+            ret = ZSTD_decompressStream(dctx_, &out, &in);
+            if (ZSTD_isError(ret)) {
+                throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(ret));
+            }
+            uncompressed.append(tmp.data(), out.pos);
+        }
+        // zstd consumes the last byte of a frame only after flushing all of its
+        // output, so a non-zero hint here means the input stopped mid-frame.
+        if (ret != 0) {
+            throw Exception("ZSTD: Truncated compressed frame");
+        }
+    } else {
+        // Size known: decompress in one shot directly into the result buffer.
+        if (decompressed_size > uncompressed.max_size()) {
+            throw Exception("ZSTD: Decompressed size {} is too large", decompressed_size);
+        }
+        uncompressed.resize(static_cast<size_t>(decompressed_size));
+        size_t result = ZSTD_decompressDCtx(
+            dctx_, uncompressed.data(), uncompressed.size(), compressed.data(), compressed.size());
+
+        if (ZSTD_isError(result)) {
+            throw Exception("ZSTD decompression error: {}", ZSTD_getErrorName(result));
+        }
+        if (result != decompressed_size) {
+            throw Exception("ZSTD: Decompressed size mismatch: expected {}, got {}",
+                            decompressed_size, result);
+        }
+    }
+    return uncompressed;
+}
+
+ZstdDecompressWrapper::ZstdDecompressWrapper() {
+    dctx_ = ZSTD_createDCtx();
+    if (!dctx_) {
+        throw Exception("ZSTD_createDCtx() failed");
+    }
+}
+
+ZstdDecompressWrapper::~ZstdDecompressWrapper() {
+    ZSTD_freeDCtx(dctx_);
+}
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
diff --git a/lang/c++/impl/ZstdDecompressWrapper.hh b/lang/c++/impl/ZstdDecompressWrapper.hh
new file mode 100644
index 0000000000..b5b97758c4
--- /dev/null
+++ b/lang/c++/impl/ZstdDecompressWrapper.hh
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_ZstdDecompressWrapper_hh__
+#define avro_ZstdDecompressWrapper_hh__
+
+#ifdef ZSTD_CODEC_AVAILABLE
+
+#include <string>
+#include <vector>
+
+#include <zstd.h>
+
+namespace avro {
+
+/// Decompresses zstd-compressed byte buffers using a decompression context
+/// (ZSTD_DCtx) owned by this object.
+///
+/// The wrapper owns a raw zstd handle, so copying is disabled: a copy would
+/// free the same context twice.
+class ZstdDecompressWrapper {
+public:
+    /// @throws avro::Exception if the zstd context cannot be allocated.
+    ZstdDecompressWrapper();
+    ~ZstdDecompressWrapper();
+
+    ZstdDecompressWrapper(const ZstdDecompressWrapper &) = delete;
+    ZstdDecompressWrapper &operator=(const ZstdDecompressWrapper &) = delete;
+
+    /// Decompresses @p compressed, whether or not its zstd frame header
+    /// records the decompressed size.
+    /// @throws avro::Exception if the data cannot be decompressed.
+    std::string decompress(const std::vector<char> &compressed);
+
+private:
+    ZSTD_DCtx *dctx_ = nullptr;
+};
+
+} // namespace avro
+
+#endif // ZSTD_CODEC_AVAILABLE
+
+#endif // avro_ZstdDecompressWrapper_hh__
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 3723615fb8..3a44970c55 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -892,6 +892,84 @@ void testSkipStringZstdCodec() {
 }
 #endif
 
+struct Weather {
+    std::string station;
+    int64_t time;
+    int32_t temp;
+    Weather(const char *station, int64_t time, int32_t temp)
+        : station(station), time(time), temp(temp) {}
+
+    bool operator==(const Weather &other) const {
+        return station == other.station && time == other.time && temp == other.temp;
+    }
+    friend std::ostream &operator<<(std::ostream &os, const Weather &w) {
+        return os << w.station << ' ' << w.time << ' ' << w.temp;
+    }
+};
+
+// Decode-only traits: the compatibility tests below only read Weather records.
+namespace avro {
+template<>
+struct codec_traits<Weather> {
+    static void decode(Decoder &d, Weather &v) {
+        avro::decode(d, v.station);
+        avro::decode(d, v.time);
+        avro::decode(d, v.temp);
+    }
+};
+} // namespace avro
+
+// Opens the weather fixture at `filename` with a fixed reader schema and checks
+// that it decodes to exactly the five expected records.
+void testCompatibility(const char *filename) {
+    const char *readerSchemaStr = "{"
+                                  "\"type\": \"record\", \"name\": \"test.Weather\", \"fields\":["
+                                  "{\"name\": \"station\", \"type\": \"string\", \"order\": \"ignore\"},"
+                                  "{\"name\": \"time\", \"type\": \"long\"},"
+                                  "{\"name\": \"temp\", \"type\": \"int\"}"
+                                  "]}";
+    avro::ValidSchema readerSchema =
+        avro::compileJsonSchemaFromString(readerSchemaStr);
+    avro::DataFileReader<Weather> df(filename, readerSchema);
+
+    Weather ro("", -1, -1);
+    BOOST_CHECK_EQUAL(df.read(ro), true);
+    BOOST_CHECK_EQUAL(ro, Weather("011990-99999", -619524000000L, 0));
+    BOOST_CHECK_EQUAL(df.read(ro), true);
+    BOOST_CHECK_EQUAL(ro, Weather("011990-99999", -619506000000L, 22));
+    BOOST_CHECK_EQUAL(df.read(ro), true);
+    BOOST_CHECK_EQUAL(ro, Weather("011990-99999", -619484400000L, -11));
+    BOOST_CHECK_EQUAL(df.read(ro), true);
+    BOOST_CHECK_EQUAL(ro, Weather("012650-99999", -655531200000L, 111));
+    BOOST_CHECK_EQUAL(df.read(ro), true);
+    BOOST_CHECK_EQUAL(ro, Weather("012650-99999", -655509600000L, 78));
+    BOOST_CHECK_EQUAL(df.read(ro), false);
+}
+
+void testCompatibilityNullCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testCompatibility("../../share/test/data/weather.avro");
+}
+
+void testCompatibilityDeflateCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testCompatibility("../../share/test/data/weather-deflate.avro");
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testCompatibilitySnappyCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testCompatibility("../../share/test/data/weather-snappy.avro");
+}
+#endif
+
+#ifdef ZSTD_CODEC_AVAILABLE
+void testCompatibilityZstdCodec() {
+    BOOST_TEST_CHECKPOINT(__func__);
+    testCompatibility("../../share/test/data/weather-zstd.avro");
+}
+#endif
+
 struct TestRecord {
     std::string s1;
     int64_t id;
@@ -1376,6 +1454,15 @@ init_unit_test_suite(int, char *[]) {
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testSkipStringZstdCodec));
 #endif
 
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityNullCodec));
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilitySnappyCodec));
+#endif
+#ifdef ZSTD_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testCompatibilityZstdCodec));
+#endif
+
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncNullCodec));
     boost::unit_test::framework::master_test_suite().add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
 #ifdef SNAPPY_CODEC_AVAILABLE
diff --git a/share/test/data/weather-deflate.avro b/share/test/data/weather-deflate.avro
new file mode 100644
index 0000000000..b07db86b21
Binary files /dev/null and b/share/test/data/weather-deflate.avro differ
diff --git a/share/test/data/weather-zstd.avro b/share/test/data/weather-zstd.avro
new file mode 100644
index 0000000000..6a66a34a23
Binary files /dev/null and b/share/test/data/weather-zstd.avro differ

Reply via email to