This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-0.9.x by this push:
     new 35e9886  Backport patch of Add separate ZK serializer configuration to 
active ZNRecord compression when size exceeds a threshold. #1901
35e9886 is described below

commit 35e988626b104968f4fb71f637c92250c22c1045
Author: Junkai Xue <[email protected]>
AuthorDate: Wed Nov 10 14:41:01 2021 -0800

    Backport patch of Add separate ZK serializer configuration to active 
ZNRecord compression when size exceeds a threshold. #1901
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   9 ++
 .../java/org/apache/helix/util/ZNRecordUtil.java   |  20 +++-
 .../zk/TestZNRecordSerializeWriteSizeLimit.java    | 112 ++++++++++++---------
 3 files changed, 94 insertions(+), 47 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java 
b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index f967b71..e2ce8e0 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -49,6 +49,15 @@ public class SystemPropertyKeys {
   public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
       "zk.serializer.znrecord.write.size.limit.bytes";
 
+  /**
+   * This property defines a threshold of ZNRecord size in bytes that the ZK 
serializer starts to auto compress
+   * the ZNRecord for write requests if its size exceeds the threshold.
+   * If the threshold is not configured or exceeds the ZNRecord write size limit, 
the value of
+   * {@value ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES} will be applied.
+   */
+  public static final String 
ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES =
+      "zk.serializer.znrecord.auto-compress.threshold.bytes";
+
   public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
       "helixmanager.participantHealthReport.reportLatency";
 
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
index b701847..c8f8e81 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -125,7 +125,25 @@ public final class ZNRecordUtil {
         
.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
             ZNRecord.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT));
 
-    return autoCompressEnabled && serializedLength > 
getSerializerWriteSizeLimit();
+    return autoCompressEnabled && serializedLength > 
getSerializerCompressThreshold();
+  }
+
+  /**
+   * Returns the threshold in bytes above which the ZNRecord serializer should 
compress a ZNRecord.
+   * If threshold is configured to be less than or equal to 0, the serializer 
will always compress ZNRecords as long as
+   * auto-compression is enabled.
+   * If threshold is not configured or the threshold is larger than ZNRecord 
write size limit, the default value
+   * ZNRecord write size limit will be used instead.
+   */
+  private static int getSerializerCompressThreshold() {
+    Integer autoCompressThreshold =
+        
Integer.getInteger(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+
+    if (autoCompressThreshold == null || autoCompressThreshold > 
getSerializerWriteSizeLimit()) {
+      return getSerializerWriteSizeLimit();
+    }
+
+    return autoCompressThreshold;
   }
 
   /**
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
index 671aba2..b285f44 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
@@ -59,10 +59,10 @@ public class TestZNRecordSerializeWriteSizeLimit {
     Assert.assertFalse(
         
Boolean.getBoolean(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
     // Data size 300 KB > size limit 200 KB: exception expected.
-    verifyAutoCompression(300, writeSizeLimit, true, false, true);
+    verifyAutoCompression(300, false, true);
 
     // Data size 100 KB < size limit 200 KB: pass
-    verifyAutoCompression(100, writeSizeLimit, false, false, false);
+    verifyAutoCompression(100, false, false);
 
     // Reset: add the properties back to system properties if they were 
originally available.
     if (compressionEnabledProperty != null) {
@@ -80,83 +80,103 @@ public class TestZNRecordSerializeWriteSizeLimit {
   }
 
   /*
-   * Tests data serializing when write size limit is set.
+   * Tests data serializing when compress threshold is set.
    * Two cases:
-   * 1. limit is not set
-   * --> default size is used.
-   * 2. limit is set
-   * --> serialized data is checked by the limit: pass or throw 
ZkClientException.
+   * 1. threshold is not set
+   * --> default threshold (write size limit) is used: pass or throw exception.
+   * 2. threshold is set
+   * --> serialized data is checked by the threshold.
    */
   @Test
   public void testZNRecordSerializerWriteSizeLimit() {
     // Backup properties for later resetting.
     final String writeSizeLimitProperty =
         
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    final String compressThresholdProperty =
+        
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
 
-    // Unset write size limit property so default limit is used.
-    
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
-
-    Assert.assertNull(
-        
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
-
-    verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
-
-    // 2. Set size limit so serialized data is greater than the size limit but 
compressed data
-    // is smaller than the size limit.
-    // Set it to 2000 bytes
-    int writeSizeLimit = 2000;
-    
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
-
-    // Verify auto compression is done.
-    verifyAutoCompression(200, writeSizeLimit, true, true, false);
-
-    // 3. Set size limit to be be less than default value. The default value 
will be used for write
-    // size limit.
-    writeSizeLimit = 2000;
-    
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-        String.valueOf(writeSizeLimit));
-
-    // Verify ZkClientException is thrown because compressed data is larger 
than size limit.
-    verifyAutoCompression(1000, writeSizeLimit, true, true, true);
-
-    // Reset: add the properties back to system properties if they were 
originally available.
-    if (writeSizeLimitProperty != null) {
-      
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          writeSizeLimitProperty);
-    } else {
+    try {
+      // Unset write size limit and compress threshold properties so default 
limit is used.
       
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+      Assert.assertNull(
+          
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
+      Assert.assertNull(
+          
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES));
+      verifyAutoCompression(600, false, false);
+
+      // 1. Set size limit but no compression threshold.
+      final int writeSizeLimit = 200 * 1024;
+      
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+          String.valueOf(writeSizeLimit));
+      // Data size 100 KB < size limit 200 KB: directly write without 
compression.
+      verifyAutoCompression(100, false, false);
+      // Data size 300 KB > size limit 200 KB: compress expected.
+      verifyAutoCompression(300, true, false);
+
+      // 2. Set threshold so serialized data is compressed even when its size is 
smaller than the size limit.
+      // Set it to 100 KB.
+      final int threshold = 100 * 1024;
+      
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(threshold));
+      // Data size 150 KB < size limit 200 KB and > threshold 100 KB: compress 
expected.
+      verifyAutoCompression(150, true, false);
+
+      // 3. Set threshold to be larger than the write size limit. The default 
value will be used.
+      int doubleLimitThreshold = writeSizeLimit * 2;
+      
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(doubleLimitThreshold));
+      // Data size 250 KB > size limit 200 KB but < threshold 400 KB: compress 
is still expected.
+      verifyAutoCompression(250, true, false);
+
+      // 4. Set threshold to zero so compression will always be done.
+      
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(0));
+      // Data size 5 KB < size limit 200 KB but > threshold 0: compress is 
expected.
+      verifyAutoCompression(5, true, false);
+    } finally {
+      // Reset: add the properties back to system properties if they were 
originally available.
+      if (writeSizeLimitProperty != null) {
+        
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+            writeSizeLimitProperty);
+      } else {
+        
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+      }
+      if (compressThresholdProperty != null) {
+        
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+            compressThresholdProperty);
+      } else {
+        System
+            
.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+      }
     }
   }
 
