reiabreu commented on code in PR #8707:
URL: https://github.com/apache/storm/pull/8707#discussion_r3307112670


##########
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java:
##########
@@ -16,27 +16,61 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.task.GeneralTopologyContext;
 import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 
 public class KryoTupleDeserializer implements ITupleDeserializer {
-    private GeneralTopologyContext context;
-    private KryoValuesDeserializer kryo;
-    private SerializationFactory.IdDictionary ids;
-    private Input kryoInput;
+    private static final Integer DEFAULT_MAX_DECOMPRESSED_BYTES = 10 * 1024 * 1024; // 10MBytes
+    public static final String FAILED_TO_DESERIALIZE_TUPLE = "Failed to deserialize tuple";
+    private final GeneralTopologyContext context;
+    private final KryoValuesDeserializer kryo;
+    private final SerializationFactory.IdDictionary ids;
+    private final Input kryoInput;
+    private final int maxZstdDecompressedBytes;
+    private final boolean anyTupleCompressionEnabled;
 
     public KryoTupleDeserializer(final Map<String, Object> conf, final GeneralTopologyContext context) {
         kryo = new KryoValuesDeserializer(conf);
         this.context = context;
         ids = new SerializationFactory.IdDictionary(context.getRawTopology());
         kryoInput = new Input(1);
+        maxZstdDecompressedBytes = ObjectReader.getInt(conf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_MAX_DECOMPRESSED_BYTES),
+                DEFAULT_MAX_DECOMPRESSED_BYTES);
+        anyTupleCompressionEnabled = isTupleCompressionEnabled(conf, context);
     }
 
     @Override
     public TupleImpl deserialize(byte[] ser) {
+        // check zstd header if at least one component is compressing tuples.
+        if (anyTupleCompressionEnabled && Utils.ZstdUtils.isZstd(ser)) {
+            try {
+                byte[] decompressed = Utils.ZstdUtils.decompress(ser, this.maxZstdDecompressedBytes);
+                return deserializeTuple(decompressed);
+            } catch (RuntimeException e) {
+                if (e.getMessage() != null && e.getMessage().contains(FAILED_TO_DESERIALIZE_TUPLE)) {
+                    // isZstd() false positive: a raw Kryo tuple's first 4 bytes matched ZSTD_MAGIC_HEADER by chance.
+                    // This is astronomically unlikely in practice. Because ZSTD_MAGIC_HEADER (0xFD2FB528) is little-endian
+                    // on the wire, the first byte checked is 0x28. A Kryo writeInt(taskId, true) of 40 yields exactly 0x28.
+                    // The collision is prevented not by the taskId range, but by the second field (streamId),
+                    // which would rigidly have to equal 6069 to match the remaining magic bytes.
+                    // Branch retained for correctness in case of an accidental collision.

Review Comment:
   Do we want a log message here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to