This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-0.9.x by this push:
new 9335487 Add system property options to config write size limit for
ZNRecord Serializer (#809) (#998)
9335487 is described below
commit 9335487311571e819148de01d799271bf2ae632c
Author: Huizhi Lu <[email protected]>
AuthorDate: Wed May 6 15:02:25 2020 -0700
Add system property options to config write size limit for ZNRecord
Serializer (#809) (#998)
With default value 1 MB of ZNRecord size limit in ZNRecord serializers,
serialized data may still fail to be written to Zookeeper. This commit adds
system property options to config ZNRecord's write size limit and auto
compression enabled in ZNRecord serializers.
Signed-off-by: Huizhi Lu <[email protected]>
Signed-off-by: Huizhi Lu <[email protected]>
---
.../java/org/apache/helix/SystemPropertyKeys.java | 27 ++
.../src/main/java/org/apache/helix/ZNRecord.java | 11 +
.../helix/manager/zk/ZNRecordSerializer.java | 37 ++-
.../manager/zk/ZNRecordStreamingSerializer.java | 26 +-
.../java/org/apache/helix/util/ZNRecordUtil.java | 36 +++
.../zk/TestZNRecordSerializeWriteSizeLimit.java | 199 +++++++++++++
.../helix/manager/zk/TestZNRecordSizeLimit.java | 323 ++++++++++++++++++++-
7 files changed, 635 insertions(+), 24 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 1a6a797..f967b71 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -22,6 +22,33 @@ public class SystemPropertyKeys {
public static final String ZK_WAIT_CONNECTED_TIMEOUT =
"helixmanager.waitForConnectedTimeout";
+ /**
+ * Setting this property to true in system properties enables auto
compression in ZK serializer.
+ * The data will be automatically compressed by
+ * {@link org.apache.helix.util.GZipCompressionUtil} when being written to
Zookeeper
+ * if size of serialized data exceeds the write size limit, which by default
is 1 MB or could be
+ * set by {@value ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES}.
+ * <p>
+ * The default value is "true" (enabled).
+ */
+ public static final String ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED =
+ "zk.serializer.znrecord.auto-compress.enabled";
+
+ /**
+ * This is a property that defines the maximum write size in bytes for
ZNRecord's two serializers
+ * before serialized data is ready to be written to ZK. This property
applies to
+ * 1. {@link org.apache.helix.manager.zk.ZNRecordSerializer}
+ * 2. {@link org.apache.helix.manager.zk.ZNRecordStreamingSerializer}.
+ * <p>
+ * If the size of serialized data (no matter whether it is compressed or
not) exceeds this
+ * configured limit, the data will NOT be written to Zookeeper.
+ * <p>
+ * Default value is 1 MB. If the configured limit is less than or equal to 0
byte,
+ * the default value will be used.
+ */
+ public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
+ "zk.serializer.znrecord.write.size.limit.bytes";
+
public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
"helixmanager.participantHealthReport.reportLatency";
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index bd9acbb..7a90e54 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -49,6 +49,17 @@ public class ZNRecord {
@JsonIgnore(true)
public static final String LIST_FIELD_BOUND = "listField.bound";
+ /** A field name in ZNRecord's simple fields to enable compression in
ZNRecord serializers. */
+ @JsonIgnore
+ public static final String ENABLE_COMPRESSION_BOOLEAN_FIELD =
"enableCompression";
+
+ /**
+ * Default value for system property
+ * {@link SystemPropertyKeys#ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED}
+ */
+ @JsonIgnore
+ public static final String ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT =
"true";
+
@JsonIgnore(true)
public static final int SIZE_LIMIT = 1000 * 1024; // leave a margin out of 1M
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 890bb13..416388b 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -28,6 +28,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.GZipCompressionUtil;
+import org.apache.helix.util.ZNRecordUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.codehaus.jackson.map.DeserializationConfig;
@@ -35,7 +36,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
public class ZNRecordSerializer implements ZkSerializer {
- private static Logger logger =
LoggerFactory.getLogger(ZNRecordSerializer.class);
+ private static Logger LOG =
LoggerFactory.getLogger(ZNRecordSerializer.class);
private static int getListFieldBound(ZNRecord record) {
int max = Integer.MAX_VALUE;
@@ -44,7 +45,7 @@ public class ZNRecordSerializer implements ZkSerializer {
try {
max = Integer.parseInt(maxStr);
} catch (Exception e) {
- logger.error("IllegalNumberFormat for list field bound: " + maxStr);
+ LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
}
}
return max;
@@ -54,7 +55,7 @@ public class ZNRecordSerializer implements ZkSerializer {
public byte[] serialize(Object data) {
if (!(data instanceof ZNRecord)) {
// null is NOT an instance of any class
- logger.error("Input object must be of type ZNRecord but it is " + data
+ LOG.error("Input object must be of type ZNRecord but it is " + data
+ ". Will not write to zk");
throw new HelixException("Input object is not of type ZNRecord (was " +
data + ")");
}
@@ -81,24 +82,33 @@ public class ZNRecordSerializer implements ZkSerializer {
serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS,
true);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] serializedBytes;
+ boolean isCompressed = false;
+
try {
mapper.writeValue(baos, data);
serializedBytes = baos.toByteArray();
// apply compression if needed
- if (record.getBooleanField("enableCompression", false) ||
serializedBytes.length > ZNRecord.SIZE_LIMIT) {
+ if (ZNRecordUtil.shouldCompress(record, serializedBytes.length)) {
serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+ isCompressed = true;
}
} catch (Exception e) {
- logger.error("Exception during data serialization. Will not write to zk.
Data (first 1k): "
- + new String(baos.toByteArray()).substring(0, 1024), e);
+ LOG.error(
+ "Exception during data serialization. ZNRecord ID: {} will not be
written to zk.",
+ record.getId(), e);
throw new HelixException(e);
}
- if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
- logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
- + ". Will not write to zk. Data (first 1k): "
- + new String(serializedBytes).substring(0, 1024));
- throw new HelixException("Data size larger than 1M, ZNRecord.id: " +
record.getId());
+
+ int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
+ if (serializedBytes.length > writeSizeLimit) {
+ LOG.error("Data size: {} is greater than {} bytes, is compressed: {},
ZNRecord.id: {}."
+ + " Data will not be written to Zookeeper.",
serializedBytes.length, writeSizeLimit,
+ isCompressed, record.getId());
+ throw new HelixException(
+ "Data size: " + serializedBytes.length + " is greater than " +
writeSizeLimit
+ + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " +
record.getId());
}
+
return serializedBytes;
}
@@ -122,11 +132,10 @@ public class ZNRecordSerializer implements ZkSerializer {
byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
bais = new ByteArrayInputStream(uncompressedBytes);
}
- ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
- return zn;
+ return mapper.readValue(bais, ZNRecord.class);
} catch (Exception e) {
- logger.error("Exception during deserialization of bytes: " + new
String(bytes), e);
+ LOG.error("Exception during deserialization of bytes: {}", new
String(bytes), e);
return null;
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index 39f307c..0c0af09 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -32,6 +32,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.util.GZipCompressionUtil;
+import org.apache.helix.util.ZNRecordUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.codehaus.jackson.JsonFactory;
@@ -79,7 +80,9 @@ public class ZNRecordStreamingSerializer implements
ZkSerializer {
}
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] serializedBytes = null;
+ byte[] serializedBytes;
+ boolean isCompressed = false;
+
try {
JsonFactory f = new JsonFactory();
JsonGenerator g = f.createJsonGenerator(baos);
@@ -154,20 +157,25 @@ public class ZNRecordStreamingSerializer implements
ZkSerializer {
g.close();
serializedBytes = baos.toByteArray();
// apply compression if needed
- if (record.getBooleanField("enableCompression", false) ||
serializedBytes.length > ZNRecord.SIZE_LIMIT) {
+ if (ZNRecordUtil.shouldCompress(record, serializedBytes.length)) {
serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+ isCompressed = true;
}
} catch (Exception e) {
- LOG.error("Exception during data serialization. Will not write to zk.
Data (first 1k): "
- + new String(baos.toByteArray()).substring(0, 1024), e);
+ LOG.error(
+ "Exception during data serialization. ZNRecord ID: {} will not be
written to zk.",
+ record.getId(), e);
throw new HelixException(e);
}
// check size
- if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
- LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
- + ". Will not write to zk. Data (first 1k): "
- + new String(serializedBytes).substring(0, 1024));
- throw new HelixException("Data size larger than 1M, ZNRecord.id: " +
record.getId());
+ int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit();
+ if (serializedBytes.length > writeSizeLimit) {
+ LOG.error("Data size: {} is greater than {} bytes, is compressed: {},
ZNRecord.id: {}."
+ + " Data will not be written to Zookeeper.",
serializedBytes.length, writeSizeLimit,
+ isCompressed, record.getId());
+ throw new HelixException(
+ "Data size: " + serializedBytes.length + " is greater than " +
writeSizeLimit
+ + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " +
record.getId());
}
return serializedBytes;
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
index 989b54a..b701847 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,4 +107,39 @@ public final class ZNRecordUtil {
}
return list;
}
+
+ /**
+ * Checks whether or not serialized ZNRecord bytes should be compressed
before being written to
+ * Zookeeper.
+ *
+ * @param record raw ZNRecord before being serialized
+ * @param serializedLength length of the serialized bytes array
+ * @return true if the serialized bytes should be compressed; false otherwise
+ */
+ public static boolean shouldCompress(ZNRecord record, int serializedLength) {
+ if (record.getBooleanField(ZNRecord.ENABLE_COMPRESSION_BOOLEAN_FIELD,
false)) {
+ return true;
+ }
+
+ boolean autoCompressEnabled = Boolean.parseBoolean(System
+
.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
+ ZNRecord.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT));
+
+ return autoCompressEnabled && serializedLength >
getSerializerWriteSizeLimit();
+ }
+
+ /**
+ * Returns ZNRecord serializer write size limit in bytes. If size limit is
configured to be less
+ * than or equal to 0, the default value will be used instead.
+ */
+ public static int getSerializerWriteSizeLimit() {
+ Integer writeSizeLimit =
+
Integer.getInteger(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ if (writeSizeLimit == null || writeSizeLimit <= 0) {
+ return ZNRecord.SIZE_LIMIT;
+ }
+
+ return writeSizeLimit;
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
new file mode 100644
index 0000000..7dca363
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java
@@ -0,0 +1,199 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZNRecordSerializeWriteSizeLimit {
+ /*
+ * Tests data serializing when auto compression is disabled. If the system
property for
+ * auto compression is set to "false", auto compression is disabled.
+ */
+ @Test
+ public void testAutoCompressionDisabled() {
+ // Backup properties for later resetting.
+ final String compressionEnabledProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
+ final String compressionThresholdProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ // Prepare system properties to disable auto compression.
+ final int writeSizeLimit = 200 * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // 2. Set the auto compression enabled property to false so auto
compression is disabled.
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
"false");
+
+ // Verify auto compression is disabled.
+ Assert.assertFalse(
+
Boolean.getBoolean(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
+ // Data size 300 KB > size limit 200 KB: exception expected.
+ verifyAutoCompression(300, writeSizeLimit, true, false, true);
+
+ // Data size 100 KB < size limit 200 KB: pass
+ verifyAutoCompression(100, writeSizeLimit, false, false, false);
+
+ // Reset: add the properties back to system properties if they were
originally available.
+ if (compressionEnabledProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
+ compressionEnabledProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
+ }
+ if (compressionThresholdProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ compressionThresholdProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
+ }
+
+ /*
+ * Tests data serializing when write size limit is set.
+ * Two cases:
+ * 1. limit is not set
+ * --> default size is used.
+ * 2. limit is set
+ * --> serialized data is checked by the limit: pass or throw
HelixException.
+ */
+ @Test
+ public void testZNRecordSerializerWriteSizeLimit() {
+ // Backup properties for later resetting.
+ final String writeSizeLimitProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ // Unset write size limit property so default limit is used.
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ Assert.assertNull(
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
+
+ verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
+
+ // 2. Set size limit so serialized data is greater than the size limit but
compressed data
+ // is smaller than the size limit.
+ // Set it to 2000 bytes
+ int writeSizeLimit = 2000;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // Verify auto compression is done.
+ verifyAutoCompression(200, writeSizeLimit, true, true, false);
+
+ // 3. Set size limit to be less than the default value. The default value
will be used for write
+ // size limit.
+ writeSizeLimit = 2000;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // Verify HelixException is thrown because compressed data is larger
than size limit.
+ verifyAutoCompression(1000, writeSizeLimit, true, true, true);
+
+ // Reset: add the properties back to system properties if they were
originally available.
+ if (writeSizeLimitProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ writeSizeLimitProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
+ }
+
+ private void verifyAutoCompression(int recordSize, int limit, boolean
greaterThanThreshold,
+ boolean compressionExpected, boolean exceptionExpected) {
+ ZNRecord record = createZNRecord(recordSize);
+
+ // Makes sure the length of serialized bytes is greater than limit to
+ // satisfy the condition: serialized bytes' length exceeds the limit.
+ byte[] preCompressedBytes = serialize(record);
+
+ Assert.assertEquals(preCompressedBytes.length > limit,
greaterThanThreshold);
+
+ ZkSerializer zkSerializer = new ZNRecordSerializer();
+
+ byte[] bytes;
+ try {
+ bytes = zkSerializer.serialize(record);
+
+ Assert.assertEquals(bytes.length >= limit, exceptionExpected);
+ Assert.assertFalse(exceptionExpected);
+ } catch (HelixException ex) {
+ Assert.assertTrue(exceptionExpected, "Should not throw
ZkClientException.");
+ Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit +
" bytes"));
+ // No need to verify following asserts as bytes data is not returned.
+ return;
+ }
+
+ // Verify whether serialized data is compressed or not.
+ Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes),
compressionExpected);
+ Assert.assertEquals(preCompressedBytes.length != bytes.length,
compressionExpected);
+
+ // Verify serialized bytes could correctly deserialize.
+ Assert.assertEquals(zkSerializer.deserialize(bytes), record);
+ }
+
+ private ZNRecord createZNRecord(final int recordSizeKb) {
+ byte[] buf = new byte[1024];
+ for (int i = 0; i < 1024; i++) {
+ buf[i] = 'a';
+ }
+ String bufStr = new String(buf);
+
+ ZNRecord record = new ZNRecord("record");
+ for (int i = 0; i < recordSizeKb; i++) {
+ record.setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ return record;
+ }
+
+ // Simulates serializing so we can check the size of serialized bytes.
+ // Returns raw serialized bytes before being compressed.
+ private byte[] serialize(Object data) {
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+ serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS,
true);
+
serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS,
true);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] serializedBytes = new byte[0];
+
+ try {
+ mapper.writeValue(baos, data);
+ serializedBytes = baos.toByteArray();
+ } catch (IOException e) {
+ Assert.fail("Can not serialize data.", e);
+ }
+
+ return serializedBytes;
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
index bccb425..bc440b1 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java
@@ -24,6 +24,8 @@ import java.util.Date;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.client.HelixZkClient;
@@ -159,7 +161,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase {
+ new Date(System.currentTimeMillis()));
}
- @Test
+ @Test(dependsOnMethods = "testZNRecordSizeLimitUseZNRecordSerializer")
public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
String className = getShortClassName();
System.out.println("START
testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
@@ -289,4 +291,323 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase
{
System.out.println("END
testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
System.currentTimeMillis()));
}
+
+ /*
+ * Tests ZNRecordSerializer threshold.
+ * Two cases using ZkClient and ZkDataAccessor:
+ * 1. serialized data size is less than threshold and could be written to ZK.
+ * 2. serialized data size is greater than threshold, so HelixException
is thrown.
+ */
+ @Test(dependsOnMethods =
"testZNRecordSizeLimitUseZNRecordStreamingSerializer")
+ public void testZNRecordSerializerWriteSizeLimit() throws Exception {
+ // Backup properties for later resetting.
+ final String thresholdProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ ZNRecordSerializer serializer = new ZNRecordSerializer();
+
+ String root = getShortClassName();
+
+ byte[] buf = new byte[1024];
+ for (int i = 0; i < 1024; i++) {
+ buf[i] = 'a';
+ }
+ String bufStr = new String(buf);
+
+ // 1. legal-sized data gets written to zk
+ // write a znode of size less than writeSizeLimit
+ int rawZnRecordSize = 700;
+ int writeSizeLimitKb = 800;
+ int writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ String path = "/" + root + "/normal";
+ _gZkClient.createPersistent(path, true);
+ _gZkClient.writeData(path, normalSizeRecord);
+
+ ZNRecord record = _gZkClient.readData(path);
+
+ // Successfully reads the same data.
+ Assert.assertEquals(normalSizeRecord, record);
+
+ int length = serializer.serialize(record).length;
+
+ // Less than writeSizeLimit so it is written to ZK.
+ Assert.assertTrue(length < writeSizeLimit);
+
+ // 2. Large size data is not allowed to write to ZK
+ // Set raw record size to be large enough so its serialized data exceeds
the writeSizeLimit.
+ rawZnRecordSize = 2000;
+ // Set the writeSizeLimit to very small so serialized data size exceeds
the writeSizeLimit.
+ writeSizeLimitKb = 1;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ final ZNRecord largeRecord = new ZNRecord("large-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ largeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ path = "/" + root + "/large";
+ _gZkClient.createPersistent(path, true);
+
+ try {
+ _gZkClient.writeData(path, largeRecord);
+ Assert.fail("Data should not be written to ZK because data size exceeds
writeSizeLimit!");
+ } catch (HelixException expected) {
+ Assert.assertTrue(
+ expected.getMessage().contains(" is greater than " + writeSizeLimit
+ " bytes"));
+ }
+
+ // test ZkDataAccessor
+ ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+ admin.addCluster(root, true);
+ InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+ admin.addInstance(root, instanceConfig);
+
+ // Set the writeSizeLimit to 10KB so serialized data size does not exceed
writeSizeLimit.
+ writeSizeLimitKb = 10;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // oversized data should not create any new data on zk
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ IdealState idealState = new IdealState("currentState");
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+
+ for (int i = 0; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"),
idealState);
+ Assert.assertTrue(succeed);
+ HelixProperty property = accessor.getProperty(
+ keyBuilder.stateTransitionStatus("localhost_12918", "session_1",
"partition_1"));
+ Assert.assertNull(property);
+
+ // legal sized data gets written to zk
+ idealState.getRecord().getSimpleFields().clear();
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+
+ for (int i = 0; i < 900; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"),
idealState);
+ Assert.assertTrue(succeed);
+ record =
accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+ Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+
+ // Set small write size limit so writing does not succeed.
+ writeSizeLimitKb = 1;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // oversized data should not update existing data on zk
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+ for (int i = 900; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"),
idealState);
+ Assert.assertFalse(succeed,
+ "Update property should not succeed because data exceeds znode write
limit!");
+
+ // Delete the nodes.
+ deletePath(_gZkClient, "/" + root);
+
+ // Reset: add the properties back to system properties if they were
originally available.
+ if (thresholdProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ thresholdProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
+ }
+
+ /*
+ * Tests ZNRecordStreamingSerializer threshold.
+ * Two cases using ZkClient and ZkDataAccessor:
+ * 1. serialized data size is less than threshold and could be written to ZK.
+ * 2. serialized data size is greater than threshold, so HelixException
is thrown.
+ */
+ @Test(dependsOnMethods = "testZNRecordSerializerWriteSizeLimit")
+ public void testZNRecordStreamingSerializerWriteSizeLimit() throws Exception
{
+ // Backup properties for later resetting.
+ final String thresholdProperty =
+
System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+
+ ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+
+ try {
+ zkClient.setZkSerializer(serializer);
+
+ String root = getShortClassName();
+
+ byte[] buf = new byte[1024];
+ for (int i = 0; i < 1024; i++) {
+ buf[i] = 'a';
+ }
+ String bufStr = new String(buf);
+
+ // 1. legal-sized data gets written to zk
+ // write a znode of size less than writeSizeLimit
+ int rawZnRecordSize = 700;
+ int writeSizeLimitKb = 800;
+ int writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ String path = "/" + root + "/normal";
+ zkClient.createPersistent(path, true);
+ zkClient.writeData(path, normalSizeRecord);
+
+ ZNRecord record = zkClient.readData(path);
+
+ // Successfully reads the same data.
+ Assert.assertEquals(normalSizeRecord, record);
+
+ int length = serializer.serialize(record).length;
+
+ // Less than writeSizeLimit so it is written to ZK.
+ Assert.assertTrue(length < writeSizeLimit);
+
+ // 2. Large size data is not allowed to write to ZK
+ // Set raw record size to be large enough so its serialized data exceeds
the writeSizeLimit.
+ rawZnRecordSize = 2000;
+ // Set the writeSizeLimit to very small so serialized data size exceeds
the writeSizeLimit.
+ writeSizeLimitKb = 1;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ final ZNRecord largeRecord = new ZNRecord("large-size");
+ for (int i = 0; i < rawZnRecordSize; i++) {
+ largeRecord.setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ path = "/" + root + "/large";
+ zkClient.createPersistent(path, true);
+
+ try {
+ zkClient.writeData(path, largeRecord);
+ Assert.fail("Data should not written to ZK because data size exceeds
writeSizeLimit!");
+ } catch (HelixException expected) {
+ Assert.assertTrue(
+ expected.getMessage().contains(" is greater than " +
writeSizeLimit + " bytes"));
+ }
+
+ // test ZkDataAccessor
+ ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+ admin.addCluster(root, true);
+ InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
+ admin.addInstance(root, instanceConfig);
+
+ // Set the writeSizeLimit to 10KB so serialized data size does not
exceed writeSizeLimit.
+ writeSizeLimitKb = 10;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // oversize data should not create any new data on zk
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(_gZkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ IdealState idealState = new IdealState("currentState");
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+
+ for (int i = 0; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ boolean succeed =
accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+ Assert.assertTrue(succeed);
+ HelixProperty property = accessor.getProperty(
+ keyBuilder.stateTransitionStatus("localhost_12918", "session_1",
"partition_1"));
+ Assert.assertNull(property);
+
+ // legal sized data gets written to zk
+ idealState.getRecord().getSimpleFields().clear();
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+
+ for (int i = 0; i < 900; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+ succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"),
idealState);
+ Assert.assertTrue(succeed);
+ record =
accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
+ Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
+
+ // Set small write size limit so writing does not succeed.
+ writeSizeLimitKb = 1;
+ writeSizeLimit = writeSizeLimitKb * 1024;
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(writeSizeLimit));
+
+ // oversize data should not update existing data on zk
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(10);
+ for (int i = 900; i < 1024; i++) {
+ idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
+ }
+
+ succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"),
idealState);
+ Assert.assertFalse(succeed,
+ "Update property should not succeed because data exceeds znode write
limit!");
+
+ // Delete the nodes.
+ deletePath(zkClient, "/" + root);
+ } finally {
+ zkClient.close();
+ }
+
+ // Reset: add the properties back to system properties if they were
originally available.
+ if (thresholdProperty != null) {
+
System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ thresholdProperty);
+ } else {
+
System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ }
+ }
+
+ private void deletePath(final HelixZkClient zkClient, final String path)
throws Exception {
+ Assert.assertTrue(TestHelper.verify(() -> {
+ do {
+ try {
+ zkClient.deleteRecursively(path);
+ } catch (HelixException ex) {
+ // ignore
+ }
+ } while (zkClient.exists(path));
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
}