-  private void verifyAutoCompression(int recordSize, int limit, boolean 
greaterThanThreshold,
-      boolean compressionExpected, boolean exceptionExpected) {
+  private void verifyAutoCompression(int recordSize, boolean 
compressionExpected, boolean exceptionExpected) {
     ZNRecord record = createZNRecord(recordSize);
 
     // Makes sure the length of serialized bytes is greater than limit to
     // satisfy the condition: serialized bytes' length exceeds the limit.
     byte[] preCompressedBytes = serialize(record);
 
-    Assert.assertEquals(preCompressedBytes.length > limit, 
greaterThanThreshold);
+    Assert.assertTrue(preCompressedBytes.length >= recordSize);
 
     ZkSerializer zkSerializer = new ZNRecordSerializer();
 
     byte[] bytes;
     try {
       bytes = zkSerializer.serialize(record);
-
-      Assert.assertEquals(bytes.length >= limit, exceptionExpected);
       Assert.assertFalse(exceptionExpected);
     } catch (ZkMarshallingError ex) {
       Assert.assertTrue(exceptionExpected, "Should not throw 
ZkClientException.");
-      Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit + 
" bytes"));
       // No need to verify following asserts as bytes data is not returned.
       return;
     }
 
     // Verify whether serialized data is compressed or not.
     Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes), 
compressionExpected);
-    Assert.assertEquals(preCompressedBytes.length != bytes.length, 
compressionExpected);
+    Assert.assertEquals(preCompressedBytes.length > bytes.length, 
compressionExpected,
+        "Compressed data should have a smaller size compared with the original 
data.");
 
     // Verify serialized bytes could correctly deserialize.
     Assert.assertEquals(zkSerializer.deserialize(bytes), record);

Reply via email to