rzo1 commented on code in PR #8653:
URL: https://github.com/apache/storm/pull/8653#discussion_r3228573599


##########
pom.xml:
##########
@@ -514,6 +515,12 @@
                 <artifactId>commons-compress</artifactId>
                 <version>${commons-compress.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>${zstd-jni.version}</version>
+                <scope>compile</scope>

Review Comment:
   `compile` is the default Maven scope, so this line is redundant and can be dropped.

   On whether the explicit `zstd-jni` dependency is needed at all: `commons-compress` declares `zstd-jni` as an `<optional>` dependency, so it is *not* pulled in transitively, and the new code's `org.apache.commons.compress.compressors.zstandard.*` classes fail at runtime without it. The entry (and the `zstd-jni.version` property at the top of the file) should therefore stay, even if nothing in the codebase imports `com.github.luben.zstd.*` directly; only the `<scope>` line can go.



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java:
##########
@@ -0,0 +1,65 @@
+/**

Review Comment:
   Minor: the rest of this package uses `/*` for the ASF license header; this file uses `/**` (Javadoc style).



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Always writes Zstd out, but tests incoming bytes to determine the format.
+ * If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
+ * If not, it falls back to {@link ThriftSerializationDelegate} for raw Thrift.
+ */
+public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {
+
+    /**
+     * Zstandard magic number 0xFD2FB52. In a byte array (little-endian format): [0x28, 0xB5, 0x2F, 0xFD]
+     */
+    private static final byte[] ZSTD_MAGIC = new byte[]{ 0x28, (byte) 0xB5, 0x2F, (byte) 0xFD };
+
+    private final ThriftSerializationDelegate defaultDelegate = new ThriftSerializationDelegate();

Review Comment:
   This needs to be `GzipThriftSerializationDelegate`, not `ThriftSerializationDelegate`. The shipping default for `storm.meta.serialization.delegate` has always been Gzip-compressed Thrift, so when the Zstd magic header is absent the bytes will be Gzip, never raw Thrift. As written, this bridge cannot read any state produced by previous releases, which defeats the purpose of having a bridge.

   With `GzipThriftSerializationDelegate` as the fallback here, and the bridge promoted to the new default in `defaults.yaml`, upgraded daemons read existing Gzip state transparently. Note that this covers only the old-to-new direction: nodes still running a previous release cannot read the Zstd bytes that upgraded nodes write.
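   A minimal sketch of the intended read path, assuming the fallback is the existing Gzip format. It works at the byte level rather than on Thrift objects, and the class name and the `zstdDecoder` hook are hypothetical stand-ins (the JDK has no Zstd codec), so treat it as an illustration of the routing rather than a drop-in implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.UnaryOperator;
import java.util.zip.GZIPInputStream;

/** Hypothetical sketch: Zstd frames go to the Zstd decoder, everything else is legacy Gzip. */
final class BridgeReadPathSketch {
    // Zstandard frame magic 0xFD2FB528, stored little-endian on the wire.
    private static final byte[] ZSTD_MAGIC = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD};

    // Stand-in for the Zstd delegate's decompression step.
    private final UnaryOperator<byte[]> zstdDecoder;

    BridgeReadPathSketch(UnaryOperator<byte[]> zstdDecoder) {
        this.zstdDecoder = zstdDecoder;
    }

    static boolean isZstd(byte[] bytes) {
        if (bytes == null || bytes.length < ZSTD_MAGIC.length) {
            return false;
        }
        for (int i = 0; i < ZSTD_MAGIC.length; i++) {
            if (bytes[i] != ZSTD_MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    /** Returns the decompressed Thrift bytes, whichever codec produced them. */
    byte[] decompress(byte[] bytes) {
        return isZstd(bytes) ? zstdDecoder.apply(bytes) : gunzip(bytes);
    }

    // Legacy path: state written by GzipThriftSerializationDelegate.
    static byte[] gunzip(byte[] bytes) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException("Gzip decompression failed", e);
        }
    }
}
```

   In the real class the non-Zstd branch would simply delegate to `GzipThriftSerializationDelegate.deserialize(bytes, clazz)`.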



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
+ */
+public class ZstdThriftSerializationDelegate implements SerializationDelegate {
+
+    // ThreadLocal with explicit exception handling for checked TTransportException
+    private static final ThreadLocal<TSerializer> SERIALIZER = ThreadLocal.withInitial(() -> {

Review Comment:
   Caching `TSerializer`/`TDeserializer` per thread is a behaviour change worth reconsidering: every other delegate in this package allocates them per call. The `ByteArrayOutputStream` inside `TSerializer` grows to the largest message ever serialized on that thread and **never shrinks** (`reset()` keeps the backing array), so on long-lived executor threads one occasional large metadata blob permanently pins that memory, in every worker where it happens. The standard `ThreadLocal` classloader-pinning concerns also apply on worker shutdown.

   Suggest following the existing per-call pattern; allocating a `TSerializer` is cheap compared to compression. If the cache is kept, at minimum add a lifecycle hook that calls `SERIALIZER.remove()` / `DESERIALIZER.remove()` on shutdown.
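   The retention is easy to see with the JDK alone. The sketch below assumes the shaded Thrift `TSerializer` matches upstream, which reuses one `ByteArrayOutputStream` and calls `reset()` before each `serialize`; `reset()` only rewinds the write position, so the backing array stays at its high-water mark. The class names are illustrative:

```java
import java.io.ByteArrayOutputStream;

/** Illustrative demo: a reused ByteArrayOutputStream keeps its largest backing array. */
final class RetainedBufferDemo {
    /** Exposes the protected backing array length, for illustration only. */
    static final class InspectableBuffer extends ByteArrayOutputStream {
        int capacity() {
            return buf.length;
        }
    }

    static int capacityAfterLargeThenSmall(int largeSize, int smallSize) {
        InspectableBuffer out = new InspectableBuffer();
        out.write(new byte[largeSize], 0, largeSize); // one occasional large message
        out.reset();                                   // rewinds the count, keeps the array
        out.write(new byte[smallSize], 0, smallSize);  // typical small message afterwards
        return out.capacity();                         // still sized for the large message
    }
}
```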



##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
         }
     }
 
+    /**
+     * Static utility class for Zstandard (Zstd) compression and decompression.
+     */
+    public static final class ZstdUtils {
+
+        private static final int BUFFER_SIZE = 64 * 1024;
+
+        /**
+         * Private constructor to prevent instantiation.
+         * @throws UnsupportedOperationException if an attempt is made to instantiate this class.
+         */
+        private ZstdUtils() {
+            throw new UnsupportedOperationException("Utility class should not be instantiated.");
+        }
+
+        /**
+         * Compresses the provided byte array using Zstandard.
+         *
+         * <p>The output includes the standard Zstandard frame header, making it
+         * self-describing for the decompression phase.</p>
+         *
+         * @param data the raw byte array to compress.
+         * @return a compressed byte array, or the original array if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
+         */
+        public static byte[] compress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
+                try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
+                        .setOutputStream(bos)
+                        .setBufferSize(BUFFER_SIZE) // impacts on compression ratio

Review Comment:
   Reaching into the static `Utils.localConf` from a public utility class makes `ZstdUtils.compress` impossible to test in isolation and effectively pins the compression level at JVM startup. The level is per-delegate config, and `SerializationDelegate#prepare(topoConf)` is the natural place to read it once and cache it. Suggest plumbing the level through as a parameter, e.g. `compress(byte[] data, int level)`, and having `ZstdThriftSerializationDelegate.prepare` look the value up and stash it.
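   A sketch of the suggested shape, with hypothetical names throughout: the config key is a placeholder rather than Storm's real key, and `java.util.zip.Deflater` stands in for Zstd so the example runs on the JDK alone. The point is that `compress(data, level)` depends only on its arguments, and the level is resolved once in `prepare`:

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.zip.Deflater;

/** Hypothetical delegate shape; Deflater is a stand-in codec, not Zstd. */
final class LevelPlumbingSketch {
    static final String LEVEL_KEY = "example.compression.level"; // placeholder key
    static final int DEFAULT_LEVEL = 3;

    private int level = DEFAULT_LEVEL;

    /** Reads the level once, the way SerializationDelegate#prepare would. */
    void prepare(Map<String, Object> topoConf) {
        Object configured = topoConf.get(LEVEL_KEY);
        if (configured instanceof Number) {
            level = ((Number) configured).intValue();
        }
    }

    int level() {
        return level;
    }

    byte[] serialize(byte[] thriftBytes) {
        return compress(thriftBytes, level);
    }

    /** Pure function of its inputs, so it can be unit-tested without any global config. */
    static byte[] compress(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(data);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native zlib resources
        }
    }
}
```

   Because the helper takes the level explicitly, a unit test can call it with any level without touching `Utils.localConf`.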



##########
storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java:
##########
@@ -803,6 +803,26 @@ public void validateField(String name, Object o) {
         }
     }
 
+    public static class ZstdLevelValidator extends Validator {
+        private static final int MIN_LEVEL = 1;
+        private static final int MAX_LEVEL = 22;

Review Comment:
   Levels 20–22 are Zstd "ultra" mode and require dramatically more working memory per call. They are rarely worth it for metadata-sized payloads, and they make it easy to footgun the whole cluster with a single `storm.yaml` typo. Suggest capping at 19, or gating ultra levels behind an explicit opt-in (e.g. `storm.compression.zstd.allow.ultra`).
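   One possible shape for the check, assuming the opt-in flag suggested above. The class and method names are hypothetical, and how the flag reaches the check is left open, since a single-field `Validator` only sees its own value:

```java
/** Hypothetical level check: 1..19 by default, 20..22 ("ultra") only with an explicit opt-in. */
final class ZstdLevelCheck {
    static final int MIN_LEVEL = 1;
    static final int MAX_STANDARD_LEVEL = 19;
    static final int MAX_ULTRA_LEVEL = 22;

    static void validate(int level, boolean allowUltra) {
        int max = allowUltra ? MAX_ULTRA_LEVEL : MAX_STANDARD_LEVEL;
        if (level < MIN_LEVEL || level > max) {
            // Point at the opt-in when the value is a valid ultra level that was not enabled.
            String hint = (!allowUltra && level > MAX_STANDARD_LEVEL && level <= MAX_ULTRA_LEVEL)
                    ? " (levels 20-22 are ultra mode and require an explicit opt-in)"
                    : "";
            throw new IllegalArgumentException(
                    "Zstd level must be between " + MIN_LEVEL + " and " + max + ", got " + level + hint);
        }
    }
}
```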



##########
storm-client/test/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegateTest.java:
##########
@@ -0,0 +1,141 @@
+/**

Review Comment:
   A few coverage gaps worth filling while this file is fresh:

   1. No direct test of `ZstdThriftSerializationDelegate`; it is only exercised via the bridge.
   2. No test that **Gzip-encoded** bytes deserialize correctly through the bridge. That is the whole point of having a bridge, and as currently implemented this scenario is broken (see the comment on `defaultDelegate` in `ZstdBridgeThriftSerializationDelegate.java`).
   3. No negative test for malformed bytes (random data with no recognizable framing).
   4. Once a decompression cap is added, please add a bomb-style test (small input, very large declared expansion).
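   For item 4, the test needs a tiny input with a huge expansion. A sketch of a fixture that hand-assembles one from the frame layout in RFC 8878: a frame with no declared content size and a 128 KiB window, followed by RLE blocks that each take 4 bytes on the wire and expand to 128 KiB. Names are hypothetical, and the fixture has not been checked against zstd-jni here:

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical test fixture: a Zstandard frame built only from RLE blocks (RFC 8878 layout). */
final class ZstdRleBombFixture {
    static final int BLOCK_SIZE = 128 * 1024; // Block_Maximum_Size for a 128 KiB window

    /** Returns a frame of 6 + 4 * blocks bytes that should decode to blocks * 128 KiB of zeros. */
    static byte[] rleFrame(int blocks) {
        if (blocks < 1) {
            throw new IllegalArgumentException("need at least one block");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(6 + 4 * blocks);
        // Magic number 0xFD2FB528, little-endian.
        out.write(0x28);
        out.write(0xB5);
        out.write(0x2F);
        out.write(0xFD);
        out.write(0x00);   // Frame_Header_Descriptor: no content size, no single segment, no checksum, no dictionary
        out.write(7 << 3); // Window_Descriptor: exponent 7, mantissa 0 -> window log 17 = 128 KiB
        for (int i = 0; i < blocks; i++) {
            boolean last = i == blocks - 1;
            // Block header, little-endian: bit 0 Last_Block, bits 1-2 Block_Type (1 = RLE), bits 3-23 Block_Size.
            int header = (BLOCK_SIZE << 3) | (1 << 1) | (last ? 1 : 0);
            out.write(header & 0xFF);
            out.write((header >>> 8) & 0xFF);
            out.write((header >>> 16) & 0xFF);
            out.write(0x00); // the single byte repeated Block_Size times
        }
        return out.toByteArray();
    }

    static long declaredExpansion(int blocks) {
        return (long) blocks * BLOCK_SIZE;
    }
}
```

   `rleFrame(1024)` is 4,102 bytes and should decode to 128 MiB of zeros, so a test can feed it to `decompress` and expect the cap to reject it.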



##########
storm-client/pom.xml:
##########
@@ -135,6 +135,15 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+            <scope>compile</scope>

Review Comment:
   Same as in the root pom: `compile` is the default scope, so drop the `<scope>` line. Keep the entry itself, though: `commons-compress` declares `zstd-jni` as `<optional>`, so the `commons-compress` dependency right above it does not bring `zstd-jni` in transitively, even if no module-level code imports `com.github.luben.zstd.*` directly.



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import java.util.zip.GZIPInputStream;

Review Comment:
   Unused import. Its presence suggests the original intent was to fall back to Gzip rather than raw Thrift; see the comment on the `defaultDelegate` field.



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdBridgeThriftSerializationDelegate.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Always writes Zstd out, but tests incoming bytes to determine the format.
+ * If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
+ * If not, it falls back to {@link ThriftSerializationDelegate} for raw Thrift.
+ */
+public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {
+
+    /**
+     * Zstandard magic number 0xFD2FB52. In a byte array (little-endian format): [0x28, 0xB5, 0x2F, 0xFD]

Review Comment:
   The Zstandard magic number is `0xFD2FB528` (32 bits, 8 hex digits), not `0xFD2FB52`. The bytes in the array below are correct; only the comment is missing a digit.
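   A quick way to double-check the constant: reading the four array bytes as a little-endian `int` yields exactly the 8-digit value. The class name is illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative check of the Zstd magic against the first four bytes of a buffer. */
final class ZstdMagic {
    static final int MAGIC = 0xFD2FB528; // 8 hex digits = 32 bits

    static boolean startsWithMagic(byte[] bytes) {
        return bytes != null
                && bytes.length >= 4
                && ByteBuffer.wrap(bytes, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt() == MAGIC;
    }
}
```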



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.

Review Comment:
   Stale copy from `GzipThriftSerializationDelegate`: this delegate handles Zstd, not "a gzip byte stream".



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
+ */
+public class ZstdThriftSerializationDelegate implements SerializationDelegate {
+
+    // ThreadLocal with explicit exception handling for checked TTransportException
+    private static final ThreadLocal<TSerializer> SERIALIZER = ThreadLocal.withInitial(() -> {
+        try {
+            return new TSerializer();
+        } catch (TTransportException e) {
+            throw new RuntimeException("Failed to initialize Thrift Serializer", e);
+        }
+    });
+
+    private static final ThreadLocal<TDeserializer> DESERIALIZER = ThreadLocal.withInitial(() -> {
+        try {
+            return new TDeserializer();
+        } catch (TTransportException e) {
+            throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
+        }
+    });
+
+    @Override
+    public void prepare(Map<String, Object> topoConf) {
+        // No-op: Initialization happens lazily per thread
+    }
+
+    @Override
+    public byte[] serialize(Object object) {
+        if (!(object instanceof TBase)) {
+            throw new IllegalArgumentException("Object must be an instance of TBase");
+        }
+        try {
+            byte[] thriftData = SERIALIZER.get().serialize((TBase<?, ?>) object);
+            return Utils.ZstdUtils.compress(thriftData);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to serialize Thrift object", e);
+        }
+    }
+
+    @Override
+    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
+        try {
+            byte[] decompressed = Utils.ZstdUtils.decompress(bytes);
+            TBase<?, ?> instance = (TBase<?, ?>) clazz.getDeclaredConstructor().newInstance();

Review Comment:
   Nit / defense-in-depth: `clazz.asSubclass(TBase.class).getDeclaredConstructor().newInstance()` fails fast on a wrong class type, before the constructor runs, rather than after construction. Not a security issue under Storm's threat model; just a small robustness improvement.
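   A sketch of the fail-fast pattern, with a hypothetical `Message` interface standing in for `TBase` so it runs without Thrift on the classpath. `Class.asSubclass` throws `ClassCastException` before any constructor is looked up or invoked:

```java
/** Illustrative fail-fast reflective construction; Message is a stand-in for TBase. */
final class CheckedInstantiation {
    interface Message { }

    static final class Ping implements Message {
        public Ping() { }
    }

    static final class NotAMessage {
        static int constructed = 0; // counts constructor runs, to show the type check happens first

        public NotAMessage() {
            constructed++;
        }
    }

    static <M> M newInstance(Class<?> clazz, Class<M> required) throws ReflectiveOperationException {
        Class<? extends M> checked = clazz.asSubclass(required); // ClassCastException on a wrong type
        return checked.getDeclaredConstructor().newInstance();
    }
}
```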



##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
         }
     }
 
+    /**
+     * Static utility class for Zstandard (Zstd) compression and decompression.
+     */
+    public static final class ZstdUtils {
+
+        private static final int BUFFER_SIZE = 64 * 1024;
+
+        /**
+         * Private constructor to prevent instantiation.
+         * @throws UnsupportedOperationException if an attempt is made to instantiate this class.
+         */
+        private ZstdUtils() {
+            throw new UnsupportedOperationException("Utility class should not be instantiated.");
+        }
+
+        /**
+         * Compresses the provided byte array using Zstandard.
+         *
+         * <p>The output includes the standard Zstandard frame header, making it
+         * self-describing for the decompression phase.</p>
+         *
+         * @param data the raw byte array to compress.
+         * @return a compressed byte array, or the original array if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
+         */
+        public static byte[] compress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
+                try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
+                        .setOutputStream(bos)
+                        .setBufferSize(BUFFER_SIZE) // impacts on compression ratio
+                        .setLevel(ConfigUtils.zstdCompressionLevel(localConf))
+                        .get()) {
+                    zstdOut.write(data);
+                    zstdOut.finish();
+                }
+                return bos.toByteArray();
+            } catch (Exception e) {
+                throw new RuntimeException("Zstd compression failed", e);
+            }
+        }
+
+        /**
+         * Decompresses a Zstandard-compressed byte array.
+         *
+         * @param data the compressed byte array (Zstd frame).
+         * @return the original decompressed byte array, or the input if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the decompression fails
+         *                          or if the data is not a valid Zstd frame.
+         */
+        public static byte[] decompress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+                 ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
+                 ByteArrayOutputStream bos = new ByteArrayOutputStream()) {

Review Comment:
   Hardening suggestion, not a vulnerability under Storm's threat model since ZK sits inside the trusted boundary, but worth doing as defense-in-depth: this `IOUtils.copy` will happily decompress an unbounded amount of data into memory. Zstd easily exceeds 100× on highly repetitive input, and a crafted frame made of RLE blocks (4 bytes each on the wire, each expanding to 128 KiB) reaches roughly 32,000×, so a small frame can OOM the JVM. Suggest a configurable cap (e.g. `storm.compression.zstd.max.decompressed.bytes`, with a default in the tens of MiB, well above any realistic metadata size) enforced via `BoundedInputStream` or a manual copy loop. The same caveat applies to `Utils.gunzip` in principle, but Zstd raises the ceiling significantly.
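   A minimal sketch of the manual-copy-loop option, using hypothetical names. It fails with an `IOException` once the limit is passed; a stream that merely reports end-of-stream at the limit would instead hand back truncated bytes rather than an error. The loop is codec-agnostic, so in `decompress` its argument would be the `ZstdCompressorInputStream`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper: copy a decompressing stream into memory, failing past maxBytes. */
final class BoundedDecompression {
    static byte[] readAtMost(InputStream in, long maxBytes) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
            if (total > maxBytes) {
                // Reject instead of truncating, so callers never see partial data.
                throw new IOException("Decompressed data exceeds limit of " + maxBytes + " bytes");
            }
            out.write(buffer, 0, n);
        }
        return out.toByteArray();
    }
}
```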



##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
         }
     }
 
+    /**
+     * Static utility class for Zstandard (Zstd) compression and decompression.
+     */
+    public static final class ZstdUtils {
+
+        private static final int BUFFER_SIZE = 64 * 1024;
+
+        /**
+         * Private constructor to prevent instantiation.
+         * @throws UnsupportedOperationException if an attempt is made to instantiate this class.
+         */
+        private ZstdUtils() {
+            throw new UnsupportedOperationException("Utility class should not be instantiated.");
+        }
+
+        /**
+         * Compresses the provided byte array using Zstandard.
+         *
+         * <p>The output includes the standard Zstandard frame header, making it
+         * self-describing for the decompression phase.</p>
+         *
+         * @param data the raw byte array to compress.
+         * @return a compressed byte array, or the original array if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
+         */
+        public static byte[] compress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
+                try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
+                        .setOutputStream(bos)
+                        .setBufferSize(BUFFER_SIZE) // impacts on compression ratio
+                        .setLevel(ConfigUtils.zstdCompressionLevel(localConf))
+                        .get()) {
+                    zstdOut.write(data);
+                    zstdOut.finish();
+                }
+                return bos.toByteArray();
+            } catch (Exception e) {
+                throw new RuntimeException("Zstd compression failed", e);
+            }
+        }
+
+        /**
+         * Decompresses a Zstandard-compressed byte array.
+         *
+         * @param data the compressed byte array (Zstd frame).
+         * @return the original decompressed byte array, or the input if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the decompression fails
+         *                          or if the data is not a valid Zstd frame.
+         */
+        public static byte[] decompress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+                 ZstdCompressorInputStream zstdIn = new ZstdCompressorInputStream(bis);
+                 ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+                IOUtils.copy(zstdIn, bos);
+                return bos.toByteArray();

Review Comment:
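   Same as the compress path: narrow to `IOException`. The streams and `IOUtils.copy` only declare `IOException`, so the broad `catch (Exception)` just ends up re-wrapping unchecked exceptions (genuine bugs) as decompression failures.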



##########
conf/defaults.yaml:
##########
@@ -54,7 +54,8 @@ storm.nimbus.zookeeper.acls.fixup: true
 
 storm.auth.simple-white-list.users: []
 storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
-storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
+storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdThriftSerializationDelegate"

Review Comment:
   Switching the *default* to the pure Zstd delegate breaks rolling upgrades: every existing cluster has Gzip-compressed Thrift sitting in ZooKeeper, and `ZstdThriftSerializationDelegate` cannot read it. The new bridge class is presumably designed for exactly this case. Once its fallback is fixed (see the comment on that file), please point the default here at `ZstdBridgeThriftSerializationDelegate` instead. Operators can opt into the pure Zstd delegate once they are sure no legacy Gzip state remains in ZK.



##########
storm-client/src/jvm/org/apache/storm/serialization/ZstdThriftSerializationDelegate.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.serialization;
+
+import java.util.Map;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TDeserializer;
+import org.apache.storm.thrift.TException;
+import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Note, this assumes it's deserializing a gzip byte stream, and will err if 
it encounters any other serialization.
+ */
+public class ZstdThriftSerializationDelegate implements SerializationDelegate {
+
+    // ThreadLocal with explicit exception handling for checked TTransportException
+    private static final ThreadLocal<TSerializer> SERIALIZER = ThreadLocal.withInitial(() -> {
+        try {
+            return new TSerializer();
+        } catch (TTransportException e) {
+            throw new RuntimeException("Failed to initialize Thrift Serializer", e);
+        }
+    });
+
+    private static final ThreadLocal<TDeserializer> DESERIALIZER = ThreadLocal.withInitial(() -> {
+        try {
+            return new TDeserializer();
+        } catch (TTransportException e) {
+            throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
+        }
+    });
+
+    @Override
+    public void prepare(Map<String, Object> topoConf) {
+        // No-op: Initialization happens lazily per thread
+    }
+
+    @Override
+    public byte[] serialize(Object object) {
+        if (!(object instanceof TBase)) {
+            throw new IllegalArgumentException("Object must be an instance of TBase");
+        }
+        try {
+            byte[] thriftData = SERIALIZER.get().serialize((TBase<?, ?>) object);
+            return Utils.ZstdUtils.compress(thriftData);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to serialize Thrift object", e);
+        }
+    }
+
+    @Override
+    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
+        try {
+            byte[] decompressed = Utils.ZstdUtils.decompress(bytes);
+            TBase<?, ?> instance = (TBase<?, ?>) clazz.getDeclaredConstructor().newInstance();
+            DESERIALIZER.get().deserialize(instance, decompressed);
+            return (T) instance;
+        } catch (Exception e) {

Review Comment:
   `catch (Exception)` here is broader than needed: it also catches unchecked exceptions (for example the `ClassCastException` raised when `clazz` is not a `TBase`) and converts them into a generic `RuntimeException`, hiding the real cause. Suggest narrowing to the checked exceptions actually thrown (`ReflectiveOperationException`, `TException`).



##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
         }
     }
 
+    /**
+     * Static utility class for Zstandard (Zstd) compression and decompression.
+     */
+    public static final class ZstdUtils {
+
+        private static final int BUFFER_SIZE = 64 * 1024;
+
+        /**
+         * Private constructor to prevent instantiation.
+         * @throws UnsupportedOperationException if an attempt is made to instantiate this class.
+         */
+        private ZstdUtils() {
+            throw new UnsupportedOperationException("Utility class should not be instantiated.");
+        }
+
+        /**
+         * Compresses the provided byte array using Zstandard.
+         *
+         * <p>The output includes the standard Zstandard frame header, making it
+         * self-describing for the decompression phase.</p>
+         *
+         * @param data the raw byte array to compress.
+         * @return a compressed byte array, or the original array if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
+         */
+        public static byte[] compress(byte[] data) {
+            if (data == null || data.length == 0) {
+                return data;
+            }
+
+            try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
+                try (ZstdCompressorOutputStream zstdOut = ZstdCompressorOutputStream.builder()
+                        .setOutputStream(bos)
+                        .setBufferSize(BUFFER_SIZE) // impacts on compression ratio
+                        .setLevel(ConfigUtils.zstdCompressionLevel(localConf))
+                        .get()) {
+                    zstdOut.write(data);
+                    zstdOut.finish();
+                }
+                return bos.toByteArray();

Review Comment:
   Narrow this to `IOException`, the only checked exception the underlying streams declare. `catch (Exception)` also captures unchecked exceptions such as a `NullPointerException` and relabels them as "Zstd compression failed", which hides genuine bugs.
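   For reference, a sketch of the narrowed shape, with `GZIPOutputStream` standing in for `ZstdCompressorOutputStream` so it runs on the JDK alone (names are illustrative). Only I/O failures are wrapped, via `UncheckedIOException`; an unchecked exception such as a `NullPointerException` propagates unchanged instead of being reported as a compression failure:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

/** Illustrative compress() with the catch narrowed to IOException; Gzip is a stand-in codec. */
final class NarrowCatchSketch {
    static byte[] compress(byte[] data) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length)) {
            try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
                out.write(data);
            } // closing the codec stream writes the trailer
            return bos.toByteArray();
        } catch (IOException e) {
            // Only I/O failures are wrapped; unchecked exceptions propagate unchanged.
            throw new UncheckedIOException("Compression failed", e);
        }
    }
}
```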



##########
storm-client/src/jvm/org/apache/storm/utils/Utils.java:
##########
@@ -960,6 +963,75 @@ public static byte[] gunzip(byte[] data) {
         }
     }
 
+    /**
+     * Static utility class for Zstandard (Zstd) compression and decompression.
+     */
+    public static final class ZstdUtils {
+
+        private static final int BUFFER_SIZE = 64 * 1024;
+
+        /**
+         * Private constructor to prevent instantiation.
+         * @throws UnsupportedOperationException if an attempt is made to instantiate this class.
+         */
+        private ZstdUtils() {
+            throw new UnsupportedOperationException("Utility class should not be instantiated.");
+        }
+
+        /**
+         * Compresses the provided byte array using Zstandard.
+         *
+         * <p>The output includes the standard Zstandard frame header, making it
+         * self-describing for the decompression phase.</p>
+         *
+         * @param data the raw byte array to compress.
+         * @return a compressed byte array, or the original array if null/empty.
+         * @throws RuntimeException wrapping an {@link IOException} if the compression fails.
+         */
+        public static byte[] compress(byte[] data) {
+            if (data == null || data.length == 0) {

Review Comment:
   Returning the raw input for empty data means the bridge's `isZstd()` check sees no magic header and routes the bytes through the fallback path. That works by accident for empty input today, but it is an asymmetry that will bite later; for example, once the fallback becomes Gzip, `GZIPInputStream` rejects empty input. Either drop the early return so the output always carries a valid Zstd frame, or document the contract explicitly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
