Repository: apex-malhar
Updated Branches:
  refs/heads/master e0081143f -> 0a924ada9


APEXMALHAR-2247 #resolve Added iteration feature in SpillableArrayListImpl and 
generalize SerdeListSlice to SerdeCollectionSlice


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/fd20718a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/fd20718a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/fd20718a

Branch: refs/heads/master
Commit: fd20718a7e7fd832eb0cce59b6d5ef9d3b388e40
Parents: 9f9da0e
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:53:21 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Sep 19 17:53:21 2016 -0700

----------------------------------------------------------------------
 .../state/spillable/SpillableArrayListImpl.java |  29 ++++-
 .../lib/utils/serde/SerdeCollectionSlice.java   | 120 +++++++++++++++++++
 .../malhar/lib/utils/serde/SerdeListSlice.java  | 111 -----------------
 .../lib/state/spillable/SpillableTestUtils.java |  14 ++-
 .../utils/serde/SerdeCollectionSliceTest.java   |  65 ++++++++++
 .../lib/utils/serde/SerdeListSliceTest.java     |  45 -------
 6 files changed, 219 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index da5b140..4ea1923 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.malhar.lib.state.spillable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -26,8 +27,8 @@ import java.util.ListIterator;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
 import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -92,7 +93,8 @@ public class SpillableArrayListImpl<T> implements 
Spillable.SpillableArrayList<T
     this.store = Preconditions.checkNotNull(store);
     this.serde = Preconditions.checkNotNull(serde);
 
