This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch helix-0.9.x in repository https://gitbox.apache.org/repos/asf/helix.git
commit 35cfaebe69bb5ffaed3365c191dce4a83245a747 Author: Jiajun Wang <[email protected]> AuthorDate: Thu May 7 10:08:10 2020 -0700 Async write operation should not throw Exception for serializing error (#845) (#999) This change will make the async write operations return error through the async callback instead of throwing exceptions. This change will fix the batch write/create failure due to one single node serializing failure. In addition, according to the serializer interface definition, change ZK related serializers to throw ZkMarshallingError instead of ZkClientException. --- .../helix/manager/zk/ZNRecordSerializer.java | 8 +- .../manager/zk/ZNRecordStreamingSerializer.java | 7 +- .../helix/manager/zk/zookeeper/ZkClient.java | 51 +++--- .../apache/helix/manager/zk/TestRawZkClient.java | 68 ++++++++ .../zk/TestZNRecordSerializeWriteSizeLimit.java | 3 +- .../helix/manager/zk/TestZNRecordSizeLimit.java | 182 +++++++++++---------- 6 files changed, 200 insertions(+), 119 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java index 416388b..1eb16b7 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java @@ -24,8 +24,8 @@ import java.io.ByteArrayOutputStream; import java.util.List; import java.util.Map; +import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.util.GZipCompressionUtil; import org.apache.helix.util.ZNRecordUtil; @@ -57,7 +57,7 @@ public class ZNRecordSerializer implements ZkSerializer { // null is NOT an instance of any class LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk"); - throw new HelixException("Input object is not of type ZNRecord (was " + data + ")"); + throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")"); } ZNRecord record = (ZNRecord) data; @@ -96,7 +96,7 @@ public class ZNRecordSerializer implements ZkSerializer { LOG.error( "Exception during data serialization. ZNRecord ID: {} will not be written to zk.", record.getId(), e); - throw new HelixException(e); + throw new ZkMarshallingError(e); } int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit(); @@ -104,7 +104,7 @@ public class ZNRecordSerializer implements ZkSerializer { LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}." + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit, isCompressed, record.getId()); - throw new HelixException( + throw new ZkMarshallingError( "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId()); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java index 0c0af09..efe9e64 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java @@ -29,7 +29,6 @@ import java.util.TreeMap; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.commons.codec.binary.Base64; -import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.util.GZipCompressionUtil; import org.apache.helix.util.ZNRecordUtil; @@ -64,7 +63,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { // null is NOT an instance of any class LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk"); - throw new HelixException("Input object is not of type ZNRecord (was " + data + ")"); + throw new ZkMarshallingError("Input object is not of type ZNRecord (was " + data + ")"); } // apply retention policy on list field @@ -165,7 +164,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { LOG.error( "Exception during data serialization. ZNRecord ID: {} will not be written to zk.", record.getId(), e); - throw new HelixException(e); + throw new ZkMarshallingError(e); } // check size int writeSizeLimit = ZNRecordUtil.getSerializerWriteSizeLimit(); @@ -173,7 +172,7 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { LOG.error("Data size: {} is greater than {} bytes, is compressed: {}, ZNRecord.id: {}." + " Data will not be written to Zookeeper.", serializedBytes.length, writeSizeLimit, isCompressed, record.getId()); - throw new HelixException( + throw new ZkMarshallingError( "Data size: " + serializedBytes.length + " is greater than " + writeSizeLimit + " bytes, is compressed: " + isCompressed + ", ZNRecord.id: " + record.getId()); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index 66f26d6..1b4d1ca 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -34,6 +34,7 @@ import org.I0Itec.zkclient.ZkLock; import org.I0Itec.zkclient.exception.ZkBadVersionException; import org.I0Itec.zkclient.exception.ZkException; import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.I0Itec.zkclient.exception.ZkTimeoutException; @@ -1400,17 +1401,22 @@ public class ZkClient implements Watcher { public void asyncCreate(final String path, Object datat, final CreateMode mode, final ZkAsyncCallbacks.CreateCallbackHandler cb) { final long startT = System.currentTimeMillis(); - final byte[] data = (datat == null ? null : serialize(datat, path)); - retryUntilConnected(new Callable<Object>() { - @Override - public Object call() throws Exception { - ((ZkConnection) getConnection()).getZookeeper().create(path, data, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - // Arrays.asList(DEFAULT_ACL), - mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, - data == null ? 0 : data.length, false)); - return null; - } + byte[] data = null; + try { + data = (datat == null ? null : serialize(datat, path)); + } catch (ZkMarshallingError e) { + cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null); + return; + } + final byte[] finalData = data; + retryUntilConnected(() -> { + ((ZkConnection) getConnection()).getZookeeper() + .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE, + // Arrays.asList(DEFAULT_ACL), + mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + finalData == null ? 0 : finalData.length, false)); + return null; }); } @@ -1418,15 +1424,20 @@ public class ZkClient implements Watcher { public void asyncSetData(final String path, Object datat, final int version, final ZkAsyncCallbacks.SetDataCallbackHandler cb) { final long startT = System.currentTimeMillis(); - final byte[] data = serialize(datat, path); - retryUntilConnected(new Callable<Object>() { - @Override - public Object call() throws Exception { - ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb, - new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, - data == null ? 0 : data.length, false)); - return null; - } + byte[] data = null; + try { + data = serialize(datat, path); + } catch (ZkMarshallingError e) { + cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null); + return; + } + final byte[] finalData = data; + retryUntilConnected(() -> { + ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb, + new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, + finalData == null ? 0 : finalData.length, false)); + return null; }); } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java index 95a55c4..ce109b7 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java @@ -22,6 +22,7 @@ package org.apache.helix.manager.zk; import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -32,7 +33,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkServer; +import org.apache.helix.SystemPropertyKeys; import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.manager.zk.zookeeper.ZkConnection; import org.apache.helix.monitoring.mbeans.MBeanRegistrar; @@ -40,6 +43,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.monitoring.mbeans.ZkClientMonitor; import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -344,4 +348,68 @@ public class TestRawZkClient extends ZkUnitTestBase { zkServer.shutdown(); } } + + @Test + public void testAsyncWriteOperations() { + ZkClient zkClient = new ZkClient(ZK_ADDR); + String originSizeLimit = + System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, "2000"); + try { + zkClient.setZkSerializer(new ZNRecordSerializer()); + + ZNRecord oversizeZNRecord = new ZNRecord("Oversize"); + StringBuilder sb = new StringBuilder(1204); + Random ran = new Random(); + for (int i = 0; i < 1024; i++) { + sb.append(ran.nextInt(26) + 'a'); + } + String buf = sb.toString(); + for (int i = 0; i < 1024; i++) { + oversizeZNRecord.setSimpleField(Integer.toString(i), buf); + } + + // ensure /tmp exists for the test + if (!zkClient.exists("/tmp")) { + zkClient.create("/tmp", null, CreateMode.PERSISTENT); + } + + ZkAsyncCallbacks.CreateCallbackHandler + createCallback = new ZkAsyncCallbacks.CreateCallbackHandler(); + zkClient.asyncCreate("/tmp/async", null, CreateMode.PERSISTENT, createCallback); + createCallback.waitForSuccess(); + Assert.assertEquals(createCallback.getRc(), 0); + Assert.assertTrue(zkClient.exists("/tmp/async")); + + // try to create oversize node, should fail + zkClient.asyncCreate("/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT, + createCallback); + createCallback.waitForSuccess(); + Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError); + Assert.assertFalse(zkClient.exists("/tmp/asyncOversize")); + + ZNRecord normalZNRecord = new ZNRecord("normal"); + normalZNRecord.setSimpleField("key", buf); + + ZkAsyncCallbacks.SetDataCallbackHandler + setDataCallbackHandler = new ZkAsyncCallbacks.SetDataCallbackHandler(); + zkClient.asyncSetData("/tmp/async", normalZNRecord, -1, setDataCallbackHandler); + setDataCallbackHandler.waitForSuccess(); + Assert.assertEquals(setDataCallbackHandler.getRc(), 0); + + zkClient.asyncSetData("/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler); + setDataCallbackHandler.waitForSuccess(); + Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError); + Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord); + } finally { + if (originSizeLimit == null) { + System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + } else { + System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, + originSizeLimit); + } + zkClient.delete("/tmp/async"); + zkClient.delete("/tmp/asyncOversize"); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java index 7dca363..671aba2 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializeWriteSizeLimit.java @@ -22,6 +22,7 @@ package org.apache.helix.manager.zk; import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.HelixException; import org.apache.helix.SystemPropertyKeys; @@ -146,7 +147,7 @@ public class TestZNRecordSerializeWriteSizeLimit { Assert.assertEquals(bytes.length >= limit, exceptionExpected); Assert.assertFalse(exceptionExpected); - } catch (HelixException ex) { + } catch (ZkMarshallingError ex) { Assert.assertTrue(exceptionExpected, "Should not throw ZkClientException."); Assert.assertTrue(ex.getMessage().contains(" is greater than " + limit + " bytes")); // No need to verify following asserts as bytes data is not returned. diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java index bc440b1..abb13b2 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSizeLimit.java @@ -21,6 +21,8 @@ package org.apache.helix.manager.zk; import java.util.Arrays; import java.util.Date; + +import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey.Builder; @@ -304,15 +306,16 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { final String thresholdProperty = System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); - ZNRecordSerializer serializer = new ZNRecordSerializer(); + try { + ZNRecordSerializer serializer = new ZNRecordSerializer(); - String root = getShortClassName(); + String root = getShortClassName(); - byte[] buf = new byte[1024]; - for (int i = 0; i < 1024; i++) { - buf[i] = 'a'; - } - String bufStr = new String(buf); + byte[] buf = new byte[1024]; + for (int i = 0; i < 1024; i++) { + buf[i] = 'a'; + } + String bufStr = new String(buf); // 1. legal-sized data gets written to zk // write a znode of size less than writeSizeLimit @@ -322,24 +325,24 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, String.valueOf(writeSizeLimit)); - final ZNRecord normalSizeRecord = new ZNRecord("normal-size"); - for (int i = 0; i < rawZnRecordSize; i++) { - normalSizeRecord.setSimpleField(Integer.toString(i), bufStr); - } + final ZNRecord normalSizeRecord = new ZNRecord("normal-size"); + for (int i = 0; i < rawZnRecordSize; i++) { + normalSizeRecord.setSimpleField(Integer.toString(i), bufStr); + } - String path = "/" + root + "/normal"; - _gZkClient.createPersistent(path, true); - _gZkClient.writeData(path, normalSizeRecord); + String path = "/" + root + "/normal"; + _gZkClient.createPersistent(path, true); + _gZkClient.writeData(path, normalSizeRecord); - ZNRecord record = _gZkClient.readData(path); + ZNRecord record = _gZkClient.readData(path); - // Successfully reads the same data. - Assert.assertEquals(normalSizeRecord, record); + // Successfully reads the same data. + Assert.assertEquals(normalSizeRecord, record); - int length = serializer.serialize(record).length; + int length = serializer.serialize(record).length; - // Less than writeSizeLimit so it is written to ZK. - Assert.assertTrue(length < writeSizeLimit); + // Less than writeSizeLimit so it is written to ZK. + Assert.assertTrue(length < writeSizeLimit); // 2. Large size data is not allowed to write to ZK // Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit. @@ -350,27 +353,27 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, String.valueOf(writeSizeLimit)); - final ZNRecord largeRecord = new ZNRecord("large-size"); - for (int i = 0; i < rawZnRecordSize; i++) { - largeRecord.setSimpleField(Integer.toString(i), bufStr); - } + final ZNRecord largeRecord = new ZNRecord("large-size"); + for (int i = 0; i < rawZnRecordSize; i++) { + largeRecord.setSimpleField(Integer.toString(i), bufStr); + } - path = "/" + root + "/large"; - _gZkClient.createPersistent(path, true); + path = "/" + root + "/large"; + _gZkClient.createPersistent(path, true); - try { - _gZkClient.writeData(path, largeRecord); - Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!"); - } catch (HelixException expected) { - Assert.assertTrue( - expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes")); - } + try { + _gZkClient.writeData(path, largeRecord); + Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!"); + } catch (ZkMarshallingError expected) { + Assert.assertTrue( + expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes")); + } - // test ZkDataAccessor - ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR); - admin.addCluster(root, true); - InstanceConfig instanceConfig = new InstanceConfig("localhost_12918"); - admin.addInstance(root, instanceConfig); + // test ZkDataAccessor + ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR); + admin.addCluster(root, true); + InstanceConfig instanceConfig = new InstanceConfig("localhost_12918"); + admin.addInstance(root, instanceConfig); // Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit. writeSizeLimitKb = 10; @@ -383,33 +386,32 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(_gZkClient)); Builder keyBuilder = accessor.keyBuilder(); - IdealState idealState = new IdealState("currentState"); - idealState.setStateModelDefRef("MasterSlave"); - idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); - idealState.setNumPartitions(10); + IdealState idealState = new IdealState("currentState"); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); + idealState.setNumPartitions(10); - for (int i = 0; i < 1024; i++) { - idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); - } - boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); - Assert.assertTrue(succeed); - HelixProperty property = accessor.getProperty( - keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1")); - Assert.assertNull(property); + for (int i = 0; i < 1024; i++) { + idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); + } + boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); + Assert.assertTrue(succeed); + HelixProperty property = accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1")); + Assert.assertNull(property); - // legal sized data gets written to zk - idealState.getRecord().getSimpleFields().clear(); - idealState.setStateModelDefRef("MasterSlave"); - idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); - idealState.setNumPartitions(10); + // legal sized data gets written to zk + idealState.getRecord().getSimpleFields().clear(); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); + idealState.setNumPartitions(10); - for (int i = 0; i < 900; i++) { - idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); - } - succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState); - Assert.assertTrue(succeed); - record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord(); - Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit); + for (int i = 0; i < 900; i++) { + idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); + } + succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState); + Assert.assertTrue(succeed); + record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord(); + Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit); // Set small write size limit so writing does not succeed. writeSizeLimitKb = 1; @@ -417,27 +419,28 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, String.valueOf(writeSizeLimit)); - // oversized data should not update existing data on zk - idealState.setStateModelDefRef("MasterSlave"); - idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); - idealState.setNumPartitions(10); - for (int i = 900; i < 1024; i++) { - idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); - } - - succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState); - Assert.assertFalse(succeed, - "Update property should not succeed because data exceeds znode write limit!"); + // oversized data should not update existing data on zk + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); + idealState.setNumPartitions(10); + for (int i = 900; i < 1024; i++) { + idealState.getRecord().setSimpleField(Integer.toString(i), bufStr); + } - // Delete the nodes. - deletePath(_gZkClient, "/" + root); + succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState); + Assert.assertFalse(succeed, + "Update property should not succeed because data exceeds znode write limit!"); - // Reset: add the properties back to system properties if they were originally available. - if (thresholdProperty != null) { - System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, - thresholdProperty); - } else { - System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + // Delete the nodes. + deletePath(_gZkClient, "/" + root); + } finally { + // Reset: add the properties back to system properties if they were originally available. + if (thresholdProperty != null) { + System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, + thresholdProperty); + } else { + System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + } } } @@ -515,7 +518,7 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { try { zkClient.writeData(path, largeRecord); Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!"); - } catch (HelixException expected) { + } catch (ZkMarshallingError expected) { Assert.assertTrue( expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes")); } @@ -587,14 +590,13 @@ public class TestZNRecordSizeLimit extends ZkUnitTestBase { deletePath(zkClient, "/" + root); } finally { zkClient.close(); - } - - // Reset: add the properties back to system properties if they were originally available. - if (thresholdProperty != null) { - System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, - thresholdProperty); - } else { - System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + // Reset: add the properties back to system properties if they were originally available. + if (thresholdProperty != null) { + System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, + thresholdProperty); + } else { + System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); + } } }
