wang-jiahua commented on code in PR #10444:
URL: https://github.com/apache/rocketmq/pull/10444#discussion_r3373790937


##########
common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java:
##########
@@ -602,64 +602,373 @@ public static List<MessageExt> decodes(ByteBuffer 
byteBuffer, final boolean read
         return msgExts;
     }
 
+    private static final ThreadLocal<StringBuilder> REUSABLE_SB =
+        ThreadLocal.withInitial(() -> new StringBuilder(256));
+
     public static String messageProperties2String(Map<String, String> 
properties) {
         if (properties == null) {
             return "";
         }
-        int len = 0;
+        StringBuilder sb = REUSABLE_SB.get();
+        sb.setLength(0);
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String name = entry.getKey();
+            final String value = entry.getValue();
+
+            if (value == null) {
+                continue;
+            }
+            sb.append(name);
+            sb.append(NAME_VALUE_SEPARATOR);
+            sb.append(value);
+            sb.append(PROPERTY_SEPARATOR);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * UTF-8 byte serialization of properties, mirror of {@link 
#messageProperties2String} but
+     * skipping the StringBuilder + String + String.getBytes() round-trip on 
the broker write
+     * hot path. Output bytes are identical to {@code 
messageProperties2String(properties).getBytes(UTF_8)}.
+     * Returns null for null/empty maps (encoders treat null and 0-length 
identically).
+     */
+    public static byte[] messageProperties2Bytes(Map<String, String> 
properties) {
+        if (properties == null || properties.isEmpty()) {
+            return null;
+        }
+        int totalLen = 0;
         for (final Map.Entry<String, String> entry : properties.entrySet()) {
             final String name = entry.getKey();
             final String value = entry.getValue();
             if (value == null) {
                 continue;
             }
             if (name != null) {
-                len += name.length();
+                totalLen += utf8ByteLength(name);
             }
-            len += value.length();
-            len += 2; // separator
+            totalLen += utf8ByteLength(value);
+            totalLen += 2;
+        }
+        if (totalLen == 0) {
+            return null;
         }
-        StringBuilder sb = new StringBuilder(len);
+        byte[] out = new byte[totalLen];
+        int idx = 0;
         for (final Map.Entry<String, String> entry : properties.entrySet()) {
             final String name = entry.getKey();
             final String value = entry.getValue();
+            if (value == null) {
+                continue;
+            }
+            if (name != null) {
+                idx = writeUtf8(name, out, idx);
+            }
+            out[idx++] = (byte) NAME_VALUE_SEPARATOR;
+            idx = writeUtf8(value, out, idx);
+            out[idx++] = (byte) PROPERTY_SEPARATOR;
+        }
+        return out;
+    }
 
+    public static int computePropertiesByteLength(Map<String, String> 
properties) {
+        if (properties == null || properties.isEmpty()) {
+            return 0;
+        }
+        if (properties instanceof FlatPropertiesMap) {
+            FlatPropertiesMap fpm = (FlatPropertiesMap) properties;
+            return fpm.computeEncodedLength();
+        }
+        int totalLen = 0;
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String name = entry.getKey();
+            final String value = entry.getValue();
             if (value == null) {
                 continue;
             }
-            sb.append(name);
-            sb.append(NAME_VALUE_SEPARATOR);
-            sb.append(value);
-            sb.append(PROPERTY_SEPARATOR);
+            if (name != null) {
+                totalLen += utf8ByteLength(name);
+            }
+            totalLen += utf8ByteLength(value);
+            totalLen += 2;
         }
-        return sb.toString();
+        return totalLen;
+    }
+
+    public static int messageProperties2Bytes(Map<String, String> properties, 
byte[] out) {
+        if (properties == null || properties.isEmpty()) {
+            return 0;
+        }
+        if (properties instanceof FlatPropertiesMap) {
+            return ((FlatPropertiesMap) properties).encodeTo(out);
+        }
+        int idx = 0;
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String name = entry.getKey();
+            final String value = entry.getValue();
+            if (value == null) {
+                continue;
+            }
+            if (name != null) {
+                idx = writeUtf8(name, out, idx);
+            }
+            out[idx++] = (byte) NAME_VALUE_SEPARATOR;
+            idx = writeUtf8(value, out, idx);
+            out[idx++] = (byte) PROPERTY_SEPARATOR;
+        }
+        return idx;
+    }
+
+    static int utf8ByteLength(String s) {
+        int len = s.length();
+        int byteLen = 0;
+        for (int i = 0; i < len; i++) {
+            char c = s.charAt(i);
+            if (c < 0x80) {
+                byteLen++;
+            } else if (c < 0x800) {
+                byteLen += 2;
+            } else if (Character.isHighSurrogate(c) && i + 1 < len
+                && Character.isLowSurrogate(s.charAt(i + 1))) {
+                byteLen += 4;
+                i++;
+            } else {
+                byteLen += 3;
+            }
+        }
+        return byteLen;
+    }
+
+    static int writeUtf8(String s, byte[] out, int offset) {
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            char c = s.charAt(i);
+            if (c < 0x80) {
+                out[offset++] = (byte) c;
+            } else if (c < 0x800) {
+                out[offset++] = (byte) (0xC0 | (c >>> 6));
+                out[offset++] = (byte) (0x80 | (c & 0x3F));
+            } else if (Character.isHighSurrogate(c) && i + 1 < len
+                && Character.isLowSurrogate(s.charAt(i + 1))) {
+                int cp = Character.toCodePoint(c, s.charAt(++i));
+                out[offset++] = (byte) (0xF0 | (cp >>> 18));
+                out[offset++] = (byte) (0x80 | ((cp >>> 12) & 0x3F));
+                out[offset++] = (byte) (0x80 | ((cp >>> 6) & 0x3F));
+                out[offset++] = (byte) (0x80 | (cp & 0x3F));
+            } else {
+                out[offset++] = (byte) (0xE0 | (c >>> 12));
+                out[offset++] = (byte) (0x80 | ((c >>> 6) & 0x3F));
+                out[offset++] = (byte) (0x80 | (c & 0x3F));
+            }
+        }
+        return offset;
     }
 
     public static Map<String, String> string2messageProperties(final String 
properties) {
-        Map<String, String> map = new HashMap<>(128);
-        if (properties != null) {
-            int len = properties.length();
-            int index = 0;
-            while (index < len) {
-                int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index);
-                if (newIndex < 0) {
-                    newIndex = len;
-                }
-                if (newIndex - index >= 3) {
-                    int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, 
index);
-                    if (kvSepIndex > index && kvSepIndex < newIndex - 1) {
-                        String k = properties.substring(index, kvSepIndex);
-                        String v = properties.substring(kvSepIndex + 1, 
newIndex);
-                        map.put(k, v);
+        return string2messageProperties(properties, 0);
+    }
+
+    /**
+     * Variant of {@link #string2messageProperties(String)} that reserves 
capacity for
+     * {@code extraEntries} additional entries the caller intends to put 
afterwards. Used
+     * on the broker send path where MSG_REGION/TRACE_SWITCH/CLUSTER/... are 
appended to
+     * the decoded Map; pre-sizing avoids a HashMap resize when those puts 
cross the load
+     * factor threshold of the as-decoded capacity.
+     */
+    public static Map<String, String> string2messageProperties(final String 
properties, final int extraEntries) {
+        if (properties == null || properties.isEmpty()) {
+            return new HashMap<>(Math.max(4, extraEntries));
+        }
+        final int len = properties.length();
+        int estEntries = 0;
+        for (int i = 0; i < len; i++) {
+            if (properties.charAt(i) == PROPERTY_SEPARATOR) {
+                estEntries++;
+            }
+        }
+        estEntries = Math.max(estEntries, 1);
+        HashMap<String, String> map = new HashMap<>((estEntries + 
extraEntries) * 4 / 3 + 1);
+        int index = 0;
+        while (index < len) {
+            int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index);
+            if (newIndex < 0) {
+                newIndex = len;
+            }
+            if (newIndex - index >= 3) {
+                int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, 
index);
+                if (kvSepIndex > index && kvSepIndex < newIndex - 1) {
+                    int klen = kvSepIndex - index;
+                    String k = null;
+                    if (klen < MessageConst.STRING_INTERN_BY_LEN.length) {
+                        String[] candidates = 
MessageConst.STRING_INTERN_BY_LEN[klen];
+                        if (candidates != null) {
+                            for (String candidate : candidates) {
+                                if (properties.regionMatches(index, candidate, 
0, klen)) {
+                                    k = candidate;
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (k == null) {
+                        k = properties.substring(index, kvSepIndex);
                     }
+                    int vOffset = kvSepIndex + 1;
+                    int vLen = newIndex - vOffset;
+                    String v = internStringValue(properties, vOffset, vLen);
+                    if (v == null) {
+                        v = properties.substring(vOffset, newIndex);
+                    }
+                    map.put(k, v);
                 }
-                index = newIndex + 1;
             }
+            index = newIndex + 1;
         }
 
         return map;
     }
 
+    /**
+     * Variant of {@link #string2messageProperties(String)} that parses 
directly from a UTF-8
+     * byte array, skipping the intermediate {@code new String(bytes, ...)} 
allocation. The
+     * separators {@link #NAME_VALUE_SEPARATOR} (0x01) and {@link 
#PROPERTY_SEPARATOR} (0x02)
+     * are ASCII single bytes that never appear inside multi-byte UTF-8 
sequences, so byte-level
+     * scanning is safe. Canonical (intern) keys are ASCII and matched 
byte-by-byte.
+     */
+    public static Map<String, String> bytes2messageProperties(final byte[] 
bytes, final int offset,
+        final int length) {
+        if (bytes == null || length <= 0) {
+            return new HashMap<>(4);
+        }
+        final int end = offset + length;
+        FlatPropertiesMap reusable = REUSABLE_PROPS_MAP.get();
+        reusable.reset();
+        FlatPropertiesMap map = reusable;
+        int index = offset;
+        while (index < end) {
+            int sepIdx = end;
+            for (int i = index; i < end; i++) {
+                if (bytes[i] == PROPERTY_SEPARATOR) {
+                    sepIdx = i;
+                    break;
+                }
+            }
+            if (sepIdx - index >= 3) {
+                int kvSepIdx = -1;
+                for (int i = index; i < sepIdx; i++) {
+                    if (bytes[i] == NAME_VALUE_SEPARATOR) {
+                        kvSepIdx = i;
+                        break;
+                    }
+                }
+                if (kvSepIdx > index && kvSepIdx < sepIdx - 1) {
+                    int klen = kvSepIdx - index;
+                    String k = null;
+                    if (klen < MessageConst.STRING_INTERN_BY_LEN.length) {
+                        String[] candidates = 
MessageConst.STRING_INTERN_BY_LEN[klen];
+                        if (candidates != null) {
+                            for (String candidate : candidates) {
+                                if (asciiBytesEqual(bytes, index, candidate, 
klen)) {
+                                    k = candidate;
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    if (k == null) {
+                        k = new String(bytes, index, klen, CHARSET_UTF8);
+                    }
+                    int vOffset = kvSepIdx + 1;
+                    int vLen = sepIdx - kvSepIdx - 1;
+                    String v = internValue(bytes, vOffset, vLen);
+                    if (v == null) {
+                        v = new String(bytes, vOffset, vLen, CHARSET_UTF8);
+                    }
+                    map.putDirect(k, v);
+                }
+            }
+            index = sepIdx + 1;
+        }
+        return map;
+    }

Review Comment:
   Fixed: `bytes2messageProperties` now returns a freshly allocated `HashMap` 
(pre-sized from the property-separator count). The `REUSABLE_PROPS_MAP` 
ThreadLocal and the whole `FlatPropertiesMap` class have been removed, so 
consecutive decodes on the same thread can no longer corrupt each other.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to