Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 99baacf7f -> 823a6cac8


[HELIX-573] Add support to automatically compress/uncompress data in Zookeeper


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/823a6cac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/823a6cac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/823a6cac

Branch: refs/heads/helix-0.6.x
Commit: 823a6cac80d62786e02992d75391a5e8e205a288
Parents: 99baacf
Author: Kishore Gopalakrishna <[email protected]>
Authored: Wed Mar 11 08:46:26 2015 -0700
Committer: Kishore Gopalakrishna <[email protected]>
Committed: Wed Mar 11 09:55:13 2015 -0700

----------------------------------------------------------------------
 .../stages/ExternalViewComputeStage.java        |   4 +
 .../helix/manager/zk/ZNRecordSerializer.java    |  31 +++-
 .../manager/zk/ZNRecordStreamingSerializer.java |  30 +++-
 .../apache/helix/util/GZipCompressionUtil.java  |  73 +++++++++
 .../integration/TestEnableCompression.java      | 159 +++++++++++++++++++
 .../manager/zk/TestZNRecordSerializer.java      | 127 +++++++++++++++
 .../zk/TestZNRecordStreamingSerializer.java     |  60 +++++++
 7 files changed, 469 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 903840c..80529df 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -123,6 +123,7 @@ public class ExternalViewComputeStage extends 
AbstractBaseStage {
 
       // compare the new external view with current one, set only on different
       ExternalView curExtView = curExtViews.get(resourceName);
+
       if (curExtView == null || 
!curExtView.getRecord().equals(view.getRecord())) {
         keys.add(keyBuilder.externalView(resourceName));
         newExtViews.add(view);
@@ -137,6 +138,9 @@ public class ExternalViewComputeStage extends 
AbstractBaseStage {
                 DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
           updateScheduledTaskStatus(view, manager, idealState);
         }
+        if (idealState != null) {
+          
view.getRecord().getSimpleFields().putAll(idealState.getRecord().getSimpleFields());
+        }
       }
     }
     // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at 
all.

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
index 4419fdd..f8d3160 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordSerializer.java
@@ -20,13 +20,18 @@ package org.apache.helix.manager.zk;
  */
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -77,21 +82,27 @@ public class ZNRecordSerializer implements ZkSerializer {
     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
     serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, 
true);
     
serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS,
 true);
-    StringWriter sw = new StringWriter();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    byte[] serializedBytes = null;
     try {
-      mapper.writeValue(sw, data);
+      mapper.writeValue(baos, data);
+      serializedBytes = baos.toByteArray();
+      // apply compression if needed
+      if (record.getBooleanField("enableCompression", false)) {
+        serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+      }
     } catch (Exception e) {
       logger.error("Exception during data serialization. Will not write to zk. 
Data (first 1k): "
-          + sw.toString().substring(0, 1024), e);
+          + new String(baos.toByteArray()).substring(0, 1024), e);
       throw new HelixException(e);
     }
-
-    if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT) {
+    if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
       logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
-          + ". Will not write to zk. Data (first 1k): " + 
sw.toString().substring(0, 1024));
+          + ". Will not write to zk. Data (first 1k): "
+          + new String(serializedBytes).substring(0, 1024));
       throw new HelixException("Data size larger than 1M, ZNRecord.id: " + 
record.getId());
     }
-    return sw.toString().getBytes();
+    return serializedBytes;
   }
 
   @Override
@@ -109,7 +120,13 @@ public class ZNRecordSerializer implements ZkSerializer {
     
deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, 
true);
     
deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,
 true);
     try {
+      // decompress the data if it's already compressed
+      if (GZipCompressionUtil.isCompressed(bytes)) {
+        byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
+        bais = new ByteArrayInputStream(uncompressedBytes);
+      }
       ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
+
       return zn;
     } catch (Exception e) {
       logger.error("Exception during deserialization of bytes: " + new 
String(bytes), e);

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
index 2d7cb3c..53db50a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordStreamingSerializer.java
@@ -20,17 +20,21 @@ package org.apache.helix.manager.zk;
  */
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -76,11 +80,11 @@ public class ZNRecordStreamingSerializer implements 
ZkSerializer {
         }
       }
     }