-    map = new SpillableByteMapImpl<>(store, prefix, bucketId, new 
SerdeIntSlice(), new SerdeListSlice(serde));
+    map = new SpillableByteMapImpl<>(store, prefix, bucketId, new 
SerdeIntSlice(),
+        new SerdeCollectionSlice<>(serde, 
(Class<List<T>>)(Class)ArrayList.class));
   }
 
   /**
@@ -145,7 +147,28 @@ public class SpillableArrayListImpl<T> implements 
Spillable.SpillableArrayList<T
   @Override
   public Iterator<T> iterator()
   {
-    throw new UnsupportedOperationException();
+    return new Iterator<T>()
+    {
+      private int index = 0;
+
+      @Override
+      public boolean hasNext()
+      {
+        return index < size;
+      }
+
+      @Override
+      public T next()
+      {
+        return get(index++);
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
new file mode 100644
index 0000000..eca1d5f
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and 
deserializes lists.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> 
implements Serde<CollectionT, Slice>
+{
+  @NotNull
+  private Serde<T, Slice> serde;
+
+  @NotNull
+  private Class<? extends CollectionT> collectionClass;
+
+  private SerdeCollectionSlice()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link SerdeCollectionSlice}.
+   * @param serde The {@link Serde} that is used to serialize and deserialize 
each element of a list.
+   */
+  public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? 
extends CollectionT> collectionClass)
+  {
+    this.serde = Preconditions.checkNotNull(serde);
+    this.collectionClass = Preconditions.checkNotNull(collectionClass);
+  }
+
+  @Override
+  public Slice serialize(CollectionT objects)
+  {
+    Slice[] slices = new Slice[objects.size()];
+
+    int size = 4;
+
+    int index = 0;
+    for (T object : objects) {
+      Slice slice = serde.serialize(object);
+      slices[index++] = slice;
+      size += slice.length;
+    }
+
+    byte[] bytes = new byte[size];
+    int offset = 0;
+
+    byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
+    System.arraycopy(sizeBytes, 0, bytes, offset, 4);
+    offset += 4;
+
+    for (index = 0; index < slices.length; index++) {
+      Slice slice = slices[index];
+      System.arraycopy(slice.buffer, slice.offset, bytes, offset, 
slice.length);
+      offset += slice.length;
+    }
+
+    return new Slice(bytes);
+  }
+
+  @Override
+  public CollectionT deserialize(Slice slice, MutableInt offset)
+  {
+    MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
+
+    int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
+    sliceOffset.subtract(slice.offset);
+    try {
+      CollectionT collection = collectionClass.newInstance();
+
+      for (int index = 0; index < numElements; index++) {
+        T object = serde.deserialize(slice, sliceOffset);
+        collection.add(object);
+      }
+
+      offset.setValue(sliceOffset.intValue());
+      return collection;
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  @Override
+  public CollectionT deserialize(Slice slice)
+  {
+    return deserialize(slice, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
deleted file mode 100644
index 68d11c8..0000000
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.List;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and 
deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeListSlice<T> implements Serde<List<T>, Slice>
-{
-  @NotNull
-  private Serde<T, Slice> serde;
-
-  private SerdeListSlice()
-  {
-    // for Kryo
-  }
-
-  /**
-   * Creates a {@link SerdeListSlice}.
-   * @param serde The {@link Serde} that is used to serialize and deserialize 
each element of a list.
-   */
-  public SerdeListSlice(@NotNull Serde<T, Slice> serde)
-  {
-    this.serde = Preconditions.checkNotNull(serde);
-  }
-
-  @Override
-  public Slice serialize(List<T> objects)
-  {
-    Slice[] slices = new Slice[objects.size()];
-
-    int size = 4;
-
-    for (int index = 0; index < objects.size(); index++) {
-      Slice slice = serde.serialize(objects.get(index));
-      slices[index] = slice;
-      size += slice.length;
-    }
-
-    byte[] bytes = new byte[size];
-    int offset = 0;
-
-    byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
-    System.arraycopy(sizeBytes, 0, bytes, offset, 4);
-    offset += 4;
-
-    for (int index = 0; index < slices.length; index++) {
-      Slice slice = slices[index];
-      System.arraycopy(slice.buffer, slice.offset, bytes, offset, 
slice.length);
-      offset += slice.length;
-    }
-
-    return new Slice(bytes);
-  }
-
-  @Override
-  public List<T> deserialize(Slice slice, MutableInt offset)
-  {
-    MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
-    int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
-    List<T> list = Lists.newArrayListWithCapacity(numElements);
-    sliceOffset.subtract(slice.offset);
-
-    for (int index = 0; index < numElements; index++) {
-      T object = serde.deserialize(slice, sliceOffset);
-      list.add(object);
-    }
-
-    offset.setValue(sliceOffset.intValue());
-    return list;
-  }
-
-  @Override
-  public List<T> deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index 00ea58d..36e3557 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.malhar.lib.state.spillable;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.junit.Assert;
@@ -27,7 +28,7 @@ import org.junit.runner.Description;
 import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
 import 
org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
 import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
 import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -44,18 +45,19 @@ import com.datatorrent.netlet.util.Slice;
 public class SpillableTestUtils
 {
   public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
-  public static SerdeListSlice<String> SERDE_STRING_LIST_SLICE = new 
SerdeListSlice(new SerdeStringSlice());
+  public static SerdeCollectionSlice<String, List<String>> 
SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(),
+      (Class<List<String>>)(Class)ArrayList.class);
 
   private SpillableTestUtils()
   {
     //Shouldn't instantiate this
   }
 
-  static class TestMeta extends TestWatcher
+  public static class TestMeta extends TestWatcher
   {
-    ManagedStateSpillableStateStore store;
-    Context.OperatorContext operatorContext;
-    String applicationPath;
+    public ManagedStateSpillableStateStore store;
+    public Context.OperatorContext operatorContext;
+    public String applicationPath;
 
     @Override
     protected void starting(Description description)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
new file mode 100644
index 0000000..f6085f6
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdeCollectionSliceTest
+{
+  @Test
+  public void testSerdeList()
+  {
+    SerdeCollectionSlice<String, List<String>> serdeList =
+        new SerdeCollectionSlice<>(new SerdeStringSlice(), 
(Class<List<String>>)(Class)ArrayList.class);
+
+    List<String> stringList = Lists.newArrayList("a", "b", "c");
+
+    Slice slice = serdeList.serialize(stringList);
+
+    List<String> deserializedList = serdeList.deserialize(slice);
+
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  @Test
+  public void testSerdeSet()
+  {
+    SerdeCollectionSlice<String, Set<String>> serdeSet =
+        new SerdeCollectionSlice<>(new SerdeStringSlice(), 
(Class<Set<String>>)(Class)HashSet.class);
+
+    Set<String> stringList = Sets.newHashSet("a", "b", "c");
+
+    Slice slice = serdeSet.serialize(stringList);
+
+    Set<String> deserializedSet = serdeSet.deserialize(slice);
+
+    Assert.assertEquals(stringList, deserializedSet);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/fd20718a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
deleted file mode 100644
index f7753d2..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdeListSliceTest
-{
-  @Test
-  public void simpleSerdeTest()
-  {
-    SerdeListSlice<String> serdeList = new SerdeListSlice<String>(new 
SerdeStringSlice());
-
-    List<String> stringList = Lists.newArrayList("a", "b", "c");
-
-    Slice slice = serdeList.serialize(stringList);
-
-    List<String> deserializedList = serdeList.deserialize(slice);
-
-    Assert.assertEquals(stringList, deserializedList);
-  }
-}

Reply via email to