Repository: spark
Updated Branches:
  refs/heads/master 08ef7d718 -> 979bf946d


[SPARK-20655][CORE] In-memory KVStore implementation.

This change adds an in-memory implementation of KVStore that can be
used by the live UI.

The implementation is not fully optimized for either speed or space,
but should be fast enough for use in the listener bus.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #18395 from vanzin/SPARK-20655.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/979bf946
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/979bf946
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/979bf946

Branch: refs/heads/master
Commit: 979bf946d5fc7c15c2fdaa2e6c4df07bbbb74d93
Parents: 08ef7d7
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Aug 8 11:02:54 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Aug 8 11:02:54 2017 -0700

----------------------------------------------------------------------
 common/kvstore/pom.xml                          |   5 +
 .../org/apache/spark/kvstore/ArrayWrappers.java | 213 ++++++++++++
 .../org/apache/spark/kvstore/InMemoryStore.java | 320 +++++++++++++++++++
 .../org/apache/spark/kvstore/KVTypeInfo.java    |   2 +-
 .../apache/spark/kvstore/ArrayKeyIndexType.java |  44 +++
 .../spark/kvstore/ArrayWrappersSuite.java       |  59 ++++
 .../spark/kvstore/InMemoryIteratorSuite.java    |  27 ++
 .../spark/kvstore/InMemoryStoreSuite.java       | 161 ++++++++++
 .../org/apache/spark/kvstore/LevelDBSuite.java  |  24 --
 project/SparkBuild.scala                        |   3 +-
 10 files changed, 832 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/pom.xml
