GGraziadei commented on code in PR #8653:
URL: https://github.com/apache/storm/pull/8653#discussion_r3234764270
##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
}
}
+ /**
+ * Static utility class for Zstandard (Zstd) compression and decompression.
+ */
+ public static final class ZstdUtils {
+
+ private static final int BUFFER_SIZE = 64 * 1024;
+
+ /**
+ * Private constructor to prevent instantiation.
+ * @throws UnsupportedOperationException if an attempt is made to
instantiate this class.
+ */
+ private ZstdUtils() {
+ throw new UnsupportedOperationException("Utility class should not
be instantiated.");
+ }
+
+ /**
+ * Compresses the provided byte array using Zstandard.
+ *
+ * <p>The output includes the standard Zstandard frame header, making
it
+ * self-describing for the decompression phase.</p>
+ *
+ * @param data the raw byte array to compress.
+ * @return a compressed byte array, or the original array if
null/empty.
+ * @throws RuntimeException wrapping an {@link IOException} if the
compression fails.
+ */
+ public static byte[] compress(byte[] data) {
+ if (data == null || data.length == 0) {
+ return data;
+ }
+
+ try (ByteArrayOutputStream bos = new
ByteArrayOutputStream(data.length)) {
+ try (ZstdCompressorOutputStream zstdOut =
ZstdCompressorOutputStream.builder()
+ .setOutputStream(bos)
+ .setBufferSize(BUFFER_SIZE) // impacts on compression
ratio
+ .setLevel(ConfigUtils.zstdCompressionLevel(localConf))
+ .get()) {
+ zstdOut.write(data);
+ zstdOut.finish();
+ }
+ return bos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException("Zstd compression failed", e);
+ }
+ }
+
+ /**
+ * Decompresses a Zstandard-compressed byte array.
+ *
+ * @param data the compressed byte array (Zstd frame).
+ * @return the original decompressed byte array, or the input if
null/empty.
+ * @throws RuntimeException wrapping an {@link IOException} if the
decompression fails
+ * or if the data is not a valid Zstd frame.
+ */
+ public static byte[] decompress(byte[] data) {
+ if (data == null || data.length == 0) {
+ return data;
+ }
+
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ ZstdCompressorInputStream zstdIn = new
ZstdCompressorInputStream(bis);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
Review Comment:
Thank you. I am fixing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]