This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch releases-0.10 in repository https://gitbox.apache.org/repos/asf/fury.git
commit 2c0fd397dba8f62f27714bf96633aadd36b8102d Author: Thomas Heigl <[email protected]> AuthorDate: Thu Dec 5 10:33:51 2024 +0100 feat(java): configurable buffer size limit (#1963) ## What does this PR do? This PR introduces a new configuration option `bufferSizeLimitBytes` that replaces the hard-coded default of 128kb. ## Related issues #1950 ## Does this PR introduce any user-facing change? The PR introduces a new configuration option `bufferSizeLimitBytes`. - [x] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Discussion This PR solves my problem, but I'm not sure if it is the right way to move forward. This is quite a low-level configuration option, but a potentially very important one. Every user whose average payload size is >=128kb, will need to increase this value for maximum performance. Maybe the default limit should be increased to something less conservative like 1MB, so fewer users will need to adjust this setting? --- .../src/main/java/org/apache/fury/Fury.java | 5 ++--- .../main/java/org/apache/fury/config/Config.java | 8 ++++++++ .../java/org/apache/fury/config/FuryBuilder.java | 12 ++++++++++++ .../src/test/java/org/apache/fury/FuryTest.java | 21 +++++++++++++++++++++ 4 files changed, 43 insertions(+), 3 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index e10e989a..b1918358 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -98,7 +98,6 @@ public final class Fury implements BaseFury { private static final byte isOutOfBandFlag = 1 << 3; private static final boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; private static final byte BITMAP = isLittleEndian ? isLittleEndianFlag : 0; - private static final int BUFFER_SIZE_LIMIT = 128 * 1024; private static final short MAGIC_NUMBER = 0x62D4; private final Config config; @@ -330,8 +329,8 @@ public final class Fury implements BaseFury { public void resetBuffer() { MemoryBuffer buf = buffer; - if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) { - buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT); + if (buf != null && buf.size() > config.bufferSizeLimitBytes()) { + buffer = MemoryBuffer.newHeapBuffer(config.bufferSizeLimitBytes()); } } diff --git a/java/fury-core/src/main/java/org/apache/fury/config/Config.java b/java/fury-core/src/main/java/org/apache/fury/config/Config.java index 98301413..bc6afe87 100644 --- a/java/fury-core/src/main/java/org/apache/fury/config/Config.java +++ b/java/fury-core/src/main/java/org/apache/fury/config/Config.java @@ -61,6 +61,7 @@ public class Config implements Serializable { private transient int configHash; private final boolean deserializeNonexistentEnumValueAsNull; private final boolean serializeEnumByName; + private final int bufferSizeLimitBytes; public Config(FuryBuilder builder) { name = builder.name; @@ -95,6 +96,7 @@ public class Config implements Serializable { scalaOptimizationEnabled = builder.scalaOptimizationEnabled; deserializeNonexistentEnumValueAsNull = builder.deserializeNonexistentEnumValueAsNull; serializeEnumByName = builder.serializeEnumByName; + bufferSizeLimitBytes = builder.bufferSizeLimitBytes; } /** Returns the name for Fury serialization. */ @@ -187,6 +189,10 @@ public class Config implements Serializable { return longEncoding; } + public int bufferSizeLimitBytes() { + return bufferSizeLimitBytes; + } + public boolean requireClassRegistration() { return requireClassRegistration; } @@ -283,6 +289,7 @@ public class Config implements Serializable { && compressString == config.compressString && compressInt == config.compressInt && compressLong == config.compressLong + && bufferSizeLimitBytes == config.bufferSizeLimitBytes && requireClassRegistration == config.requireClassRegistration && suppressClassRegistrationWarnings == config.suppressClassRegistrationWarnings && registerGuavaTypes == config.registerGuavaTypes @@ -317,6 +324,7 @@ public class Config implements Serializable { compressInt, compressLong, longEncoding, + bufferSizeLimitBytes, requireClassRegistration, suppressClassRegistrationWarnings, registerGuavaTypes, diff --git a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java index 8e183d66..06e954c5 100644 --- a/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java +++ b/java/fury-core/src/main/java/org/apache/fury/config/FuryBuilder.java @@ -83,6 +83,7 @@ public final class FuryBuilder { boolean suppressClassRegistrationWarnings = true; boolean deserializeNonexistentEnumValueAsNull = false; boolean serializeEnumByName = false; + int bufferSizeLimitBytes = 128 * 1024; MetaCompressor metaCompressor = new DeflaterMetaCompressor(); public FuryBuilder() {} @@ -184,6 +185,17 @@ public final class FuryBuilder { return this; } + /** + * Sets the limit for Fury's internal buffer. If the buffer size exceeds this limit, it will be + * reset to this limit after every serialization and deserialization. + * + * <p>The default is 128k. + */ + public FuryBuilder withBufferSizeLimitBytes(int bufferSizeLimitBytes) { + this.bufferSizeLimitBytes = bufferSizeLimitBytes; + return this; + } + /** * Set classloader for fury to load classes, this classloader can't up updated. Fury will cache * the class meta data, if classloader can be updated, there may be class meta collision if diff --git a/java/fury-core/src/test/java/org/apache/fury/FuryTest.java b/java/fury-core/src/test/java/org/apache/fury/FuryTest.java index 1e4b5a00..b5aaa0ff 100644 --- a/java/fury-core/src/test/java/org/apache/fury/FuryTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/FuryTest.java @@ -615,4 +615,25 @@ public class FuryTest extends FuryTestBase { Object obj = fury.deserializeJavaObjectAndClass(bytes); assertNull(obj); } + + @Test + public void testResetBufferToSizeLimit() { + final int minBufferBytes = 64; + final int limitInBytes = 1024; + Fury fury = Fury.builder().withBufferSizeLimitBytes(limitInBytes).build(); + + final byte[] smallPayload = new byte[0]; + final byte[] serializedSmall = fury.serialize(smallPayload); + assertEquals(fury.getBuffer().size(), minBufferBytes); + + fury.deserialize(serializedSmall); + assertEquals(fury.getBuffer().size(), minBufferBytes); + + final byte[] largePayload = new byte[limitInBytes * 2]; + final byte[] serializedLarge = fury.serialize(largePayload); + assertEquals(fury.getBuffer().size(), limitInBytes); + + fury.deserialize(serializedLarge); + assertEquals(fury.getBuffer().size(), limitInBytes); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