----------------------------------------------------------------------
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index d00cf27..cf93d41 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -36,6 +36,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
new file mode 100644
index 0000000..5efa842
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/ArrayWrappers.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A factory for array wrappers so that arrays can be used as keys in a
+ * map, sorted or not.
+ *
+ * The comparator implementation makes two assumptions:
+ * - All elements are instances of Comparable
+ * - When comparing two arrays, they both contain elements of the same
+ *   type in corresponding indices.
+ *
+ * Otherwise, ClassCastExceptions may occur. The equality method can
+ * compare any two arrays.
+ *
+ * Arrays are ordered element by element; when one array is a prefix of
+ * the other, the shorter one sorts first. Wrappers hold a reference to
+ * the wrapped array, not a copy.
+ *
+ * This class is not efficient and is mostly meant to compare really
+ * small arrays, like those generally used as indices and keys in a
+ * KVStore.
+ */
+class ArrayWrappers {
+
+  private ArrayWrappers() {
+    // Static factory only; never instantiated.
+  }
+
+  /**
+   * Wraps an array in an object with content-based equals(), hashCode()
+   * and compareTo().
+   *
+   * @param a the array to wrap: an int[], long[], byte[] or an array of
+   *          objects.
+   * @return a wrapper around (not a copy of) the given array.
+   * @throws IllegalArgumentException if the argument is not an array, or
+   *         is an array of any other primitive type.
+   */
+  @SuppressWarnings("unchecked")
+  public static Comparable<Object> forArray(Object a) {
+    Preconditions.checkArgument(a.getClass().isArray());
+    Comparable<?> ret;
+    if (a instanceof int[]) {
+      ret = new ComparableIntArray((int[]) a);
+    } else if (a instanceof long[]) {
+      ret = new ComparableLongArray((long[]) a);
+    } else if (a instanceof byte[]) {
+      ret = new ComparableByteArray((byte[]) a);
+    } else {
+      // Remaining primitive arrays (short[], char[], ...) are rejected.
+      Preconditions.checkArgument(
+        !a.getClass().getComponentType().isPrimitive());
+      ret = new ComparableObjectArray((Object[]) a);
+    }
+    return (Comparable<Object>) ret;
+  }
+
+  /** Wrapper for int[] with content-based equality and ordering. */
+  private static class ComparableIntArray
+      implements Comparable<ComparableIntArray> {
+
+    private final int[] array;
+
+    ComparableIntArray(int[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableIntArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableIntArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(array);
+    }
+
+    @Override
+    public int compareTo(ComparableIntArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        // Integer.compare, not subtraction: a - b overflows for values
+        // far apart (e.g. Integer.MIN_VALUE - 1) and flips the sign.
+        int diff = Integer.compare(array[i], other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return Integer.compare(array.length, other.array.length);
+    }
+  }
+
+  /** Wrapper for long[] with content-based equality and ordering. */
+  private static class ComparableLongArray
+      implements Comparable<ComparableLongArray> {
+
+    private final long[] array;
+
+    ComparableLongArray(long[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableLongArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableLongArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      // Mixes all 64 bits of each element instead of truncating to int.
+      return Arrays.hashCode(array);
+    }
+
+    @Override
+    public int compareTo(ComparableLongArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        // Long.compare avoids the overflow of a - b on distant values.
+        int diff = Long.compare(array[i], other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return Integer.compare(array.length, other.array.length);
+    }
+  }
+
+  /**
+   * Wrapper for byte[] with content-based equality and ordering. Bytes
+   * are compared as signed values.
+   */
+  private static class ComparableByteArray
+      implements Comparable<ComparableByteArray> {
+
+    private final byte[] array;
+
+    ComparableByteArray(byte[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableByteArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableByteArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(array);
+    }
+
+    @Override
+    public int compareTo(ComparableByteArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff = Byte.compare(array[i], other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return Integer.compare(array.length, other.array.length);
+    }
+  }
+
+  /**
+   * Wrapper for object arrays. Ordering requires every element to be a
+   * Comparable that accepts the element at the same index of the other
+   * array; equality and hashing work for any elements, including null.
+   */
+  private static class ComparableObjectArray
+      implements Comparable<ComparableObjectArray> {
+
+    private final Object[] array;
+
+    ComparableObjectArray(Object[] array) {
+      this.array = array;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof ComparableObjectArray)) {
+        return false;
+      }
+      return Arrays.equals(array, ((ComparableObjectArray) other).array);
+    }
+
+    @Override
+    public int hashCode() {
+      // Null-safe, matching Arrays.equals() used by equals() above.
+      return Arrays.hashCode(array);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public int compareTo(ComparableObjectArray other) {
+      int len = Math.min(array.length, other.array.length);
+      for (int i = 0; i < len; i++) {
+        int diff =
+          ((Comparable<Object>) array[i]).compareTo(other.array[i]);
+        if (diff != 0) {
+          return diff;
+        }
+      }
+
+      return Integer.compare(array.length, other.array.length);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
new file mode 100644
index 0000000..f3aeb82
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/InMemoryStore.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that keeps data deserialized in memory. This
+ * store does not index data; instead, whenever iterating over an indexed
+ * field, the stored data is copied and sorted according to the index.
+ * This saves memory but makes iteration more expensive.
+ */
+@Private
+public class InMemoryStore implements KVStore {
+
+  private Object metadata;
+  private final ConcurrentMap<Class<?>, InstanceList> data =
+    new ConcurrentHashMap<>();
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) {
+    return klass.cast(metadata);
+  }
+
+  @Override
+  public void setMetadata(Object value) {
+    this.metadata = value;
+  }
+
+  /** Returns the number of stored objects of the type, 0 if none. */
+  @Override
+  public long count(Class<?> type) {
+    InstanceList list = data.get(type);
+    return list != null ? list.size() : 0;
+  }
+
+  /**
+   * Counts the stored objects of the given type whose value for the
+   * given index equals {@code indexedValue}. Returns 0 when nothing of
+   * that type has been written, like {@link #count(Class)}.
+   */
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue)
+      throws Exception {
+    InstanceList list = data.get(type);
+    if (list == null) {
+      // No instance list yet for this type, so there is nothing to match.
+      return 0;
+    }
+
+    int count = 0;
+    Object comparable = asKey(indexedValue);
+    KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
+    for (Object o : view(type)) {
+      if (Objects.equal(comparable, asKey(accessor.get(o)))) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Returns the object stored under the given natural key.
+   *
+   * @throws NoSuchElementException if no such object is stored.
+   */
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) {
+    InstanceList list = data.get(klass);
+    Object value = list != null ? list.get(naturalKey) : null;
+    if (value == null) {
+      throw new NoSuchElementException();
+    }
+    return klass.cast(value);
+  }
+
+  /**
+   * Stores the value under its natural key, replacing any object of the
+   * same type already stored under that key.
+   */
+  @Override
+  public void write(Object value) throws Exception {
+    InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
+      try {
+        return new InstanceList(key);
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    });
+    list.put(value);
+  }
+
+  /** Removes the object with the given natural key, if present. */
+  @Override
+  public void delete(Class<?> type, Object naturalKey) {
+    InstanceList list = data.get(type);
+    if (list != null) {
+      list.delete(naturalKey);
+    }
+  }
+
+  /** Returns a view of the stored objects of the type; may be empty. */
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) {
+    InstanceList list = data.get(type);
+    return list != null ? list.view(type)
+      : new InMemoryView<>(type, Collections.<T>emptyList(), null);
+  }
+
+  /** Drops the metadata and all stored objects. */
+  @Override
+  public void close() {
+    metadata = null;
+    data.clear();
+  }
+
+  /**
+   * Converts a key or index value into something usable as a map key and
+   * for ordering: arrays are wrapped, anything else is cast to
+   * Comparable as is.
+   */
+  @SuppressWarnings("unchecked")
+  private static Comparable<Object> asKey(Object in) {
+    if (in.getClass().isArray()) {
+      in = ArrayWrappers.forArray(in);
+    }
+    return (Comparable<Object>) in;
+  }
+
+  /** All stored instances of a single type, keyed by natural key. */
+  private static class InstanceList {
+
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor naturalKey;
+    private final ConcurrentMap<Comparable<Object>, Object> data;
+
+    private InstanceList(Class<?> type) throws Exception {
+      this.ti = new KVTypeInfo(type);
+      this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
+      this.data = new ConcurrentHashMap<>();
+    }
+
+    KVTypeInfo.Accessor getIndexAccessor(String indexName) {
+      return ti.getAccessor(indexName);
+    }
+
+    public Object get(Object key) {
+      return data.get(asKey(key));
+    }
+
+    public void put(Object value) throws Exception {
+      Preconditions.checkArgument(ti.type().equals(value.getClass()),
+        "Unexpected type: %s", value.getClass());
+      data.put(asKey(naturalKey.get(value)), value);
+    }
+
+    public void delete(Object key) {
+      data.remove(asKey(key));
+    }
+
+    public int size() {
+      // Ask the map instead of keeping a separate counter: a plain int
+      // updated next to a concurrent map can drift when writes race.
+      return data.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> InMemoryView<T> view(Class<T> type) {
+      Preconditions.checkArgument(ti.type().equals(type),
+        "Unexpected type: %s", type);
+      Collection<T> all = (Collection<T>) data.values();
+      return new InMemoryView<>(type, all, ti);
+    }
+
+  }
+
+  /**
+   * View over the instances of one type. Every call to iterator() copies
+   * and sorts the elements.
+   */
+  private static class InMemoryView<T> extends KVStoreView<T> {
+
+    private final Collection<T> elements;
+    private final KVTypeInfo ti;
+    private final KVTypeInfo.Accessor natural;
+
+    /**
+     * @param ti type information for the elements; may be null when
+     *           {@code elements} is empty.
+     */
+    InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
+      super(type);
+      this.elements = elements;
+      this.ti = ti;
+      this.natural =
+        ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
+    }
+
+    /**
+     * Builds a sorted copy of the elements and applies the view's
+     * first/last bounds, skip count and max size to it.
+     */
+    @Override
+    public Iterator<T> iterator() {
+      if (elements.isEmpty()) {
+        // Nothing to sort or filter; also covers the null-ti case.
+        return new InMemoryIterator<>(elements.iterator());
+      }
+
+      try {
+        KVTypeInfo.Accessor getter =
+          index != null ? ti.getAccessor(index) : null;
+        // Flips comparison results when iterating in descending order.
+        int modifier = ascending ? 1 : -1;
+
+        final List<T> sorted = copyElements();
+        Collections.sort(sorted,
+          (e1, e2) -> modifier * compare(e1, e2, getter));
+        Stream<T> stream = sorted.stream();
+
+        if (first != null) {
+          stream = stream.filter(
+            e -> modifier * compare(e, getter, first) >= 0);
+        }
+
+        if (last != null) {
+          stream = stream.filter(
+            e -> modifier * compare(e, getter, last) <= 0);
+        }
+
+        if (skip > 0) {
+          stream = stream.skip(skip);
+        }
+
+        if (max < sorted.size()) {
+          // max is below an int-sized list length here, so the cast is
+          // lossless.
+          stream = stream.limit((int) max);
+        }
+
+        return new InMemoryIterator<>(stream.iterator());
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Create a copy of the input elements, filtering the values for
+     * child indices if needed.
+     */
+    private List<T> copyElements() {
+      if (parent != null) {
+        KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
+        Preconditions.checkArgument(parentGetter != null,
+          "Parent filter for non-child index.");
+
+        return elements.stream()
+          .filter(e -> compare(e, parentGetter, parent) == 0)
+          .collect(Collectors.toList());
+      } else {
+        return new ArrayList<>(elements);
+      }
+    }
+
+    /**
+     * Orders two elements by the value read through {@code getter};
+     * ties are broken by the natural key.
+     */
+    private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
+      try {
+        int diff = compare(e1, getter, getter.get(e2));
+        if (diff == 0 && getter != natural) {
+          diff = compare(e1, natural, natural.get(e2));
+        }
+        return diff;
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Compares the value read from {@code e1} through {@code getter}
+     * against the already-extracted value {@code v2}.
+     */
+    private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
+      try {
+        return asKey(getter.get(e1)).compareTo(asKey(v2));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+  }
+
+  /** Adapts a plain Iterator to the KVStoreIterator interface. */
+  private static class InMemoryIterator<T> implements KVStoreIterator<T> {
+
+    private final Iterator<T> iter;
+
+    InMemoryIterator(Iterator<T> iter) {
+      this.iter = iter;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public T next() {
+      return iter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Returns up to {@code max} of the remaining elements. */
+    @Override
+    public List<T> next(int max) {
+      List<T> list = new ArrayList<>(max);
+      while (hasNext() && list.size() < max) {
+        list.add(next());
+      }
+      return list;
+    }
+
+    /**
+     * Skips {@code n} elements. Returns false if the iterator ran out
+     * before skipping that many; otherwise returns whether any elements
+     * remain.
+     */
+    @Override
+    public boolean skip(long n) {
+      long skipped = 0;
+      while (skipped < n) {
+        if (hasNext()) {
+          next();
+          skipped++;
+        } else {
+          return false;
+        }
+      }
+
+      return hasNext();
+    }
+
+    @Override
+    public void close() {
+      // no op.
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
index e1cc0ba..e3e61e0 100644
--- a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
@@ -89,7 +89,7 @@ public class KVTypeInfo {
       "Duplicate index %s for type %s.", idx.value(), type.getName());
   }
 
-  public Class<?> getType() {
+  public Class<?> type() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
new file mode 100644
index 0000000..d5938ac
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayKeyIndexType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+
+/**
+ * Test type whose natural key and "id" index are both arrays. Equality
+ * and hashing are based on array contents.
+ */
+public class ArrayKeyIndexType {
+
+  @KVIndex
+  public int[] key;
+
+  @KVIndex("id")
+  public String[] id;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ArrayKeyIndexType) {
+      ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+      return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // Content-based, so that instances equal per equals() hash alike;
+    // an array's own hashCode() is identity-based.
+    return Arrays.hashCode(key);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
new file mode 100644
index 0000000..f9b4774
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/ArrayWrappersSuite.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class ArrayWrappersSuite {
+
+  /** Shorthand that builds a fresh wrapper on every call. */
+  private static Comparable<Object> wrap(Object array) {
+    return ArrayWrappers.forArray(array);
+  }
+
+  /**
+   * Checks equality across same and different array types, and that a
+   * longer array compares greater than its own prefix.
+   */
+  @Test
+  public void testGenericArrayKey() {
+    byte[] longBytes = { 0x01, 0x02, 0x03 };
+    byte[] shortBytes = { 0x01, 0x02 };
+    int[] longInts = { 1, 2, 3 };
+    int[] shortInts = { 1, 2 };
+    String[] longStrings = { "1", "2", "3" };
+    String[] shortStrings = { "1", "2" };
+
+    // byte[] against itself, a prefix, and the other array types.
+    assertEquals(wrap(longBytes), wrap(longBytes));
+    assertNotEquals(wrap(longBytes), wrap(shortBytes));
+    assertNotEquals(wrap(longBytes), wrap(longInts));
+    assertNotEquals(wrap(longBytes), wrap(longStrings));
+
+    // int[] against itself, a prefix, and the other array types.
+    assertEquals(wrap(longInts), wrap(longInts));
+    assertNotEquals(wrap(longInts), wrap(shortInts));
+    assertNotEquals(wrap(longInts), wrap(longBytes));
+    assertNotEquals(wrap(longInts), wrap(longStrings));
+
+    // String[] against itself, a prefix, and the other array types.
+    assertEquals(wrap(longStrings), wrap(longStrings));
+    assertNotEquals(wrap(longStrings), wrap(shortStrings));
+    assertNotEquals(wrap(longStrings), wrap(longBytes));
+    assertNotEquals(wrap(longStrings), wrap(longInts));
+
+    // Ordering: equal contents tie, the longer array sorts last.
+    assertEquals(0, wrap(longBytes).compareTo(wrap(longBytes)));
+    assertTrue(wrap(longBytes).compareTo(wrap(shortBytes)) > 0);
+
+    assertEquals(0, wrap(longInts).compareTo(wrap(longInts)));
+    assertTrue(wrap(longInts).compareTo(wrap(shortInts)) > 0);
+
+    assertEquals(0, wrap(longStrings).compareTo(wrap(longStrings)));
+    assertTrue(wrap(longStrings).compareTo(wrap(shortStrings)) > 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
new file mode 100644
index 0000000..57ee4f6
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+public class InMemoryIteratorSuite extends DBIteratorSuite {
+
+  /** Supplies a new InMemoryStore as the KVStore for this suite. */
+  @Override
+  protected KVStore createStore() {
+    KVStore store = new InMemoryStore();
+    return store;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
new file mode 100644
index 0000000..6a7915f
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.NoSuchElementException;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class InMemoryStoreSuite {
+
+  /** Single object: read-before-write fails, then write/read/delete. */
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    try {
+      store.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existent object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    store.write(t);
+    assertEquals(t, store.read(t.getClass(), t.key));
+    assertEquals(1L, store.count(t.getClass()));
+
+    store.delete(t.getClass(), t.key);
+    // The count must follow the deletion, not just the read path.
+    assertEquals(0L, store.count(t.getClass()));
+    try {
+      store.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+  }
+
+  /** Two objects of one type: deleting one must not affect the other. */
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+
+    store.write(t1);
+    store.write(t2);
+
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    assertEquals(2L, store.count(t1.getClass()));
+
+    store.delete(t1.getClass(), t1.key);
+    assertEquals(1L, store.count(t1.getClass()));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+
+    store.delete(t2.getClass(), t2.key);
+    assertEquals(0L, store.count(t2.getClass()));
+    try {
+      store.read(t2.getClass(), t2.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+  }
+
+  /** Metadata starts out null, can be set, and can be cleared again. */
+  @Test
+  public void testMetadata() throws Exception {
+    KVStore store = new InMemoryStore();
+    assertNull(store.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+
+    store.setMetadata(t);
+    assertEquals(t, store.getMetadata(CustomType1.class));
+
+    store.setMetadata(null);
+    assertNull(store.getMetadata(CustomType1.class));
+  }
+
+  /** Writing the same key twice replaces the entry, not adds one. */
+  @Test
+  public void testUpdate() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    store.write(t);
+
+    t.name = "anotherName";
+
+    store.write(t);
+    assertEquals(1, store.count(t.getClass()));
+    assertSame(t, store.read(t.getClass(), t.key));
+  }
+
+  /** Array-valued natural keys and index values can be looked up. */
+  @Test
+  public void testArrayIndices() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 1, 2 };
+    o.id = new String[] { "3", "4" };
+
+    store.write(o);
+    assertEquals(o, store.read(ArrayKeyIndexType.class, o.key));
+    assertEquals(o, store.view(ArrayKeyIndexType.class)
+      .index("id").first(o.id).iterator().next());
+  }
+
+  /** Exercises skip(), max() and first() on the natural-key view. */
+  @Test
+  public void testBasicIteration() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id1";
+    t1.name = "name1";
+    store.write(t1);
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "2";
+    t2.id = "id2";
+    t2.name = "name2";
+    store.write(t2);
+
+    assertEquals(t1.id, store.view(t1.getClass()).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).skip(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).skip(1).max(1).iterator().next().id);
+    assertEquals(t1.id,
+      store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id);
+    assertFalse(
+      store.view(t1.getClass()).first(t2.id).skip(1).iterator().hasNext());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
index 42bff61..86c85c1 100644
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
@@ -283,28 +283,4 @@ public class LevelDBSuite {
 
   }
 
-  public static class ArrayKeyIndexType {
-
-    @KVIndex
-    public int[] key;
-
-    @KVIndex("id")
-    public String[] id;
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof ArrayKeyIndexType) {
-        ArrayKeyIndexType other = (ArrayKeyIndexType) o;
-        return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return key.hashCode();
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/979bf946/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b9db1df..371a171 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -230,7 +230,8 @@ object SparkBuild extends PomBuild {
 
     javacOptions in Compile ++= Seq(
       "-encoding", "UTF-8",
-      "-source", javacJVMVersion.value
+      "-source", javacJVMVersion.value,
+      "-Xlint:unchecked"
     ),
     // This -target option cannot be set in the Compile configuration scope 
since `javadoc` doesn't
     // play nicely with it; see 
https://github.com/sbt/sbt/issues/355#issuecomment-3817629 for


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to