Change all Helix default created ZkClients to use ZnRecordSerializer.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b549cda9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b549cda9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b549cda9 Branch: refs/heads/master Commit: b549cda95cb114da78efc4b0458058862bcc6d02 Parents: f9bc9f8 Author: Lei Xia <[email protected]> Authored: Wed Sep 19 16:34:46 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Thu Nov 1 14:38:31 2018 -0700 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 2 +- .../helix/manager/zk/ZNRecordSerializer.java | 17 +-- .../manager/zk/TestZNRecordSerializer.java | 151 +++++++++++++++++++ .../zk/TestZNRecordStreamingSerializer.java | 19 +++ 4 files changed, 177 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index c673f51..c4275df 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -594,7 +594,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { void createClient() throws Exception { PathBasedZkSerializer zkSerializer = - ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build(); + ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build(); HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress); connectionConfig.setSessionTimeout(_sessionTimeout); http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java index 95ebc06..0c92224 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java @@ -21,12 +21,8 @@ package org.apache.helix.manager.zk; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.StringWriter; import java.util.List; import java.util.Map; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.HelixException; @@ -40,6 +36,7 @@ import org.codehaus.jackson.map.SerializationConfig; public class ZNRecordSerializer implements ZkSerializer { private static Logger logger = LoggerFactory.getLogger(ZNRecordSerializer.class); + private final ObjectMapper _mapper = new ObjectMapper(); private static int getListFieldBound(ZNRecord record) { int max = Integer.MAX_VALUE; @@ -78,15 +75,14 @@ public class ZNRecordSerializer implements ZkSerializer { } // do serialization - ObjectMapper mapper = new ObjectMapper(); - SerializationConfig serializationConfig = mapper.getSerializationConfig(); + SerializationConfig serializationConfig = _mapper.getSerializationConfig(); serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] serializedBytes = null; + byte[] serializedBytes; try { - mapper.writeValue(baos, data); + _mapper.writeValue(baos, data); serializedBytes = baos.toByteArray(); // apply compression if needed if (record.getBooleanField("enableCompression", false) || serializedBytes.length > ZNRecord.SIZE_LIMIT) { @@ -113,10 +109,9 @@ public class ZNRecordSerializer implements ZkSerializer { return null; } - ObjectMapper mapper = new ObjectMapper(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - DeserializationConfig deserializationConfig = mapper.getDeserializationConfig(); + DeserializationConfig deserializationConfig = _mapper.getDeserializationConfig(); deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true); deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true); deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true); @@ -126,7 +121,7 @@ public class ZNRecordSerializer implements ZkSerializer { byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais); bais = new ByteArrayInputStream(uncompressedBytes); } - ZNRecord zn = mapper.readValue(bais, ZNRecord.class); + ZNRecord zn = _mapper.readValue(bais, ZNRecord.class); return zn; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java index 05df1cd..e46eb4d 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java @@ -2,9 +2,19 @@ package org.apache.helix.manager.zk; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.ZNRecord; import org.testng.Assert; import org.testng.annotations.Test; @@ -46,6 +56,147 @@ public class TestZNRecordSerializer { Assert.assertEquals(result, record); } + + @Test + public void testNullFields() { + ZNRecord record = new ZNRecord("testId"); + record.setMapField("K1", null); + record.setListField("k2", null); + record.setSimpleField("k3", null); + ZNRecordSerializer serializer = new ZNRecordSerializer(); + byte [] data = serializer.serialize(record); + ZNRecord result = (ZNRecord) serializer.deserialize(data); + + Assert.assertEquals(result, record); + Assert.assertNull(result.getMapField("K1")); + Assert.assertNull(result.getListField("K2")); + Assert.assertNull(result.getSimpleField("K3")); + Assert.assertNull(result.getListField("K4")); + } + + + @Test (enabled = false) + public void testPerformance() { + ZNRecord record = createZnRecord(); + + ZNRecordSerializer serializer1 = new ZNRecordSerializer(); + ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer(); + + int loop = 100000; + + long start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + serializer1.serialize(record); + } + System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms"); + + byte[] data = serializer1.serialize(record); + start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + serializer1.deserialize(data); + } + System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms"); + + + start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + data = serializer2.serialize(record); + } + System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms"); + + start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + ZNRecord result = (ZNRecord) serializer2.deserialize(data); + } + System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms"); + } + + + ZNRecord createZnRecord() { + ZNRecord record = new ZNRecord("testId"); + for (int i = 0; i < 400; i++) { + Map<String, String> map = new HashMap<>(); + map.put("localhost_" + i, "Master"); + map.put("localhost_" + (i+1), "Slave"); + map.put("localhost_" + (i+2), "Slave"); + + record.setMapField("partition_" + i, map); + record.setListField("partition_" + i, Lists.<String>newArrayList(map.keySet())); + record.setSimpleField("partition_" + i, UUID.randomUUID().toString()); + } + + return record; + } + + + @Test (enabled = false) + public void testParallelPerformance() throws ExecutionException, InterruptedException { + final ZNRecord record = createZnRecord(); + + final ZNRecordSerializer serializer1 = new ZNRecordSerializer(); + final ZNRecordStreamingSerializer serializer2 = new ZNRecordStreamingSerializer(); + + int loop = 100000; + + ExecutorService executorService = Executors.newFixedThreadPool(10000); + + long start = System.currentTimeMillis(); + batchSerialize(serializer1, executorService, loop, record); + System.out.println("ZNRecordSerializer serialize took " + (System.currentTimeMillis() - start) + " ms"); + + byte[] data = serializer1.serialize(record); + start = System.currentTimeMillis(); + batchSerialize(serializer2, executorService, loop, record); + System.out.println("ZNRecordSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms"); + + + start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + data = serializer2.serialize(record); + } + System.out.println("ZNRecordStreamingSerializer serialize took " + (System.currentTimeMillis() - start) + " ms"); + + start = System.currentTimeMillis(); + for (int i = 0; i < loop; i++) { + ZNRecord result = (ZNRecord) serializer2.deserialize(data); + } + System.out.println("ZNRecordStreamingSerializer deserialize took " + (System.currentTimeMillis() - start) + " ms"); + } + + + private void batchSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final ZNRecord record) + throws ExecutionException, InterruptedException { + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < repeatTime; i++) { + Future f = executorService.submit(new Runnable() { + @Override public void run() { + serializer.serialize(record); + } + }); + futures.add(f); + } + for (Future f : futures) { + f.get(); + } + } + + + private void batchDeSerialize(final ZkSerializer serializer, ExecutorService executorService, int repeatTime, final byte [] data) + throws ExecutionException, InterruptedException { + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < repeatTime; i++) { + Future f = executorService.submit(new Runnable() { + @Override public void run() { + serializer.deserialize(data); + } + }); + futures.add(f); + } + for (Future f : futures) { + f.get(); + } + } + /** * Test that simple, list, and map fields are initialized as empty even when not in json */ http://git-wip-us.apache.org/repos/asf/helix/blob/b549cda9/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java index 567d842..2aea3da 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java @@ -46,6 +46,25 @@ public class TestZNRecordStreamingSerializer { Assert.assertEquals(result, record); } + + // TODO: need to fix ZnRecordStreamingSerializer before enabling this test. + @Test (enabled = false) + public void testNullFields() { + ZNRecord record = new ZNRecord("testId"); + record.setMapField("K1", null); + record.setListField("k2", null); + record.setSimpleField("k3", null); + ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer(); + byte [] data = serializer.serialize(record); + ZNRecord result = (ZNRecord) serializer.deserialize(data); + + Assert.assertEquals(result, record); + Assert.assertNull(result.getMapField("K1")); + Assert.assertNull(result.getListField("K2")); + Assert.assertNull(result.getSimpleField("K3")); + Assert.assertNull(result.getListField("K4")); + } + /** * Check that the ZNRecord is not constructed if there is no id in the json */
