CherishCai opened a new issue, #2253: URL: https://github.com/apache/fury/issues/2253
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/fury/issues) and found no similar issues. ### Version Java fury 0.10.2 ```xml <!-- 采用 jdk8 --> [bolt-common-error.log](https://github.com/user-attachments/files/20453090/bolt-common-error.log) [stable-6.2.130.60-bolt-common-error.log](https://github.com/user-attachments/files/20453089/stable-6.2.130.60-bolt-common-error.log) <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>sofa-rpc-all</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>bolt</artifactId> <version>1.6.10</version> </dependency> <dependency> <groupId>org.apache.fury</groupId> <artifactId>fury-core</artifactId> <version>0.10.2</version> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.30.2-GA</version> </dependency> ``` ### Component(s) Java ### Minimal reproduce step - SofaResponse ```java package com.alipay.sofa.rpc.core.response; import com.alipay.sofa.rpc.transport.AbstractByteBuf; import java.io.Serializable; import java.util.HashMap; import java.util.Map; /** * Sofa RPC Response class * * @author <a href=mailto:hongwei....@antfin.com>HongWei Yi</a> */ public final class SofaResponse implements Serializable { private static final long serialVersionUID = -4364536436151723421L; /** * 框架异常 */ private boolean isError = false; /** * 框架异常的消息 */ private String errorMsg; /** * 业务返回或者业务异常 */ private Object appResponse; /** * extensional properties */ private Map<String, String> responseProps; //====================== 下面是非传递属性 =============== /** * 序列化类型 */ private transient byte serializeType; /** * 数据 */ private transient AbstractByteBuf data; /** * Gets app response. * * @return the app response */ public Object getAppResponse() { return appResponse; } /** * Sets app response. 
* * @param response the response */ public void setAppResponse(Object response) { appResponse = response; } /** * Is error boolean. * * @return the boolean */ public boolean isError() { return isError; } /** * Gets error msg. * * @return the error msg */ public String getErrorMsg() { return errorMsg; } /** * Sets error msg. * * @param error the error */ public void setErrorMsg(String error) { if (error == null) { return; } errorMsg = error; isError = true; } /** * Gets response prop. * * @param key the key * @return the response prop */ public Object getResponseProp(String key) { return responseProps == null ? null : responseProps.get(key); } /** * Add response prop. * * @param key the key * @param value the value */ public void addResponseProp(String key, String value) { if (responseProps == null) { responseProps = new HashMap<String, String>(16); } if (key != null && value != null) { responseProps.put(key, value); } } /** * Remove response props. * * @param key the key */ public void removeResponseProp(String key) { if (responseProps != null && key != null) { responseProps.remove(key); } } /** * Gets response props. * * @return the response props */ public Map<String, String> getResponseProps() { return responseProps; } /** * Sets response props. * * @param responseProps the response props */ public void setResponseProps(Map<String, String> responseProps) { this.responseProps = responseProps; } /** * Gets serialize type. * * @return the serialize type */ public byte getSerializeType() { return serializeType; } /** * Sets serialize type. * * @param serializeType the serialize type * @return the serialize type */ public SofaResponse setSerializeType(byte serializeType) { this.serializeType = serializeType; return this; } /** * Gets data. * * @return the data */ public AbstractByteBuf getData() { return data; } /** * Sets data. 
* * @param data the data * @return the data */ public SofaResponse setData(AbstractByteBuf data) { this.data = data; return this; } @Override public String toString() { StringBuilder sb = new StringBuilder(128); sb.append("SofaResponse["); sb.append("sofa-rpc exception=").append(isError).append(", "); sb.append("sofa-rpc errorMsg=").append(errorMsg).append(", "); sb.append("appResponse=").append(appResponse).append("]"); return sb.toString(); } } ``` - PollingResponse ```java public class PollingResponse implements Serializable { public static int ERROR_CODE_NOT_READY_FOR_SERVICE = 1; public static int ERROR_CODE_SERVER_REJECT = 2; private List<String> nameList; private List<VipDomain> vipDomains; private Map<String, Object> extensionParams; private long startTime; private long acceptTime; private int errorCode = 0; private String errorMsg; public PollingResponse() { super(); } public void serialExtensionListToString() { if (null == extensionParams || extensionParams.isEmpty()) { extensionParams = new HashMap<String, Object>(); return; } for (Entry<String, Object> entry : extensionParams.entrySet()) { if (entry.getValue() instanceof List<?>) { extensionParams.put(entry.getKey(), StringUtils.join((List<String>) entry.getValue(), ",")); } } } public List<String> getNameList() { return nameList; } public void setNameList(List<String> nameList) { this.nameList = nameList; } public List<VipDomain> getVipDomains() { return vipDomains; } public void setVipDomains(List<VipDomain> vipDomains) { this.vipDomains = vipDomains; } public Map<String, Object> getExtensionParams() { return extensionParams; } public void setExtensionParams(Map<String, Object> extensionParams) { this.extensionParams = extensionParams; } public long getStartTime() { return startTime; } public void setStartTime(long startTime) { this.startTime = startTime; } public long getAcceptTime() { return acceptTime; } public void setAcceptTime(long acceptTime) { this.acceptTime = acceptTime; } public long 
getTransmissionTime() { return this.acceptTime - this.startTime; } public int getErrorCode() { return this.errorCode; } public void setErrorCode(int errorCode) { this.errorCode = errorCode; } public String getErrorMsg() { return this.errorMsg; } public void setErrorMsg(String errorMsg) { this.errorMsg = errorMsg; } public boolean isSuccess() { return this.errorCode == 0; } public void checkSuccess() throws AntVipResponseException { if (!this.isSuccess()) { throw new AntVipResponseException(this.getPrettyErrorMsg()); } } public String getPrettyErrorMsg() { return this.errorMsg + "(errorCode:" + this.errorCode + ")"; } @Override public String toString() { return String.format( "PollingResponse [errorCode=%s, errorMsg=%s, nameList'size=%s, vipDomains'size=%s, startTime=%s, acceptTime=%s, extensionParams=%s]", this.errorCode, this.errorMsg, (this.nameList != null ? this.nameList.size() : null), (this.vipDomains != null ? this.vipDomains.size() : null), this.startTime, this.acceptTime, this.extensionParams); } } ``` - VipDomain ```java public class VipDomain implements Serializable { private String name; private transient String app; private transient String zone; private transient String station; private transient String env; private int protectThreshold = 50; private int idcDisasterProtect; private transient boolean enable = false; private HealthCheckType healthCheckType = HealthCheckType.TCP; private int healthCheckDefaultPort = 8080; private int healthCheckTimeout = 2000; private int healthCheckInterval = 5000; private int healthCheckRaise = 1; private int healthCheckFall = 3; private Map<String, Object> healthCheckPayload; private Boolean healthCheckEnable; private long version = 1; private transient Date lastAccessTime; private transient String checksumForClient; private transient String checksumForClientNewVersion; private transient String checksumForClientCompatible; private transient String checksumForServer; private List<RealNode> realNodes; private 
Map<String, String> domainLabels; private Map<String, Object> span; public void addSpan(String spanKey, Object spanVal) { if (span == null) { this.span = new ConcurrentHashMap<>(); } this.span.put(spanKey, spanVal); } @Override public String toString() { return "VipDomain [name=" + name + ", app=" + app + ", zone=" + zone + ", station=" + station + ", env=" + env + ", protectThreshold=" + protectThreshold + ", idcDisasterProtect=" + idcDisasterProtect + ", enable=" + enable + ", healthCheckType=" + healthCheckType + ", healthCheckDefaultPort=" + healthCheckDefaultPort + ", healthCheckTimeout=" + healthCheckTimeout + ", healthCheckInterval=" + healthCheckInterval + ", healthCheckRaise=" + healthCheckRaise + ", healthCheckFall=" + healthCheckFall + ", healthCheckPayload=" + healthCheckPayload + ", healthCheckEnable=" + healthCheckEnable + ", version=" + version + ", lastAccessTime=" + lastAccessTime + ", checksumForClient=" + checksumForClient + ", checksumForClientNewVersion=" + checksumForClientNewVersion + ", checksumForClientCompatible=" + checksumForClientCompatible + ", checksumForServer=" + checksumForServer + ", realNodes=" + realNodes + ", domainLabels=" + domainLabels + ", span=" + span + "]"; } } ``` - RealNode ```java public class RealNode implements Serializable { private transient String domainName; private String ip; private transient String fqdn; private int weight = 1; private Integer healthCheckPort = 0; private transient boolean enable = false; private String zone; private volatile Boolean available; private long roundTripTime = -1; private String reason; private Date lastHealthCheckTime; private transient AtomicInteger raisingCount = new AtomicInteger(0); private transient AtomicInteger fallingCount = new AtomicInteger(0); private transient VipDomain vipDomain; private Map<String, String> rnLabels; public boolean isFalling() { return this.fallingCount.get() > 0; } public boolean isRaising() { return this.raisingCount.get() > 0; } public String 
getEffectiveHealtchCheckHost() { return this.ip + ":" + this.getEffectiveHealthCheckPort(); } public int getEffectiveHealthCheckPort() { return this.healthCheckPort <= 0 ? this.getVipDomain().getHealthCheckDefaultPort() : this.healthCheckPort; } // FIXME 此处不应该打印 vipDomain,否则会循环打印,报错 java.lang.StackOverflowError @Override public String toString() { return "RealNode [domainName=" + domainName + ", ip=" + ip + ", fqdn=" + fqdn + ", weight=" + weight + ", healthCheckPort=" + healthCheckPort + ", enable=" + enable + ", zone=" + zone + ", available=" + available + ", roundTripTime=" + roundTripTime + ", reason=" + reason + ", lastHealthCheckTime=" + lastHealthCheckTime + ", raisingCount=" + raisingCount + ", fallingCount=" + fallingCount + ", vipDomain=" + vipDomain + ", rnLabels=" + rnLabels + "]"; } } ``` - Fury010Serializer ```java import com.alipay.antvip.server.sofarpc.fury.serialize.SofaRequestFurySerializer; import com.alipay.antvip.server.sofarpc.fury.serialize.SofaResponseFurySerializer; import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.codec.AbstractSerializer; import com.alipay.sofa.rpc.codec.CustomSerializer; import com.alipay.sofa.rpc.codec.common.BlackAndWhiteListFileLoader; import com.alipay.sofa.rpc.codec.common.SerializeCheckStatus; import com.alipay.sofa.rpc.common.config.RpcConfigKeys; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; import com.alipay.sofa.rpc.ext.Extension; import com.alipay.sofa.rpc.transport.AbstractByteBuf; import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf; import org.apache.fury.Fury; import org.apache.fury.ThreadLocalFury; import org.apache.fury.ThreadSafeFury; import org.apache.fury.config.Language; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.resolver.AllowListChecker; import java.util.List; import java.util.Map; import static 
org.apache.fury.config.CompatibleMode.COMPATIBLE; /** * Fury010Serializer. * copy from sofa-rpc-all 5.13.3 and change `io.fury.*` to `org.apache.fury.*` * <p> * <li>自定义序列化 apache fury 0.10.0 版本,后续再升级 sofa-rpc 集成 apache fury 1.0(有 Hessian 作回切然后再切 fury 1.0 版本);</li> * <li>修复 sofa-rpc 一个问题,它会清理掉 Fury 造成下次 ThreadLocalFury 又要构造 Fury 对象; fury.clearClassLoader(contextClassLoader); </li> * </p> * * @link <a href="https://yuque.antfin.com/middleware/antvip/eol3rkzd5sho2d4g#jPagx">AntVip 优化节点间的数据同步序列化: sofa-rpc 序列化 Fury</a> * @link resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.codec.Serializer * @see com.alipay.sofa.rpc.codec.fury.FurySerializer */ @Extension(value = "fury010", code = 122) public class Fury010Serializer extends AbstractSerializer { protected final ThreadSafeFury fury; private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE); public Fury010Serializer() { fury = new ThreadLocalFury(classLoader -> { Fury f = Fury.builder().withLanguage(Language.JAVA) .withRefTracking(true) .withCodegen(true) .withNumberCompressed(true) .withCompatibleMode(COMPATIBLE) .requireClassRegistration(false) .withClassLoader(classLoader) .withAsyncCompilation(true) .build(); // Do not use any configuration if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.DISABLE.name())) { AllowListChecker noChecker = new AllowListChecker(AllowListChecker.CheckLevel.DISABLE); f.getClassResolver().setClassChecker(noChecker); return f; } else if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.WARN.name())) { AllowListChecker blackListChecker = new AllowListChecker(AllowListChecker.CheckLevel.WARN); List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST; // To setting checker f.getClassResolver().setClassChecker(blackListChecker); blackListChecker.addListener(f.getClassResolver()); // BlackList classes use wildcards for (String key : blackList) { blackListChecker.disallowClass(key + "*"); } } else if 
(checkerMode.equalsIgnoreCase(SerializeCheckStatus.STRICT.name())) { AllowListChecker blackAndWhiteListChecker = new AllowListChecker(AllowListChecker.CheckLevel.STRICT); List<String> whiteList = BlackAndWhiteListFileLoader.SOFA_SERIALIZER_WHITE_LIST; // To setting checker f.getClassResolver().setClassChecker(blackAndWhiteListChecker); blackAndWhiteListChecker.addListener(f.getClassResolver()); // WhiteList classes use wildcards for (String key : whiteList) { blackAndWhiteListChecker.allowClass(key + "*"); } List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST; // To setting checker f.getClassResolver().setClassChecker(blackAndWhiteListChecker); blackAndWhiteListChecker.addListener(f.getClassResolver()); // BlackList classes use wildcards for (String key : blackList) { blackAndWhiteListChecker.disallowClass(key + "*"); } } f.register(SofaRequest.class); f.register(SofaResponse.class); f.register(SofaRpcException.class); f.register(com.alipay.antvip.common.transport.PollingRequest.class); f.register(com.alipay.antvip.common.transport.PollingResponse.class); f.register(com.alipay.antvip.common.model.VipDomain.class); f.register(com.alipay.antvip.common.model.RealNode.class); f.register(com.alipay.antvip.common.model.HealthCheckType.class); return f; }); addCustomSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury)); addCustomSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury)); } @Override public AbstractByteBuf encode(final Object object, final Map<String, String> context) throws SofaRpcException { if (object == null) { throw buildSerializeError("Unsupported null message!"); } ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { fury.setClassLoader(contextClassLoader); CustomSerializer customSerializer = getObjCustomSerializer(object); if (customSerializer != null) { return customSerializer.encodeObject(object, context); } else { MemoryBuffer writeBuffer = 
MemoryBuffer.newHeapBuffer(32); writeBuffer.writerIndex(0); fury.serialize(writeBuffer, object); return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex())); } } catch (Exception e) { throw buildSerializeError(e.getMessage(), e); //} finally { //fury.clearClassLoader(contextClassLoader); } } @Override public Object decode(final AbstractByteBuf data, final Class clazz, final Map<String, String> context) throws SofaRpcException { if (data.readableBytes() <= 0 || clazz == null) { throw buildDeserializeError("Deserialized array is empty."); } ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { fury.setClassLoader(contextClassLoader); CustomSerializer customSerializer = getCustomSerializer(clazz); if (customSerializer != null) { return customSerializer.decodeObject(data, context); } else { MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array()); return fury.deserialize(readBuffer); } } catch (Exception e) { throw buildDeserializeError(e.getMessage(), e); //} finally { //fury.clearClassLoader(contextClassLoader); } } @Override public void decode(final AbstractByteBuf data, final Object template, final Map<String, String> context) throws SofaRpcException { if (template == null) { throw buildDeserializeError("template is null!"); } ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { fury.setClassLoader(contextClassLoader); CustomSerializer customSerializer = getObjCustomSerializer(template); if (customSerializer != null) { customSerializer.decodeObjectByTemplate(data, context, template); } else { throw buildDeserializeError("Only support decode from SofaRequest and SofaResponse template"); } } catch (Exception e) { throw buildDeserializeError(e.getMessage(), e); //} finally { //fury.clearClassLoader(contextClassLoader); } } } ``` - src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.codec.Serializer ``` 
fury010=com.alipay.antvip.server.sofarpc.fury.Fury010Serializer ``` - testFury010SerializerReproduce ```java @Test public void testFury010SerializerReproduce() { System.setProperty("sofa.rpc.codec.serialize.checkMode", "DISABLE"); System.setProperty("FURY_CODE_DIR", ".code"); Fury010Serializer serializer = new Fury010Serializer(); SofaResponse sofaResponse = new SofaResponse(); VipDomain vipDomain = new VipDomain(); vipDomain.setName("obcrmcore-dev-pool.stable.alipay.net"); vipDomain.setVersion(38350); vipDomain.setProtectThreshold(30); vipDomain.setHealthCheckDefaultPort(12200); vipDomain.setHealthCheckPayload(Collections.emptyMap()); vipDomain.setHealthCheckEnable(false); vipDomain.addSpan("dbTs", 1747817528000L); vipDomain.addSpan("clientSendTs", 1747817529332L); vipDomain.addSpan("healthCheckInitTs", 1747817529306L); vipDomain.addSpan("action", "modify"); vipDomain.addSpan("webSyncRecvTs", 1747817529306L); vipDomain.addSpan("type", "domainChange"); RealNode realNode1 = new RealNode(); realNode1.setIp("11.122.210.60"); realNode1.setEnable(false); realNode1.setZone("gz00b"); realNode1.setAvailable(false); realNode1.setRoundTripTime(2000L); realNode1.setVipDomain(vipDomain); realNode1.setRnLabels(Collections.singletonMap("sofa_group", "GROUP_20221025160042")); RealNode realNode2 = new RealNode(); realNode2.setIp("6.0.124.11"); realNode2.setEnable(false); realNode2.setZone("gz00b"); realNode2.setAvailable(false); realNode2.setRoundTripTime(27L); realNode2.setVipDomain(vipDomain); vipDomain.setRealNodes(Arrays.asList(realNode1, realNode2)); PollingResponse response = new PollingResponse(); response.setStartTime(1747817529332L); response.setVipDomains(Collections.singletonList(vipDomain)); sofaResponse.setAppResponse(response); AbstractByteBuf encoded = serializer.encode(sofaResponse, null); Object decodeObj = serializer.decode(encoded, SofaResponse.class, null); System.out.println("SofaResponse decodeObj: " + decodeObj); } ``` ### What did you expect to see? 
Deserialization should complete normally. ### What did you see instead? Deserialization fails with an exception; the two stack traces observed are shown below. ### Anything Else? ```java Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4 at org.apache.fury.collection.IntArray.pop(IntArray.java:54) at org.apache.fury.resolver.MapRefResolver.reference(MapRefResolver.java:192) at com.alipay.antvip.common.model.VipDomainFuryRefCodecMetaShared4_0.read(VipDomainFuryRefCodecMetaShared4_0.java:352) at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.readFields1$(PollingResponseFuryRefCodecMetaShared3_0.java:130) at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.read(PollingResponseFuryRefCodecMetaShared3_0.java:245) at com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0.read(SofaResponseFuryRefCodecMetaShared2_0.java:110) at org.apache.fury.Fury.readDataInternal(Fury.java:990) at org.apache.fury.Fury.readRef(Fury.java:874) at org.apache.fury.Fury.deserialize(Fury.java:806) ... 
12 more ``` ```java Caused by: java.lang.ClassCastException: class com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0 cannot be cast to class org.apache.fury.serializer.collection.AbstractMapSerializer (com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0 and org.apache.fury.serializer.collection.AbstractMapSerializer are in module java.base of loader sun.misc.Launcher$AppClassLoader) at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.readFields2$(PollingResponseFuryRefCodecMetaShared3_0.java:197) at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.read(PollingResponseFuryRefCodecMetaShared3_0.java:243) at com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0.read(SofaResponseFuryRefCodecMetaShared2_0.java:107) at org.apache.fury.Fury.readDataInternal(Fury.java:990) at org.apache.fury.Fury.readRef(Fury.java:874) at org.apache.fury.Fury.deserialize(Fury.java:806) ... 12 more ``` ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org