-
-    StringWriter sw = new StringWriter();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    byte[] serializedBytes = null;
     try {
       JsonFactory f = new JsonFactory();
-      JsonGenerator g = f.createJsonGenerator(sw);
+      JsonGenerator g = f.createJsonGenerator(baos);
 
       g.writeStartObject();
 
@@ -150,20 +154,25 @@ public class ZNRecordStreamingSerializer implements 
ZkSerializer {
       // important: will force flushing of output, close underlying output
       // stream
       g.close();
+      serializedBytes = baos.toByteArray();
+      // apply compression if needed
+      if (record.getBooleanField("enableCompression", false)) {
+        serializedBytes = GZipCompressionUtil.compress(serializedBytes);
+      }
     } catch (Exception e) {
       LOG.error("Exception during data serialization. Will not write to zk. 
Data (first 1k): "
-          + sw.toString().substring(0, 1024), e);
+          + new String(baos.toByteArray()).substring(0, 1024), e);
       throw new HelixException(e);
     }
-
     // check size
-    if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT) {
+    if (serializedBytes.length > ZNRecord.SIZE_LIMIT) {
       LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
-          + ". Will not write to zk. Data (first 1k): " + 
sw.toString().substring(0, 1024));
+          + ". Will not write to zk. Data (first 1k): "
+          + new String(serializedBytes).substring(0, 1024));
       throw new HelixException("Data size larger than 1M, ZNRecord.id: " + 
record.getId());
     }
 
-    return sw.toString().getBytes();
+    return serializedBytes;
   }
 
   @Override
