This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-0.9.x by this push:
new 35e9886 Backport patch of Add separate ZK serializer configuration to
active ZNRecord compression when size exceeds a threshold. #1901
35e9886 is described below
commit 35e988626b104968f4fb71f637c92250c22c1045
Author: Junkai Xue <[email protected]>
AuthorDate: Wed Nov 10 14:41:01 2021 -0800
Backport patch of Add separate ZK serializer configuration to active
ZNRecord compression when size exceeds a threshold. #1901
---
.../java/org/apache/helix/SystemPropertyKeys.java | 9 ++
.../java/org/apache/helix/util/ZNRecordUtil.java | 20 +++-
.../zk/TestZNRecordSerializeWriteSizeLimit.java | 112 ++++++++++++---------
3 files changed, 94 insertions(+), 47 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index f967b71..e2ce8e0 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -49,6 +49,15 @@ public class SystemPropertyKeys {
public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
"zk.serializer.znrecord.write.size.limit.bytes";
+ /**
+ * This property defines a threshold of ZNRecord size in bytes that the ZK
serializer starts to auto compress
+ * the ZNRecord for write requests if it's size exceeds the threshold.
+ * If the threshold is not configured or exceed ZKRecord write size limit,
default value
+ * {@value ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES} will be applied.
+ */
+ public static final String
ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES =
+ "zk.serializer.znrecord.auto-compress.threshold.bytes";
+
public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
"helixmanager.participantHealthReport.reportLatency";
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
index b701847..c8f8e81 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -125,7 +125,25 @@ public final class ZNRecordUtil {
.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
ZNRecord.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT));
- return autoCompressEnabled && serializedLength >
getSerializerWriteSizeLimit();
+ return autoCompressEnabled && serializedLength >
getSerializerCompressThreshold();
+ }
+
+ /**
+ * Returns the threshold in bytes that ZNRecord serializer should compress a
ZNRecord with larger size.
+ * If threshold is configured to be less than or equal to 0, the serializer
will always compress ZNRecords as long as
+ * auto-compression is enabled.
+ * If threshold is not configured or the threshold is larger than ZNRecord
write size limit, the default value
+ * ZNRecord write size limit will be used instead.
+ */
+ private static int getSerializerCompressThreshold() {
+ Integer autoCompressThreshold =
+
Integer.getInteger(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+
+ if (autoCompressThreshold == null || autoCompressThreshold >
getSerializerWriteSizeLimit()) {
+ return getSerializerWriteSizeLimit();
+ }
+
+ return autoCompressThreshold;
}
/**
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
index 671aba2..b285f44 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
@@ -59,10 +59,10 @@ public class TestZNRecordSerializeWriteSizeLimit {
Assert.assertFalse(
Boolean.getBoolean(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
// Data size 300 KB > size limit 200 KB: exception expected.
- verifyAutoCompression(300, writeSizeLimit, true, false, true);
+ verifyAutoCompression(300, false, true);
// Data size 100 KB < size limit 200 KB: pass
- verifyAutoCompression(100, writeSizeLimit, false, false, false);
+ verifyAutoCompression(100, false, false);
// Reset: add the properties back to system properties if they were
originally available.
if (compressionEnabledProperty != null) {
@@ -80,83 +80,103 @@ public class TestZNRecordSerializeWriteSizeLimit {
}
/*
- * Tests data serializing when write size limit is set.
+ * Tests data serializing when compress threshold is set.
* Two cases:
- * 1. limit is not set
- * --> default size is used.
- * 2. limit is set
- * --> serialized data is checked by the limit: pass or throw
ZkClientException.
+ * 1. threshold is not set
+ * --> default threshold (write size limit is used): pass or throw exception.
+ * 2. threshold is set
+ * --> serialized data is checked by the threshold.
*/
@Test
public void testZNRecordSerializerWriteSizeLimit() {
// Backup properties for later resetting.
final String writeSizeLimitProperty =
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ final String compressThresholdProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
- // Unset write size limit property so default limit is used.
-
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
-
- Assert.assertNull(
-
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
-
- verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
-
- // 2. Set size limit so serialized data is greater than the size limit but
compressed data
- // is smaller than the size limit.
- // Set it to 2000 bytes
- int writeSizeLimit = 2000;
-
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- String.valueOf(writeSizeLimit));
-
- // Verify auto compression is done.
- verifyAutoCompression(200, writeSizeLimit, true, true, false);
-
- // 3. Set size limit to be be less than default value. The default value
will be used for write
- // size limit.
- writeSizeLimit = 2000;
-
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- String.valueOf(writeSizeLimit));
-
- // Verify ZkClientException is thrown because compressed data is larger
than size limit.
- verifyAutoCompression(1000, writeSizeLimit, true, true, true);
-
- // Reset: add the properties back to system properties if they were
originally available.
- if (writeSizeLimitProperty != null) {
-
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- writeSizeLimitProperty);
- } else {
+ try {
+ // Unset write size limit and compress threshold properties so default
limit is used.
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+ Assert.assertNull(
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
+ Assert.assertNull(
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES));
+ verifyAutoCompression(600, false, false);
+
+ // 1. Set size limit but no compression threshold.
+ final int writeSizeLimit = 200 * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+ // Data size 100 KB < size limit 200 KB: directly write without
compression.
+ verifyAutoCompression(100, false, false);
+ // Data size 300 KB > size limit 200 KB: compress expected.
+ verifyAutoCompression(300, true, false);
+
+ // 2. Set threshold so serialized compressed data even the size is
smaller than the size limit.
+ // Set it to 2000 bytes
+ final int threshold = 100 * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+ String.valueOf(threshold));
+ // Data size 150 KB < size limit 200 KB and > threshold 100 KB: compress
expected.
+ verifyAutoCompression(150, true, false);
+
+ // 3. Set threshold to be larger than the write size limit. The default
value will be used.
+ int doubleLimitThreshold = writeSizeLimit * 2;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+ String.valueOf(doubleLimitThreshold));
+ // Data size 250 KB > size limit 200 KB but < threshold 400 KB: compress
is still expected.
+ verifyAutoCompression(250, true, false);
+
+ // 4. Set threshold to zero so compression will always be done.
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+ String.valueOf(0));
+ // Data size 250 KB > size limit 200 KB but < threshold 400 KB: compress
is still expected.
+ verifyAutoCompression(5, true, false);
+ } finally {
+ // Reset: add the properties back to system properties if they were
originally available.
+ if (writeSizeLimitProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ writeSizeLimitProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
+ if (compressThresholdProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+ compressThresholdProperty);
+ } else {
+ System
+
.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+ }
}
}
- private void verifyAutoCompression(int recordSize, int limit, boolean
greaterThanThreshold,
- boolean compressionExpected, boolean exceptionExpected) {
+ private void verifyAutoCompression(int recordSize, boolean
compressionExpected, boolean exceptionExpected) {
ZNRecord record = createZNRecord(recordSize);
// Makes sure the length of serialized bytes is greater than limit to
// satisfy the condition: serialized bytes' length exceeds the limit.
byte[] preCompressedBytes = serialize(record);
- Assert.assertEquals(preCompressedBytes.length > limit,
greaterThanThreshold);
+ Assert.assertTrue(preCompressedBytes.length >= recordSize);
ZkSerializer zkSerializer = new ZNRecordSerializer();
byte[] bytes;
try {
bytes = zkSerializer.serialize(record);
-
- Assert.assertEquals(bytes.length >= limit, exceptionExpected);
Assert.assertFalse(exceptionExpected);
} catch (ZkMarshallingError ex) {
Assert.assertTrue(exceptionExpected, "Should not throw
ZkClientException.");
- Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit +
" bytes"));
// No need to verify following asserts as bytes data is not returned.
return;
}
// Verify whether serialized data is compressed or not.
Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes),
compressionExpected);
- Assert.assertEquals(preCompressedBytes.length != bytes.length,
compressionExpected);
+ Assert.assertEquals(preCompressedBytes.length > bytes.length,
compressionExpected,
+ "Compressed data should have a smaller size compared with the original
data.");
// Verify serialized bytes could correctly deserialize.
Assert.assertEquals(zkSerializer.deserialize(bytes), record);