Repository: helix Updated Branches: refs/heads/helix-0.6.x 99baacf7f -> 823a6cac8
[HELIX-573] Add support to automatically compress/uncompress data in Zookeeper Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/823a6cac Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/823a6cac Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/823a6cac Branch: refs/heads/helix-0.6.x Commit: 823a6cac80d62786e02992d75391a5e8e205a288 Parents: 99baacf Author: Kishore Gopalakrishna <[email protected]> Authored: Wed Mar 11 08:46:26 2015 -0700 Committer: Kishore Gopalakrishna <[email protected]> Committed: Wed Mar 11 09:55:13 2015 -0700 ---------------------------------------------------------------------- .../stages/ExternalViewComputeStage.java | 4 + .../helix/manager/zk/ZNRecordSerializer.java | 31 +++- .../manager/zk/ZNRecordStreamingSerializer.java | 30 +++- .../apache/helix/util/GZipCompressionUtil.java | 73 +++++++++ .../integration/TestEnableCompression.java | 159 +++++++++++++++++++ .../manager/zk/TestZNRecordSerializer.java | 127 +++++++++++++++ .../zk/TestZNRecordStreamingSerializer.java | 60 +++++++ 7 files changed, 469 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 903840c..80529df 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -123,6 +123,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage { // compare the new external view with current 
one, set only on different ExternalView curExtView = curExtViews.get(resourceName); + if (curExtView == null || !curExtView.getRecord().equals(view.getRecord())) { keys.add(keyBuilder.externalView(resourceName)); newExtViews.add(view); @@ -137,6 +138,9 @@ public class ExternalViewComputeStage extends AbstractBaseStage { DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { updateScheduledTaskStatus(view, manager, idealState); } + if (idealState != null) { + view.getRecord().getSimpleFields().putAll(idealState.getRecord().getSimpleFields()); + } } } // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all. http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java index 4419fdd..f8d3160 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java @@ -20,13 +20,18 @@ package org.apache.helix.manager.zk; */ import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.StringWriter; import java.util.List; import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; +import org.apache.helix.util.GZipCompressionUtil; import org.apache.log4j.Logger; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; @@ -77,21 +82,27 @@ public class ZNRecordSerializer implements ZkSerializer { serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); 
serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true); serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); - StringWriter sw = new StringWriter(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] serializedBytes = null; try { - mapper.writeValue(sw, data); + mapper.writeValue(baos, data); + serializedBytes = baos.toByteArray(); + // apply compression if needed + if (record.getBooleanField("enableCompression", false)) { + serializedBytes = GZipCompressionUtil.compress(serializedBytes); + } } catch (Exception e) { logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): " - + sw.toString().substring(0, 1024), e); + + new String(baos.toByteArray()).substring(0, 1024), e); throw new HelixException(e); } - - if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT) { + if (serializedBytes.length > ZNRecord.SIZE_LIMIT) { logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId() - + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024)); + + ". Will not write to zk. 
Data (first 1k): " + + new String(serializedBytes).substring(0, 1024)); throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId()); } - return sw.toString().getBytes(); + return serializedBytes; } @Override @@ -109,7 +120,13 @@ public class ZNRecordSerializer implements ZkSerializer { deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true); deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true); try { + //decompress the data if its already compressed + if (GZipCompressionUtil.isCompressed(bytes)) { + byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais); + bais = new ByteArrayInputStream(uncompressedBytes); + } ZNRecord zn = mapper.readValue(bais, ZNRecord.class); + return zn; } catch (Exception e) { logger.error("Exception during deserialization of bytes: " + new String(bytes), e); http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java index 2d7cb3c..53db50a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java @@ -20,17 +20,21 @@ package org.apache.helix.manager.zk; */ import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import 
org.apache.commons.codec.binary.Base64; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; +import org.apache.helix.util.GZipCompressionUtil; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; @@ -76,11 +80,11 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { } } } - - StringWriter sw = new StringWriter(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] serializedBytes = null; try { JsonFactory f = new JsonFactory(); - JsonGenerator g = f.createJsonGenerator(sw); + JsonGenerator g = f.createJsonGenerator(baos); g.writeStartObject(); @@ -150,20 +154,25 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { // important: will force flushing of output, close underlying output // stream g.close(); + serializedBytes = baos.toByteArray(); + // apply compression if needed + if (record.getBooleanField("enableCompression", false)) { + serializedBytes = GZipCompressionUtil.compress(serializedBytes); + } } catch (Exception e) { LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): " - + sw.toString().substring(0, 1024), e); + + new String(baos.toByteArray()).substring(0, 1024), e); throw new HelixException(e); } - // check size - if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT) { + if (serializedBytes.length > ZNRecord.SIZE_LIMIT) { LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId() - + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024)); + + ". Will not write to zk. 
Data (first 1k): " + + new String(serializedBytes).substring(0, 1024)); throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId()); } - return sw.toString().getBytes(); + return serializedBytes; } @Override @@ -183,6 +192,11 @@ public class ZNRecordStreamingSerializer implements ZkSerializer { byte[] rawPayload = null; try { + // decompress the data if its already compressed + if (GZipCompressionUtil.isCompressed(bytes)) { + byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais); + bais = new ByteArrayInputStream(uncompressedBytes); + } JsonFactory f = new JsonFactory(); JsonParser jp = f.createJsonParser(bais); http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java b/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java new file mode 100644 index 0000000..f29a301 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java @@ -0,0 +1,73 @@ +package org.apache.helix.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class GZipCompressionUtil { + /** + * Compresses a byte array by applying GZIP compression + * @param buffer the bytes to compress + * @return the GZIP-compressed bytes + * @throws IOException if writing to the GZIP stream fails + */ + public static byte[] compress(byte[] buffer) throws IOException { + ByteArrayOutputStream gzipByteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = null; + gzipOutputStream = new GZIPOutputStream(gzipByteArrayOutputStream); + gzipOutputStream.write(buffer, 0, buffer.length); + gzipOutputStream.close(); + byte[] compressedBytes = gzipByteArrayOutputStream.toByteArray(); + return compressedBytes; + } + + public static byte[] uncompress(ByteArrayInputStream bais) throws IOException { + GZIPInputStream gzipInputStream = new GZIPInputStream(bais); + byte[] buffer = new byte[1024]; + int length; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + while ((length = gzipInputStream.read(buffer)) != -1) { + baos.write(buffer, 0, length); + } + gzipInputStream.close(); + baos.close(); + byte[] uncompressedBytes = baos.toByteArray(); + return uncompressedBytes; + } + + /** + * Determines if a byte array is compressed. The java.util.zip GZip + * implementation does not expose the GZip header, so there is no direct API to determine + * if a byte array is compressed; instead the two leading GZIP magic bytes are checked.
+ * @param bytes an array of bytes + * @return true if the array is compressed or false otherwise + */ + public static boolean isCompressed(byte[] bytes) { + if ((bytes == null) || (bytes.length < 2)) { + return false; + } else { + return ((bytes[0] == (byte) (GZIPInputStream.GZIP_MAGIC)) && (bytes[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8))); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java new file mode 100644 index 0000000..1552e53 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java @@ -0,0 +1,159 @@ +package org.apache.helix.integration; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathConfig; +import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.helix.model.builder.IdealStateBuilder; +import org.apache.helix.tools.ClusterStateVerifier; +import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier; +import org.apache.helix.util.GZipCompressionUtil; +import org.testng.Assert; +import org.testng.annotations.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Test controller, spectator and participant roles when compression is enabled. + * Compression can be enabled for a specific resource by setting enableCompression=true in the + * idealstate of the resource. 
Generally this is used when the number of partitions is large + */ +public class TestEnableCompression extends ZkIntegrationTestBase { + @Test() + public void testEnableCompressionResource() throws Exception { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + MockParticipantManager[] participants = new MockParticipantManager[5]; + // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + int numNodes = 10; + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 0, // no resources, will be added later + 0, // partitions per resource + numNodes, // number of nodes + 0, // replicas + "OnlineOffline", false); // dont rebalance + List<String> instancesInCluster = + _gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName); + String resourceName = "TestResource"; + CustomModeISBuilder customModeISBuilder = new CustomModeISBuilder(resourceName); + + int numPartitions = 10000; + int numReplica = 3; + customModeISBuilder.setNumPartitions(numPartitions); + customModeISBuilder.setNumReplica(numReplica); + customModeISBuilder.setStateModel("OnlineOffline"); + for (int p = 0; p < numPartitions; p++) { + String partitionName = resourceName + "_" + p; + customModeISBuilder.add(partitionName); + for (int r = 0; r < numReplica; r++) { + String instanceName = instancesInCluster.get((p % numNodes + r) % numNodes); + customModeISBuilder.assignInstanceAndState(partitionName, instanceName, "ONLINE"); + } + } + + IdealState idealstate = customModeISBuilder.build(); + idealstate.getRecord().setBooleanField("enableCompression", true); + _gSetupTool.getClusterManagementTool().addResource(clusterName, resourceName, 
idealstate); + + ZkClient zkClient = + new ZkClient(ZK_ADDR, 60 * 1000, 60 * 1000, new BytesPushThroughSerializer()); + zkClient.waitUntilConnected(10, TimeUnit.SECONDS); + + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + + // start participants + for (int i = 0; i < 5; i++) { + String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + List<String> compressedPaths = new ArrayList<String>(); + findCompressedZNodes(zkClient, "/", compressedPaths); + + System.out.println("compressed paths:" + compressedPaths); + // ONLY IDEALSTATE and EXTERNAL VIEW must be compressed + Assert.assertEquals(compressedPaths.size(), 2); + String idealstatePath = + PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName); + String externalViewPath = + PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, resourceName); + Assert.assertTrue(compressedPaths.contains(idealstatePath)); + Assert.assertTrue(compressedPaths.contains(externalViewPath)); + + // clean up + controller.syncStop(); + for (int i = 0; i < 5; i++) { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + private void findCompressedZNodes(ZkClient zkClient, String path, List<String> compressedPaths) { + List<String> children = zkClient.getChildren(path); + if (children != null && children.size() > 0) { + for (String child : children) { + String childPath = (path.equals("/") ? 
"" : path) + "/" + child; + findCompressedZNodes(zkClient, childPath, compressedPaths); + } + } else { + byte[] data = zkClient.readData(path); + if (data != null && GZipCompressionUtil.isCompressed(data)) { + compressedPaths.add(path); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java new file mode 100644 index 0000000..90c1e8e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java @@ -0,0 +1,127 @@ +package org.apache.helix.manager.zk; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import org.apache.helix.ZNRecord; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +public class TestZNRecordSerializer { + /** + * Test the normal case of serialize/deserialize where ZNRecord is well-formed + */ + @Test + public void basicTest() { + ZNRecord record = new ZNRecord("testId"); + record.setMapField("k1", ImmutableMap.of("a", "b", "c", "d")); + record.setMapField("k2", ImmutableMap.of("e", "f", "g", "h")); + record.setListField("k3", ImmutableList.of("a", "b", "c", "d")); + record.setListField("k4", ImmutableList.of("d", "e", "f", "g")); + record.setSimpleField("k5", "a"); + record.setSimpleField("k5", "b"); + ZNRecordSerializer serializer = new ZNRecordSerializer(); + ZNRecord result = (ZNRecord) serializer.deserialize(serializer.serialize(record)); + Assert.assertEquals(result, record); + } + + /** + * Test that simple, list, and map fields are initialized as empty even when not in json + */ + @Test + public void fieldAutoInitTest() { + StringBuilder jsonString = new StringBuilder("{\n").append("\"id\": \"myId\"\n").append("}"); + ZNRecordSerializer serializer = new ZNRecordSerializer(); + ZNRecord result = (ZNRecord) serializer.deserialize(jsonString.toString().getBytes()); + Assert.assertNotNull(result); + Assert.assertEquals(result.getId(), "myId"); + Assert.assertNotNull(result.getSimpleFields()); + Assert.assertTrue(result.getSimpleFields().isEmpty()); + Assert.assertNotNull(result.getListFields()); + Assert.assertTrue(result.getListFields().isEmpty()); + Assert.assertNotNull(result.getMapFields()); + Assert.assertTrue(result.getMapFields().isEmpty()); + } + + @Test + public void testBasicCompression() { + ZNRecord record = new ZNRecord("testId"); + int numPartitions = 1024; + int replicas = 3; + int numNodes = 100; + Random random = new Random(); + for (int p = 0; p < numPartitions; p++) { + Map<String, String> map = new HashMap<String, String>(); + for (int r = 0; r < replicas; r++) { + map.put("host_" + random.nextInt(numNodes), "ONLINE"); + } + record.setMapField("TestResource_" + p, map); + } + ZNRecordSerializer 
serializer = new ZNRecordSerializer(); + byte[] serializedBytes; + serializedBytes = serializer.serialize(record); + int uncompressedSize = serializedBytes.length; + System.out.println("raw serialized data length = " + serializedBytes.length); + record.setSimpleField("enableCompression", "true"); + serializedBytes = serializer.serialize(record); + int compressedSize = serializedBytes.length; + System.out.println("compressed serialized data length = " + serializedBytes.length); + System.out.printf("compression ratio: %.2f \n", (uncompressedSize * 1.0 / compressedSize)); + ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes); + Assert.assertEquals(result, record); + } + + @Test + public void testCompression() { + int runId = 1; + while (runId < 20) { + int numPartitions = runId * 1000; + int replicas = 3; + int numNodes = 100; + Random random = new Random(); + ZNRecord record = new ZNRecord("testId"); + System.out.println("Partitions:" + numPartitions); + for (int p = 0; p < numPartitions; p++) { + Map<String, String> map = new HashMap<String, String>(); + for (int r = 0; r < replicas; r++) { + map.put("host_" + random.nextInt(numNodes), "ONLINE"); + } + record.setMapField("TestResource_" + p, map); + } + ZNRecordSerializer serializer = new ZNRecordSerializer(); + byte[] serializedBytes; + record.setSimpleField("enableCompression", "true"); + serializedBytes = serializer.serialize(record); + int compressedSize = serializedBytes.length; + System.out.println("compressed serialized data length = " + compressedSize); + ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes); + Assert.assertEquals(result, record); + runId = runId + 1; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java index e4b0b25..95064f8 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java @@ -1,5 +1,9 @@ package org.apache.helix.manager.zk; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + import org.apache.helix.ZNRecord; import org.testng.Assert; import org.testng.annotations.Test; @@ -89,4 +93,60 @@ public class TestZNRecordStreamingSerializer { Assert.assertNotNull(result.getMapFields()); Assert.assertTrue(result.getMapFields().isEmpty()); } + @Test + public void testBasicCompression() { + ZNRecord record = new ZNRecord("testId"); + int numPartitions = 1024; + int replicas = 3; + int numNodes = 100; + Random random = new Random(); + for (int p = 0; p < numPartitions; p++) { + Map<String, String> map = new HashMap<String, String>(); + for (int r = 0; r < replicas; r++) { + map.put("host_" + random.nextInt(numNodes), "ONLINE"); + } + record.setMapField("TestResource_" + p, map); + } + ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer(); + byte[] serializedBytes; + serializedBytes = serializer.serialize(record); + int uncompressedSize = serializedBytes.length; + System.out.println("raw serialized data length = " + serializedBytes.length); + record.setSimpleField("enableCompression", "true"); + serializedBytes = serializer.serialize(record); + int compressedSize = serializedBytes.length; + System.out.println("compressed serialized data length = " + serializedBytes.length); + System.out.printf("compression ratio: %.2f \n", (uncompressedSize * 1.0 / compressedSize)); + ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes); + Assert.assertEquals(result, record); + } + + @Test + public void 
testCompression() { + int runId = 1; + while (runId < 20) { + int numPartitions = runId * 1000; + int replicas = 3; + int numNodes = 100; + Random random = new Random(); + ZNRecord record = new ZNRecord("testId"); + System.out.println("Partitions:" + numPartitions); + for (int p = 0; p < numPartitions; p++) { + Map<String, String> map = new HashMap<String, String>(); + for (int r = 0; r < replicas; r++) { + map.put("host_" + random.nextInt(numNodes), "ONLINE"); + } + record.setMapField("TestResource_" + p, map); + } + ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer(); + byte[] serializedBytes; + record.setSimpleField("enableCompression", "true"); + serializedBytes = serializer.serialize(record); + int compressedSize = serializedBytes.length; + System.out.println("compressed serialized data length = " + compressedSize); + ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes); + Assert.assertEquals(result, record); + runId = runId + 1; + } + } }