@@ -183,6 +192,11 @@ public class ZNRecordStreamingSerializer implements 
ZkSerializer {
     byte[] rawPayload = null;
 
     try {
+      // decompress the data if it's already compressed
+      if (GZipCompressionUtil.isCompressed(bytes)) {
+        byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
+        bais = new ByteArrayInputStream(uncompressedBytes);
+      }
       JsonFactory f = new JsonFactory();
       JsonParser jp = f.createJsonParser(bais);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java
new file mode 100644
index 0000000..f29a301
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/GZipCompressionUtil.java
@@ -0,0 +1,73 @@
+package org.apache.helix.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GZipCompressionUtil {
+  /**
+   * Compresses a byte array by applying GZIP compression.
+   * @param buffer the bytes to compress
+   * @return the GZIP-compressed bytes
+   * @throws IOException if writing to the compression stream fails
+   */
+  public static byte[] compress(byte[] buffer) throws IOException {
+    ByteArrayOutputStream gzipByteArrayOutputStream = new 
ByteArrayOutputStream();
+    GZIPOutputStream gzipOutputStream = null;
+    gzipOutputStream = new GZIPOutputStream(gzipByteArrayOutputStream);
+    gzipOutputStream.write(buffer, 0, buffer.length);
+    gzipOutputStream.close();
+    byte[] compressedBytes = gzipByteArrayOutputStream.toByteArray();
+    return compressedBytes;
+  }
+
+  public static byte[] uncompress(ByteArrayInputStream bais) throws 
IOException {
+    GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
+    byte[] buffer = new byte[1024];
+    int length;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    while ((length = gzipInputStream.read(buffer)) != -1) {
+      baos.write(buffer, 0, length);
+    }
+    gzipInputStream.close();
+    baos.close();
+    byte[] uncompressedBytes = baos.toByteArray();
+    return uncompressedBytes;
+  }
+
+  /**
+   * Determines if a byte array is compressed. The java.util.zip GZip
+   * implementation does not expose the GZip header so it is difficult to determine
+   * if a string is compressed.
+   * @param bytes an array of bytes
+   * @return true if the array is compressed or false otherwise
+   */
+  public static boolean isCompressed(byte[] bytes) {
+    if ((bytes == null) || (bytes.length < 2)) {
+      return false;
+    } else {
+      return ((bytes[0] == (byte) (GZIPInputStream.GZIP_MAGIC)) && (bytes[1] 
== (byte) (GZIPInputStream.GZIP_MAGIC >> 8)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
new file mode 100644
index 0000000..1552e53
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
@@ -0,0 +1,159 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.model.builder.IdealStateBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.util.GZipCompressionUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Test controller, spectator and participant roles when compression is 
enabled.
+ * Compression can be enabled for a specific resource by setting 
enableCompression=true in the
+ * idealstate of the resource. Generally this is used when the number of 
partitions is large
+ */
+public class TestEnableCompression extends ZkIntegrationTestBase {
+  @Test()
+  public void testEnableCompressionResource() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[5];
+    // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    int numNodes = 10;
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        0, // no resources, will be added later
+        0, // partitions per resource
+        numNodes, // number of nodes
+        0, // replicas
+        "OnlineOffline", false); // dont rebalance
+    List<String> instancesInCluster =
+        
_gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
+    String resourceName = "TestResource";
+    CustomModeISBuilder customModeISBuilder = new 
CustomModeISBuilder(resourceName);
+
+    int numPartitions = 10000;
+    int numReplica = 3;
+    customModeISBuilder.setNumPartitions(numPartitions);
+    customModeISBuilder.setNumReplica(numReplica);
+    customModeISBuilder.setStateModel("OnlineOffline");
+    for (int p = 0; p < numPartitions; p++) {
+      String partitionName = resourceName + "_" + p;
+      customModeISBuilder.add(partitionName);
+      for (int r = 0; r < numReplica; r++) {
+        String instanceName = instancesInCluster.get((p % numNodes + r) % 
numNodes);
+        customModeISBuilder.assignInstanceAndState(partitionName, 
instanceName, "ONLINE");
+      }
+    }
+
+    IdealState idealstate = customModeISBuilder.build();
+    idealstate.getRecord().setBooleanField("enableCompression", true);
+    _gSetupTool.getClusterManagementTool().addResource(clusterName, 
resourceName, idealstate);
+
+    ZkClient zkClient =
+        new ZkClient(ZK_ADDR, 60 * 1000, 60 * 1000, new 
BytesPushThroughSerializer());
+    zkClient.waitUntilConnected(10, TimeUnit.SECONDS);
+
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < 5; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    List<String> compressedPaths = new ArrayList<String>();
+    findCompressedZNodes(zkClient, "/", compressedPaths);
+
+    System.out.println("compressed paths:" + compressedPaths);
+    // ONLY IDEALSTATE and EXTERNAL VIEW must be compressed
+    Assert.assertEquals(compressedPaths.size(), 2);
+    String idealstatePath =
+        PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, 
resourceName);
+    String externalViewPath =
+        PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, 
resourceName);
+    Assert.assertTrue(compressedPaths.contains(idealstatePath));
+    Assert.assertTrue(compressedPaths.contains(externalViewPath));
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+  }
+
+  private void findCompressedZNodes(ZkClient zkClient, String path, 
List<String> compressedPaths) {
+    List<String> children = zkClient.getChildren(path);
+    if (children != null && children.size() > 0) {
+      for (String child : children) {
+        String childPath = (path.equals("/") ? "" : path) + "/" + child;
+        findCompressedZNodes(zkClient, childPath, compressedPaths);
+      }
+    } else {
+      byte[] data = zkClient.readData(path);
+      if (data != null && GZipCompressionUtil.isCompressed(data)) {
+        compressedPaths.add(path);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
new file mode 100644
index 0000000..90c1e8e
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordSerializer.java
@@ -0,0 +1,127 @@
+package org.apache.helix.manager.zk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.helix.ZNRecord;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestZNRecordSerializer {
+  /**
+   * Test the normal case of serialize/deserialize where ZNRecord is 
well-formed
+   */
+  @Test
+  public void basicTest() {
+    ZNRecord record = new ZNRecord("testId");
+    record.setMapField("k1", ImmutableMap.of("a", "b", "c", "d"));
+    record.setMapField("k2", ImmutableMap.of("e", "f", "g", "h"));
+    record.setListField("k3", ImmutableList.of("a", "b", "c", "d"));
+    record.setListField("k4", ImmutableList.of("d", "e", "f", "g"));
+    record.setSimpleField("k5", "a");
+    record.setSimpleField("k5", "b");
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    ZNRecord result = (ZNRecord) 
serializer.deserialize(serializer.serialize(record));
+    Assert.assertEquals(result, record);
+  }
+
+  /**
+   * Test that simple, list, and map fields are initialized as empty even when 
not in json
+   */
+  @Test
+  public void fieldAutoInitTest() {
+    StringBuilder jsonString = new StringBuilder("{\n").append("\"id\": 
\"myId\"\n").append("}");
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    ZNRecord result = (ZNRecord) 
serializer.deserialize(jsonString.toString().getBytes());
+    Assert.assertNotNull(result);
+    Assert.assertEquals(result.getId(), "myId");
+    Assert.assertNotNull(result.getSimpleFields());
+    Assert.assertTrue(result.getSimpleFields().isEmpty());
+    Assert.assertNotNull(result.getListFields());
+    Assert.assertTrue(result.getListFields().isEmpty());
+    Assert.assertNotNull(result.getMapFields());
+    Assert.assertTrue(result.getMapFields().isEmpty());
+  }
+
+  @Test
+  public void testBasicCompression() {
+    ZNRecord record = new ZNRecord("testId");
+    int numPartitions = 1024;
+    int replicas = 3;
+    int numNodes = 100;
+    Random random = new Random();
+    for (int p = 0; p < numPartitions; p++) {
+      Map<String, String> map = new HashMap<String, String>();
+      for (int r = 0; r < replicas; r++) {
+        map.put("host_" + random.nextInt(numNodes), "ONLINE");
+      }
+      record.setMapField("TestResource_" + p, map);
+    }
+    ZNRecordSerializer serializer = new ZNRecordSerializer();
+    byte[] serializedBytes;
+    serializedBytes = serializer.serialize(record);
+    int uncompressedSize = serializedBytes.length;
+    System.out.println("raw serialized data length = " + 
serializedBytes.length);
+    record.setSimpleField("enableCompression", "true");
+    serializedBytes = serializer.serialize(record);
+    int compressedSize = serializedBytes.length;
+    System.out.println("compressed serialized data length = " + 
serializedBytes.length);
+    System.out.printf("compression ratio: %.2f \n", (uncompressedSize * 1.0 / 
compressedSize));
+    ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes);
+    Assert.assertEquals(result, record);
+  }
+
+  @Test
+  public void testCompression() {
+    int runId = 1;
+    while (runId < 20) {
+      int numPartitions = runId * 1000;
+      int replicas = 3;
+      int numNodes = 100;
+      Random random = new Random();
+      ZNRecord record = new ZNRecord("testId");
+      System.out.println("Partitions:" + numPartitions);
+      for (int p = 0; p < numPartitions; p++) {
+        Map<String, String> map = new HashMap<String, String>();
+        for (int r = 0; r < replicas; r++) {
+          map.put("host_" + random.nextInt(numNodes), "ONLINE");
+        }
+        record.setMapField("TestResource_" + p, map);
+      }
+      ZNRecordSerializer serializer = new ZNRecordSerializer();
+      byte[] serializedBytes;
+      record.setSimpleField("enableCompression", "true");
+      serializedBytes = serializer.serialize(record);
+      int compressedSize = serializedBytes.length;
+      System.out.println("compressed serialized data length = " + 
compressedSize);
+      ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes);
+      Assert.assertEquals(result, record);
+      runId = runId + 1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/823a6cac/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
index e4b0b25..95064f8 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZNRecordStreamingSerializer.java
@@ -1,5 +1,9 @@
 package org.apache.helix.manager.zk;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.helix.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -89,4 +93,60 @@ public class TestZNRecordStreamingSerializer {
     Assert.assertNotNull(result.getMapFields());
     Assert.assertTrue(result.getMapFields().isEmpty());
   }
+  @Test
+  public void testBasicCompression() {
+    ZNRecord record = new ZNRecord("testId");
+    int numPartitions = 1024;
+    int replicas = 3;
+    int numNodes = 100;
+    Random random = new Random();
+    for (int p = 0; p < numPartitions; p++) {
+      Map<String, String> map = new HashMap<String, String>();
+      for (int r = 0; r < replicas; r++) {
+        map.put("host_" + random.nextInt(numNodes), "ONLINE");
+      }
+      record.setMapField("TestResource_" + p, map);
+    }
+    ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
+    byte[] serializedBytes;
+    serializedBytes = serializer.serialize(record);
+    int uncompressedSize = serializedBytes.length;
+    System.out.println("raw serialized data length = " + 
serializedBytes.length);
+    record.setSimpleField("enableCompression", "true");
+    serializedBytes = serializer.serialize(record);
+    int compressedSize = serializedBytes.length;
+    System.out.println("compressed serialized data length = " + 
serializedBytes.length);
+    System.out.printf("compression ratio: %.2f \n", (uncompressedSize * 1.0 / 
compressedSize));
+    ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes);
+    Assert.assertEquals(result, record);
+  }
+
+  @Test
+  public void testCompression() {
+    int runId = 1;
+    while (runId < 20) {
+      int numPartitions = runId * 1000;
+      int replicas = 3;
+      int numNodes = 100;
+      Random random = new Random();
+      ZNRecord record = new ZNRecord("testId");
+      System.out.println("Partitions:" + numPartitions);
+      for (int p = 0; p < numPartitions; p++) {
+        Map<String, String> map = new HashMap<String, String>();
+        for (int r = 0; r < replicas; r++) {
+          map.put("host_" + random.nextInt(numNodes), "ONLINE");
+        }
+        record.setMapField("TestResource_" + p, map);
+      }
+      ZNRecordStreamingSerializer serializer = new 
ZNRecordStreamingSerializer();
+      byte[] serializedBytes;
+      record.setSimpleField("enableCompression", "true");
+      serializedBytes = serializer.serialize(record);
+      int compressedSize = serializedBytes.length;
+      System.out.println("compressed serialized data length = " + 
compressedSize);
+      ZNRecord result = (ZNRecord) serializer.deserialize(serializedBytes);
+      Assert.assertEquals(result, record);
+      runId = runId + 1;
+    }
+  }
 }

Reply via email to