http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
deleted file mode 100644
index 3a41818..0000000
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
-public abstract class DBIteratorSuite {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DBIteratorSuite.class);
-
-  private static final int MIN_ENTRIES = 42;
-  private static final int MAX_ENTRIES = 1024;
-  private static final Random RND = new Random();
-
-  private static List<CustomType1> allEntries;
-  private static List<CustomType1> clashingEntries;
-  private static KVStore db;
-
-  private interface BaseComparator extends Comparator<CustomType1> {
-    /**
-     * Returns a comparator that falls back to natural order if this 
comparator's ordering
-     * returns equality for two elements. Used to mimic how the index sorts 
things internally.
-     */
-    default BaseComparator fallback() {
-      return (t1, t2) -> {
-        int result = BaseComparator.this.compare(t1, t2);
-        if (result != 0) {
-          return result;
-        }
-
-        return t1.key.compareTo(t2.key);
-      };
-    }
-
-    /** Reverses the order of this comparator. */
-    default BaseComparator reverse() {
-      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
-    }
-  }
-
-  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> 
t1.key.compareTo(t2.key);
-  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> 
t1.id.compareTo(t2.id);
-  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> 
t1.name.compareTo(t2.name);
-  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num 
- t2.num;
-  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> 
t1.child.compareTo(t2.child);
-
-  /**
-   * Implementations should override this method; it is called only once, 
before all tests are
-   * run. Any state can be safely stored in static variables and cleaned up in 
a @AfterClass
-   * handler.
-   */
-  protected abstract KVStore createStore() throws Exception;
-
-  @BeforeClass
-  public static void setupClass() {
-    long seed = RND.nextLong();
-    LOG.info("Random seed: {}", seed);
-    RND.setSeed(seed);
-  }
-
-  @AfterClass
-  public static void cleanupData() throws Exception {
-    allEntries = null;
-    db = null;
-  }
-
-  @Before
-  public void setup() throws Exception {
-    if (db != null) {
-      return;
-    }
-
-    db = createStore();
-
-    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
-
-    allEntries = new ArrayList<>(count);
-    for (int i = 0; i < count; i++) {
-      CustomType1 t = new CustomType1();
-      t.key = "key" + i;
-      t.id = "id" + i;
-      t.name = "name" + RND.nextInt(MAX_ENTRIES);
-      t.num = RND.nextInt(MAX_ENTRIES);
-      t.child = "child" + (i % MIN_ENTRIES);
-      allEntries.add(t);
-    }
-
-    // Shuffle the entries to avoid the insertion order matching the natural 
ordering. Just in case.
-    Collections.shuffle(allEntries, RND);
-    for (CustomType1 e : allEntries) {
-      db.write(e);
-    }
-
-    // Pick the first generated value, and forcefully create a few entries 
that will clash
-    // with the indexed values (id and name), to make sure the index behaves 
correctly when
-    // multiple entities are indexed by the same value.
-    //
-    // This also serves as a test for the test code itself, to make sure it's 
sorting indices
-    // the same way the store is expected to.
-    CustomType1 first = allEntries.get(0);
-    clashingEntries = new ArrayList<>();
-
-    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
-    for (int i = 0; i < clashCount; i++) {
-      CustomType1 t = new CustomType1();
-      t.key = "n-key" + (count + i);
-      t.id = first.id;
-      t.name = first.name;
-      t.num = first.num;
-      t.child = first.child;
-      allEntries.add(t);
-      clashingEntries.add(t);
-      db.write(t);
-    }
-
-    // Create another entry that could cause problems: take the first entry, 
and make its indexed
-    // name be an extension of the existing ones, to make sure the 
implementation sorts these
-    // correctly even considering the separator character (shorter strings 
first).
-    CustomType1 t = new CustomType1();
-    t.key = "extended-key-0";
-    t.id = first.id;
-    t.name = first.name + "a";
-    t.num = first.num;
-    t.child = first.child;
-    allEntries.add(t);
-    db.write(t);
-  }
-
-  @Test
-  public void naturalIndex() throws Exception {
-    testIteration(NATURAL_ORDER, view(), null, null);
-  }
-
-  @Test
-  public void refIndex() throws Exception {
-    testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
-  }
-
-  @Test
-  public void copyIndex() throws Exception {
-    testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
-  }
-
-  @Test
-  public void numericIndex() throws Exception {
-    testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
-  }
-
-  @Test
-  public void childIndex() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id), 
null, null);
-  }
-
-  @Test
-  public void naturalIndexDescending() throws Exception {
-    testIteration(NATURAL_ORDER, view().reverse(), null, null);
-  }
-
-  @Test
-  public void refIndexDescending() throws Exception {
-    testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
-  }
-
-  @Test
-  public void copyIndexDescending() throws Exception {
-    testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, 
null);
-  }
-
-  @Test
-  public void numericIndexDescending() throws Exception {
-    testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, 
null);
-  }
-
-  @Test
-  public void childIndexDescending() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).reverse(), null, null);
-  }
-
-  @Test
-  public void naturalIndexWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(NATURAL_ORDER, view().first(first.key), first, null);
-  }
-
-  @Test
-  public void refIndexWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, 
null);
-  }
-
-  @Test
-  public void copyIndexWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), 
first, null);
-  }
-
-  @Test
-  public void numericIndexWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), 
first, null);
-  }
-
-  @Test
-  public void childIndexWithStart() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).first(any.child), null,
-      null);
-  }
-
-  @Test
-  public void naturalIndexDescendingWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, 
null);
-  }
-
-  @Test
-  public void refIndexDescendingWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(REF_INDEX_ORDER, 
view().reverse().index("id").first(first.id), first, null);
-  }
-
-  @Test
-  public void copyIndexDescendingWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(COPY_INDEX_ORDER, 
view().reverse().index("name").first(first.name), first, null);
-  }
-
-  @Test
-  public void numericIndexDescendingWithStart() throws Exception {
-    CustomType1 first = pickLimit();
-    testIteration(NUMERIC_INDEX_ORDER, 
view().reverse().index("int").first(first.num), first, null);
-  }
-
-  @Test
-  public void childIndexDescendingWithStart() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER,
-      view().index("child").parent(any.id).first(any.child).reverse(), null, 
null);
-  }
-
-  @Test
-  public void naturalIndexWithSkip() throws Exception {
-    testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null);
-  }
-
-  @Test
-  public void refIndexWithSkip() throws Exception {
-    testIteration(REF_INDEX_ORDER, view().index("id").skip(pickCount()), null, 
null);
-  }
-
-  @Test
-  public void copyIndexWithSkip() throws Exception {
-    testIteration(COPY_INDEX_ORDER, view().index("name").skip(pickCount()), 
null, null);
-  }
-
-  @Test
-  public void childIndexWithSkip() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).skip(pickCount()),
-      null, null);
-  }
-
-  @Test
-  public void naturalIndexWithMax() throws Exception {
-    testIteration(NATURAL_ORDER, view().max(pickCount()), null, null);
-  }
-
-  @Test
-  public void copyIndexWithMax() throws Exception {
-    testIteration(COPY_INDEX_ORDER, view().index("name").max(pickCount()), 
null, null);
-  }
-
-  @Test
-  public void childIndexWithMax() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).max(pickCount()), null,
-      null);
-  }
-
-  @Test
-  public void naturalIndexWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(NATURAL_ORDER, view().last(last.key), null, last);
-  }
-
-  @Test
-  public void refIndexWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, 
last);
-  }
-
-  @Test
-  public void copyIndexWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), 
null, last);
-  }
-
-  @Test
-  public void numericIndexWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), 
null, last);
-  }
-
-  @Test
-  public void childIndexWithLast() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).last(any.child), null,
-      null);
-  }
-
-  @Test
-  public void naturalIndexDescendingWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
-  }
-
-  @Test
-  public void refIndexDescendingWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), 
null, last);
-  }
-
-  @Test
-  public void copyIndexDescendingWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(COPY_INDEX_ORDER, 
view().reverse().index("name").last(last.name),
-      null, last);
-  }
-
-  @Test
-  public void numericIndexDescendingWithLast() throws Exception {
-    CustomType1 last = pickLimit();
-    testIteration(NUMERIC_INDEX_ORDER, 
view().reverse().index("int").last(last.num),
-      null, last);
-   }
-
-  @Test
-  public void childIndexDescendingWithLast() throws Exception {
-    CustomType1 any = pickLimit();
-    testIteration(CHILD_INDEX_ORDER, 
view().index("child").parent(any.id).last(any.child).reverse(),
-      null, null);
-  }
-
-  @Test
-  public void testRefWithIntNaturalKey() throws Exception {
-    LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
-    i.key = 1;
-    i.id = "1";
-    i.values = Arrays.asList("1");
-
-    db.write(i);
-
-    try(KVStoreIterator<?> it = db.view(i.getClass()).closeableIterator()) {
-      Object read = it.next();
-      assertEquals(i, read);
-    }
-  }
-
-  private CustomType1 pickLimit() {
-    // Picks an element that has clashes with other elements in the given 
index.
-    return clashingEntries.get(RND.nextInt(clashingEntries.size()));
-  }
-
-  private int pickCount() {
-    int count = RND.nextInt(allEntries.size() / 2);
-    return Math.max(count, 1);
-  }
-
-  /**
-   * Compares the two values and falls back to comparing the natural key of 
CustomType1
-   * if they're the same, to mimic the behavior of the indexing code.
-   */
-  private <T extends Comparable<T>> int compareWithFallback(
-      T v1,
-      T v2,
-      CustomType1 ct1,
-      CustomType1 ct2) {
-    int result = v1.compareTo(v2);
-    if (result != 0) {
-      return result;
-    }
-
-    return ct1.key.compareTo(ct2.key);
-  }
-
-  private void testIteration(
-      final BaseComparator order,
-      final KVStoreView<CustomType1> params,
-      final CustomType1 first,
-      final CustomType1 last) throws Exception {
-    List<CustomType1> indexOrder = sortBy(order.fallback());
-    if (!params.ascending) {
-      indexOrder = Lists.reverse(indexOrder);
-    }
-
-    Iterable<CustomType1> expected = indexOrder;
-    BaseComparator expectedOrder = params.ascending ? order : order.reverse();
-
-    if (params.parent != null) {
-      expected = Iterables.filter(expected, v -> params.parent.equals(v.id));
-    }
-
-    if (first != null) {
-      expected = Iterables.filter(expected, v -> expectedOrder.compare(first, 
v) <= 0);
-    }
-
-    if (last != null) {
-      expected = Iterables.filter(expected, v -> expectedOrder.compare(v, 
last) <= 0);
-    }
-
-    if (params.skip > 0) {
-      expected = Iterables.skip(expected, (int) params.skip);
-    }
-
-    if (params.max != Long.MAX_VALUE) {
-      expected = Iterables.limit(expected, (int) params.max);
-    }
-
-    List<CustomType1> actual = collect(params);
-    compareLists(expected, actual);
-  }
-
-  /** Could use assertEquals(), but that creates hard to read errors for large 
lists. */
-  private void compareLists(Iterable<?> expected, List<?> actual) {
-    Iterator<?> expectedIt = expected.iterator();
-    Iterator<?> actualIt = actual.iterator();
-
-    int count = 0;
-    while (expectedIt.hasNext()) {
-      if (!actualIt.hasNext()) {
-        break;
-      }
-      count++;
-      assertEquals(expectedIt.next(), actualIt.next());
-    }
-
-    String message;
-    Object[] remaining;
-    int expectedCount = count;
-    int actualCount = count;
-
-    if (expectedIt.hasNext()) {
-      remaining = Iterators.toArray(expectedIt, Object.class);
-      expectedCount += remaining.length;
-      message = "missing";
-    } else {
-      remaining = Iterators.toArray(actualIt, Object.class);
-      actualCount += remaining.length;
-      message = "stray";
-    }
-
-    assertEquals(String.format("Found %s elements: %s", message, 
Arrays.asList(remaining)),
-      expectedCount, actualCount);
-  }
-
-  private KVStoreView<CustomType1> view() throws Exception {
-    return db.view(CustomType1.class);
-  }
-
-  private List<CustomType1> collect(KVStoreView<CustomType1> view) throws 
Exception {
-    return Arrays.asList(Iterables.toArray(view, CustomType1.class));
-  }
-
-  private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
-    List<CustomType1> copy = new ArrayList<>(allEntries);
-    Collections.sort(copy, comp);
-    return copy;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
deleted file mode 100644
index 57ee4f6..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryIteratorSuite.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-public class InMemoryIteratorSuite extends DBIteratorSuite {
-
-  @Override
-  protected KVStore createStore() {
-    return new InMemoryStore();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
deleted file mode 100644
index 6a7915f..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/InMemoryStoreSuite.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.util.NoSuchElementException;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class InMemoryStoreSuite {
-
-  @Test
-  public void testObjectWriteReadDelete() throws Exception {
-    KVStore store = new InMemoryStore();
-
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-
-    try {
-      store.read(CustomType1.class, t.key);
-      fail("Expected exception for non-existant object.");
-    } catch (NoSuchElementException nsee) {
-      // Expected.
-    }
-
-    store.write(t);
-    assertEquals(t, store.read(t.getClass(), t.key));
-    assertEquals(1L, store.count(t.getClass()));
-
-    store.delete(t.getClass(), t.key);
-    try {
-      store.read(t.getClass(), t.key);
-      fail("Expected exception for deleted object.");
-    } catch (NoSuchElementException nsee) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testMultipleObjectWriteReadDelete() throws Exception {
-    KVStore store = new InMemoryStore();
-
-    CustomType1 t1 = new CustomType1();
-    t1.key = "key1";
-    t1.id = "id";
-    t1.name = "name1";
-
-    CustomType1 t2 = new CustomType1();
-    t2.key = "key2";
-    t2.id = "id";
-    t2.name = "name2";
-
-    store.write(t1);
-    store.write(t2);
-
-    assertEquals(t1, store.read(t1.getClass(), t1.key));
-    assertEquals(t2, store.read(t2.getClass(), t2.key));
-    assertEquals(2L, store.count(t1.getClass()));
-
-    store.delete(t1.getClass(), t1.key);
-    assertEquals(t2, store.read(t2.getClass(), t2.key));
-    store.delete(t2.getClass(), t2.key);
-    try {
-      store.read(t2.getClass(), t2.key);
-      fail("Expected exception for deleted object.");
-    } catch (NoSuchElementException nsee) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testMetadata() throws Exception {
-    KVStore store = new InMemoryStore();
-    assertNull(store.getMetadata(CustomType1.class));
-
-    CustomType1 t = new CustomType1();
-    t.id = "id";
-    t.name = "name";
-
-    store.setMetadata(t);
-    assertEquals(t, store.getMetadata(CustomType1.class));
-
-    store.setMetadata(null);
-    assertNull(store.getMetadata(CustomType1.class));
-  }
-
-  @Test
-  public void testUpdate() throws Exception {
-    KVStore store = new InMemoryStore();
-
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-
-    store.write(t);
-
-    t.name = "anotherName";
-
-    store.write(t);
-    assertEquals(1, store.count(t.getClass()));
-    assertSame(t, store.read(t.getClass(), t.key));
-  }
-
-  @Test
-  public void testArrayIndices() throws Exception {
-    KVStore store = new InMemoryStore();
-
-    ArrayKeyIndexType o = new ArrayKeyIndexType();
-    o.key = new int[] { 1, 2 };
-    o.id = new String[] { "3", "4" };
-
-    store.write(o);
-    assertEquals(o, store.read(ArrayKeyIndexType.class, o.key));
-    assertEquals(o, 
store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
-  }
-
-  @Test
-  public void testBasicIteration() throws Exception {
-    KVStore store = new InMemoryStore();
-
-    CustomType1 t1 = new CustomType1();
-    t1.key = "1";
-    t1.id = "id1";
-    t1.name = "name1";
-    store.write(t1);
-
-    CustomType1 t2 = new CustomType1();
-    t2.key = "2";
-    t2.id = "id2";
-    t2.name = "name2";
-    store.write(t2);
-
-    assertEquals(t1.id, store.view(t1.getClass()).iterator().next().id);
-    assertEquals(t2.id, 
store.view(t1.getClass()).skip(1).iterator().next().id);
-    assertEquals(t2.id, 
store.view(t1.getClass()).skip(1).max(1).iterator().next().id);
-    assertEquals(t1.id,
-      store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id);
-    assertEquals(t2.id,
-      store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id);
-    
assertFalse(store.view(t1.getClass()).first(t2.id).skip(1).iterator().hasNext());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
deleted file mode 100644
index 5e33606..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBBenchmark.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
-/**
- * A set of small benchmarks for the LevelDB implementation.
- *
- * The benchmarks are run over two different types (one with just a natural 
index, and one
- * with a ref index), over a set of 2^20 elements, and the following tests are 
performed:
- *
- * - write (then update) elements in sequential natural key order
- * - write (then update) elements in random natural key order
- * - iterate over natural index, ascending and descending
- * - iterate over ref index, ascending and descending
- */
-@Ignore
-public class LevelDBBenchmark {
-
-  private static final int COUNT = 1024;
-  private static final AtomicInteger IDGEN = new AtomicInteger();
-  private static final MetricRegistry metrics = new MetricRegistry();
-  private static final Timer dbCreation = metrics.timer("dbCreation");
-  private static final Timer dbClose = metrics.timer("dbClose");
-
-  private LevelDB db;
-  private File dbpath;
-
-  @Before
-  public void setup() throws Exception {
-    dbpath = File.createTempFile("test.", ".ldb");
-    dbpath.delete();
-    try(Timer.Context ctx = dbCreation.time()) {
-      db = new LevelDB(dbpath);
-    }
-  }
-
-  @After
-  public void cleanup() throws Exception {
-    if (db != null) {
-      try(Timer.Context ctx = dbClose.time()) {
-        db.close();
-      }
-    }
-    if (dbpath != null) {
-      FileUtils.deleteQuietly(dbpath);
-    }
-  }
-
-  @AfterClass
-  public static void report() {
-    if (metrics.getTimers().isEmpty()) {
-      return;
-    }
-
-    int headingPrefix = 0;
-    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
-      headingPrefix = Math.max(e.getKey().length(), headingPrefix);
-    }
-    headingPrefix += 4;
-
-    StringBuilder heading = new StringBuilder();
-    for (int i = 0; i < headingPrefix; i++) {
-      heading.append(" ");
-    }
-    heading.append("\tcount");
-    heading.append("\tmean");
-    heading.append("\tmin");
-    heading.append("\tmax");
-    heading.append("\t95th");
-    System.out.println(heading);
-
-    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
-      StringBuilder row = new StringBuilder();
-      row.append(e.getKey());
-      for (int i = 0; i < headingPrefix - e.getKey().length(); i++) {
-        row.append(" ");
-      }
-
-      Snapshot s = e.getValue().getSnapshot();
-      row.append("\t").append(e.getValue().getCount());
-      row.append("\t").append(toMs(s.getMean()));
-      row.append("\t").append(toMs(s.getMin()));
-      row.append("\t").append(toMs(s.getMax()));
-      row.append("\t").append(toMs(s.get95thPercentile()));
-
-      System.out.println(row);
-    }
-
-    
Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class))
-      .build().report();
-  }
-
-  private static String toMs(double nanos) {
-    return String.format("%.3f", nanos / 1000 / 1000);
-  }
-
-  @Test
-  public void sequentialWritesNoIndex() throws Exception {
-    List<SimpleType> entries = createSimpleType();
-    writeAll(entries, "sequentialWritesNoIndex");
-    writeAll(entries, "sequentialUpdatesNoIndex");
-    deleteNoIndex(entries, "sequentialDeleteNoIndex");
-  }
-
-  @Test
-  public void randomWritesNoIndex() throws Exception {
-    List<SimpleType> entries = createSimpleType();
-
-    Collections.shuffle(entries);
-    writeAll(entries, "randomWritesNoIndex");
-
-    Collections.shuffle(entries);
-    writeAll(entries, "randomUpdatesNoIndex");
-
-    Collections.shuffle(entries);
-    deleteNoIndex(entries, "randomDeletesNoIndex");
-  }
-
-  @Test
-  public void sequentialWritesIndexedType() throws Exception {
-    List<IndexedType> entries = createIndexedType();
-    writeAll(entries, "sequentialWritesIndexed");
-    writeAll(entries, "sequentialUpdatesIndexed");
-    deleteIndexed(entries, "sequentialDeleteIndexed");
-  }
-
-  @Test
-  public void randomWritesIndexedTypeAndIteration() throws Exception {
-    List<IndexedType> entries = createIndexedType();
-
-    Collections.shuffle(entries);
-    writeAll(entries, "randomWritesIndexed");
-
-    Collections.shuffle(entries);
-    writeAll(entries, "randomUpdatesIndexed");
-
-    // Run iteration benchmarks here since we've gone through the trouble of 
writing all
-    // the data already.
-    KVStoreView<?> view = db.view(IndexedType.class);
-    iterate(view, "naturalIndex");
-    iterate(view.reverse(), "naturalIndexDescending");
-    iterate(view.index("name"), "refIndex");
-    iterate(view.index("name").reverse(), "refIndexDescending");
-
-    Collections.shuffle(entries);
-    deleteIndexed(entries, "randomDeleteIndexed");
-  }
-
-  private void iterate(KVStoreView<?> view, String name) throws Exception {
-    Timer create = metrics.timer(name + "CreateIterator");
-    Timer iter = metrics.timer(name + "Iteration");
-    KVStoreIterator<?> it = null;
-    {
-      // Create the iterator several times, just to have multiple data points.
-      for (int i = 0; i < 1024; i++) {
-        if (it != null) {
-          it.close();
-        }
-        try(Timer.Context ctx = create.time()) {
-          it = view.closeableIterator();
-        }
-      }
-    }
-
-    for (; it.hasNext(); ) {
-      try(Timer.Context ctx = iter.time()) {
-        it.next();
-      }
-    }
-  }
-
-  private void writeAll(List<?> entries, String timerName) throws Exception {
-    Timer timer = newTimer(timerName);
-    for (Object o : entries) {
-      try(Timer.Context ctx = timer.time()) {
-        db.write(o);
-      }
-    }
-  }
-
-  private void deleteNoIndex(List<SimpleType> entries, String timerName) 
throws Exception {
-    Timer delete = newTimer(timerName);
-    for (SimpleType i : entries) {
-      try(Timer.Context ctx = delete.time()) {
-        db.delete(i.getClass(), i.key);
-      }
-    }
-  }
-
-  private void deleteIndexed(List<IndexedType> entries, String timerName) 
throws Exception {
-    Timer delete = newTimer(timerName);
-    for (IndexedType i : entries) {
-      try(Timer.Context ctx = delete.time()) {
-        db.delete(i.getClass(), i.key);
-      }
-    }
-  }
-
-  private List<SimpleType> createSimpleType() {
-    List<SimpleType> entries = new ArrayList<>();
-    for (int i = 0; i < COUNT; i++) {
-      SimpleType t = new SimpleType();
-      t.key = IDGEN.getAndIncrement();
-      t.name = "name" + (t.key % 1024);
-      entries.add(t);
-    }
-    return entries;
-  }
-
-  private List<IndexedType> createIndexedType() {
-    List<IndexedType> entries = new ArrayList<>();
-    for (int i = 0; i < COUNT; i++) {
-      IndexedType t = new IndexedType();
-      t.key = IDGEN.getAndIncrement();
-      t.name = "name" + (t.key % 1024);
-      entries.add(t);
-    }
-    return entries;
-  }
-
-  private Timer newTimer(String name) {
-    assertNull("Timer already exists: " + name, metrics.getTimers().get(name));
-    return metrics.timer(name);
-  }
-
-  public static class SimpleType {
-
-    @KVIndex
-    public int key;
-
-    public String name;
-
-  }
-
-  public static class IndexedType {
-
-    @KVIndex
-    public int key;
-
-    @KVIndex("name")
-    public String name;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
deleted file mode 100644
index 9340971..0000000
--- 
a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBIteratorSuite.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.File;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-
-public class LevelDBIteratorSuite extends DBIteratorSuite {
-
-  private static File dbpath;
-  private static LevelDB db;
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    if (db != null) {
-      db.close();
-    }
-    if (dbpath != null) {
-      FileUtils.deleteQuietly(dbpath);
-    }
-  }
-
-  @Override
-  protected KVStore createStore() throws Exception {
-    dbpath = File.createTempFile("test.", ".ldb");
-    dbpath.delete();
-    db = new LevelDB(dbpath);
-    return db;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
deleted file mode 100644
index 86c85c1..0000000
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.io.FileUtils;
-import org.iq80.leveldb.DBIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class LevelDBSuite {
-
-  private LevelDB db;
-  private File dbpath;
-
-  @After
-  public void cleanup() throws Exception {
-    if (db != null) {
-      db.close();
-    }
-    if (dbpath != null) {
-      FileUtils.deleteQuietly(dbpath);
-    }
-  }
-
-  @Before
-  public void setup() throws Exception {
-    dbpath = File.createTempFile("test.", ".ldb");
-    dbpath.delete();
-    db = new LevelDB(dbpath);
-  }
-
-  @Test
-  public void testReopenAndVersionCheckDb() throws Exception {
-    db.close();
-    db = null;
-    assertTrue(dbpath.exists());
-
-    db = new LevelDB(dbpath);
-    assertEquals(LevelDB.STORE_VERSION,
-      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
-    db.db().put(LevelDB.STORE_VERSION_KEY, 
db.serializer.serialize(LevelDB.STORE_VERSION + 1));
-    db.close();
-    db = null;
-
-    try {
-      db = new LevelDB(dbpath);
-      fail("Should have failed version check.");
-    } catch (UnsupportedStoreVersionException e) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testObjectWriteReadDelete() throws Exception {
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
-
-    try {
-      db.read(CustomType1.class, t.key);
-      fail("Expected exception for non-existant object.");
-    } catch (NoSuchElementException nsee) {
-      // Expected.
-    }
-
-    db.write(t);
-    assertEquals(t, db.read(t.getClass(), t.key));
-    assertEquals(1L, db.count(t.getClass()));
-
-    db.delete(t.getClass(), t.key);
-    try {
-      db.read(t.getClass(), t.key);
-      fail("Expected exception for deleted object.");
-    } catch (NoSuchElementException nsee) {
-      // Expected.
-    }
-
-    // Look into the actual DB and make sure that all the keys related to the 
type have been
-    // removed.
-    assertEquals(0, countKeys(t.getClass()));
-  }
-
-  @Test
-  public void testMultipleObjectWriteReadDelete() throws Exception {
-    CustomType1 t1 = new CustomType1();
-    t1.key = "key1";
-    t1.id = "id";
-    t1.name = "name1";
-    t1.child = "child1";
-
-    CustomType1 t2 = new CustomType1();
-    t2.key = "key2";
-    t2.id = "id";
-    t2.name = "name2";
-    t2.child = "child2";
-
-    db.write(t1);
-    db.write(t2);
-
-    assertEquals(t1, db.read(t1.getClass(), t1.key));
-    assertEquals(t2, db.read(t2.getClass(), t2.key));
-    assertEquals(2L, db.count(t1.getClass()));
-
-    // There should be one "id" index entry with two values.
-    assertEquals(2, db.count(t1.getClass(), "id", t1.id));
-
-    // Delete the first entry; now there should be 3 remaining keys, since one 
of the "name"
-    // index entries should have been removed.
-    db.delete(t1.getClass(), t1.key);
-
-    // Make sure there's a single entry in the "id" index now.
-    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
-
-    // Delete the remaining entry, make sure all data is gone.
-    db.delete(t2.getClass(), t2.key);
-    assertEquals(0, countKeys(t2.getClass()));
-  }
-
-  @Test
-  public void testMultipleTypesWriteReadDelete() throws Exception {
-    CustomType1 t1 = new CustomType1();
-    t1.key = "1";
-    t1.id = "id";
-    t1.name = "name1";
-    t1.child = "child1";
-
-    IntKeyType t2 = new IntKeyType();
-    t2.key = 2;
-    t2.id = "2";
-    t2.values = Arrays.asList("value1", "value2");
-
-    ArrayKeyIndexType t3 = new ArrayKeyIndexType();
-    t3.key = new int[] { 42, 84 };
-    t3.id = new String[] { "id1", "id2" };
-
-    db.write(t1);
-    db.write(t2);
-    db.write(t3);
-
-    assertEquals(t1, db.read(t1.getClass(), t1.key));
-    assertEquals(t2, db.read(t2.getClass(), t2.key));
-    assertEquals(t3, db.read(t3.getClass(), t3.key));
-
-    // There should be one "id" index with a single entry for each type.
-    assertEquals(1, db.count(t1.getClass(), "id", t1.id));
-    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
-    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
-
-    // Delete the first entry; this should not affect the entries for the 
second type.
-    db.delete(t1.getClass(), t1.key);
-    assertEquals(0, countKeys(t1.getClass()));
-    assertEquals(1, db.count(t2.getClass(), "id", t2.id));
-    assertEquals(1, db.count(t3.getClass(), "id", t3.id));
-
-    // Delete the remaining entries, make sure all data is gone.
-    db.delete(t2.getClass(), t2.key);
-    assertEquals(0, countKeys(t2.getClass()));
-
-    db.delete(t3.getClass(), t3.key);
-    assertEquals(0, countKeys(t3.getClass()));
-  }
-
-  @Test
-  public void testMetadata() throws Exception {
-    assertNull(db.getMetadata(CustomType1.class));
-
-    CustomType1 t = new CustomType1();
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
-
-    db.setMetadata(t);
-    assertEquals(t, db.getMetadata(CustomType1.class));
-
-    db.setMetadata(null);
-    assertNull(db.getMetadata(CustomType1.class));
-  }
-
-  @Test
-  public void testUpdate() throws Exception {
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
-
-    db.write(t);
-
-    t.name = "anotherName";
-
-    db.write(t);
-
-    assertEquals(1, db.count(t.getClass()));
-    assertEquals(1, db.count(t.getClass(), "name", "anotherName"));
-    assertEquals(0, db.count(t.getClass(), "name", "name"));
-  }
-
-  @Test
-  public void testSkip() throws Exception {
-    for (int i = 0; i < 10; i++) {
-      CustomType1 t = new CustomType1();
-      t.key = "key" + i;
-      t.id = "id" + i;
-      t.name = "name" + i;
-      t.child = "child" + i;
-
-      db.write(t);
-    }
-
-    KVStoreIterator<CustomType1> it = 
db.view(CustomType1.class).closeableIterator();
-    assertTrue(it.hasNext());
-    assertTrue(it.skip(5));
-    assertEquals("key5", it.next().key);
-    assertTrue(it.skip(3));
-    assertEquals("key9", it.next().key);
-    assertFalse(it.hasNext());
-  }
-
-  private int countKeys(Class<?> type) throws Exception {
-    byte[] prefix = db.getTypeInfo(type).keyPrefix();
-    int count = 0;
-
-    DBIterator it = db.db().iterator();
-    it.seek(prefix);
-
-    while (it.hasNext()) {
-      byte[] key = it.next().getKey();
-      if (LevelDBIterator.startsWith(key, prefix)) {
-        count++;
-      }
-    }
-
-    return count;
-  }
-
-  public static class IntKeyType {
-
-    @KVIndex
-    public int key;
-
-    @KVIndex("id")
-    public String id;
-
-    public List<String> values;
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof IntKeyType) {
-        IntKeyType other = (IntKeyType) o;
-        return key == other.key && id.equals(other.id) && 
values.equals(other.values);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return id.hashCode();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
deleted file mode 100644
index 8e61965..0000000
--- a/common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.kvstore;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class LevelDBTypeInfoSuite {
-
-  @Test
-  public void testIndexAnnotation() throws Exception {
-    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
-    assertEquals(5, ti.indices().count());
-
-    CustomType1 t1 = new CustomType1();
-    t1.key = "key";
-    t1.id = "id";
-    t1.name = "name";
-    t1.num = 42;
-    t1.child = "child";
-
-    assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
-    assertEquals(t1.id, ti.getIndexValue("id", t1));
-    assertEquals(t1.name, ti.getIndexValue("name", t1));
-    assertEquals(t1.num, ti.getIndexValue("int", t1));
-    assertEquals(t1.child, ti.getIndexValue("child", t1));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNoNaturalIndex() throws Exception {
-    newTypeInfo(NoNaturalIndex.class);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNoNaturalIndex2() throws Exception {
-    newTypeInfo(NoNaturalIndex2.class);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testDuplicateIndex() throws Exception {
-    newTypeInfo(DuplicateIndex.class);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testEmptyIndexName() throws Exception {
-    newTypeInfo(EmptyIndexName.class);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testIllegalIndexName() throws Exception {
-    newTypeInfo(IllegalIndexName.class);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testIllegalIndexMethod() throws Exception {
-    newTypeInfo(IllegalIndexMethod.class);
-  }
-
-  @Test
-  public void testKeyClashes() throws Exception {
-    LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
-
-    CustomType1 t1 = new CustomType1();
-    t1.key = "key1";
-    t1.name = "a";
-
-    CustomType1 t2 = new CustomType1();
-    t2.key = "key2";
-    t2.name = "aa";
-
-    CustomType1 t3 = new CustomType1();
-    t3.key = "key3";
-    t3.name = "aaa";
-
-    // Make sure entries with conflicting names are sorted correctly.
-    assertBefore(ti.index("name").entityKey(null, t1), 
ti.index("name").entityKey(null, t2));
-    assertBefore(ti.index("name").entityKey(null, t1), 
ti.index("name").entityKey(null, t3));
-    assertBefore(ti.index("name").entityKey(null, t2), 
ti.index("name").entityKey(null, t3));
-  }
-
-  @Test
-  public void testNumEncoding() throws Exception {
-    LevelDBTypeInfo.Index idx = 
newTypeInfo(CustomType1.class).indices().iterator().next();
-
-    assertEquals("+=00000001", new String(idx.toKey(1), UTF_8));
-    assertEquals("+=00000010", new String(idx.toKey(16), UTF_8));
-    assertEquals("+=7fffffff", new String(idx.toKey(Integer.MAX_VALUE), 
UTF_8));
-
-    assertBefore(idx.toKey(1), idx.toKey(2));
-    assertBefore(idx.toKey(-1), idx.toKey(2));
-    assertBefore(idx.toKey(-11), idx.toKey(2));
-    assertBefore(idx.toKey(-11), idx.toKey(-1));
-    assertBefore(idx.toKey(1), idx.toKey(11));
-    assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE));
-
-    assertBefore(idx.toKey(1L), idx.toKey(2L));
-    assertBefore(idx.toKey(-1L), idx.toKey(2L));
-    assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
-
-    assertBefore(idx.toKey((short) 1), idx.toKey((short) 2));
-    assertBefore(idx.toKey((short) -1), idx.toKey((short) 2));
-    assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE));
-
-    assertBefore(idx.toKey((byte) 1), idx.toKey((byte) 2));
-    assertBefore(idx.toKey((byte) -1), idx.toKey((byte) 2));
-    assertBefore(idx.toKey(Byte.MIN_VALUE), idx.toKey(Byte.MAX_VALUE));
-
-    byte prefix = LevelDBTypeInfo.ENTRY_PREFIX;
-    assertSame(new byte[] { prefix, LevelDBTypeInfo.FALSE }, idx.toKey(false));
-    assertSame(new byte[] { prefix, LevelDBTypeInfo.TRUE }, idx.toKey(true));
-  }
-
-  @Test
-  public void testArrayIndices() throws Exception {
-    LevelDBTypeInfo.Index idx = 
newTypeInfo(CustomType1.class).indices().iterator().next();
-
-    assertBefore(idx.toKey(new String[] { "str1" }), idx.toKey(new String[] { 
"str2" }));
-    assertBefore(idx.toKey(new String[] { "str1", "str2" }),
-      idx.toKey(new String[] { "str1", "str3" }));
-
-    assertBefore(idx.toKey(new int[] { 1 }), idx.toKey(new int[] { 2 }));
-    assertBefore(idx.toKey(new int[] { 1, 2 }), idx.toKey(new int[] { 1, 3 }));
-  }
-
-  private LevelDBTypeInfo newTypeInfo(Class<?> type) throws Exception {
-    return new LevelDBTypeInfo(null, type, type.getName().getBytes(UTF_8));
-  }
-
-  private void assertBefore(byte[] key1, byte[] key2) {
-    assertBefore(new String(key1, UTF_8), new String(key2, UTF_8));
-  }
-
-  private void assertBefore(String str1, String str2) {
-    assertTrue(String.format("%s < %s failed", str1, str2), 
str1.compareTo(str2) < 0);
-  }
-
-  private void assertSame(byte[] key1, byte[] key2) {
-    assertEquals(new String(key1, UTF_8), new String(key2, UTF_8));
-  }
-
-  public static class NoNaturalIndex {
-
-    public String id;
-
-  }
-
-  public static class NoNaturalIndex2 {
-
-    @KVIndex("id")
-    public String id;
-
-  }
-
-  public static class DuplicateIndex {
-
-    @KVIndex
-    public String key;
-
-    @KVIndex("id")
-    public String id;
-
-    @KVIndex("id")
-    public String id2;
-
-  }
-
-  public static class EmptyIndexName {
-
-    @KVIndex("")
-    public String id;
-
-  }
-
-  public static class IllegalIndexName {
-
-    @KVIndex("__invalid")
-    public String id;
-
-  }
-
-  public static class IllegalIndexMethod {
-
-    @KVIndex("id")
-    public String id(boolean illegalParam) {
-      return null;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
new file mode 100644
index 0000000..32030fb
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.Arrays;
+
+public class ArrayKeyIndexType {
+  // Test entity whose natural key and "id" index are both array-valued.
+  @KVIndex
+  public int[] key;
+
+  @KVIndex("id")
+  public String[] id;
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ArrayKeyIndexType) {
+      ArrayKeyIndexType other = (ArrayKeyIndexType) o;
+      return Arrays.equals(key, other.key) && Arrays.equals(id, other.id);
+    }
+    return false;
+  }
+  // Hash the array contents, not identity, to stay consistent with equals().
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(key) ^ Arrays.hashCode(id);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayWrappersSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayWrappersSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayWrappersSuite.java
new file mode 100644
index 0000000..b1c8927
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayWrappersSuite.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class ArrayWrappersSuite {
+
+  /**
+   * Checks equals() and compareTo() on the wrappers returned by ArrayWrappers.forArray() for
+   * byte, int and String arrays.
+   */
+  @Test
+  public void testGenericArrayKey() {
+   // For each element type: a three-element array and a two-element array that is its prefix.
+   byte[] b1 = new byte[] { 0x01, 0x02, 0x03 };
+   byte[] b2 = new byte[] { 0x01, 0x02 };
+   int[] i1 = new int[] { 1, 2, 3 };
+   int[] i2 = new int[] { 1, 2 };
+   String[] s1 = new String[] { "1", "2", "3" };
+   String[] s2 = new String[] { "1", "2" };
+
+   // Separately created wrappers of the same array are equal; wrappers of a shorter array or
+   // of an array with a different element type are not.
+   assertEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(b2));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(i1));
+   assertNotEquals(ArrayWrappers.forArray(b1), ArrayWrappers.forArray(s1));
+
+   assertEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i1));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(i2));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(i1), ArrayWrappers.forArray(s1));
+
+   assertEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s1));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(s2));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(b1));
+   assertNotEquals(ArrayWrappers.forArray(s1), ArrayWrappers.forArray(i1));
+
+   // compareTo() is 0 against the same array and positive against its shorter prefix.
+   assertEquals(0, ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b1)));
+   assertTrue(ArrayWrappers.forArray(b1).compareTo(ArrayWrappers.forArray(b2)) > 0);
+
+   assertEquals(0, ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i1)));
+   assertTrue(ArrayWrappers.forArray(i1).compareTo(ArrayWrappers.forArray(i2)) > 0);
+
+   assertEquals(0, ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s1)));
+   assertTrue(ArrayWrappers.forArray(s1).compareTo(ArrayWrappers.forArray(s2)) > 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java
new file mode 100644
index 0000000..92b643b
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/CustomType1.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.base.Objects;
+
+public class CustomType1 {
+  // Test entity exposing a natural key plus "id", "name", "int" and "child" indices.
+  @KVIndex
+  public String key;
+
+  @KVIndex("id")
+  public String id;
+
+  @KVIndex(value = "name", copy = true)
+  public String name;
+
+  @KVIndex("int")
+  public int num;
+
+  @KVIndex(value = "child", parent = "id")
+  public String child;
+  // Equality looks only at id and name; Objects.equal() keeps it safe when either is unset.
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof CustomType1) {
+      CustomType1 other = (CustomType1) o;
+      return Objects.equal(id, other.id) && Objects.equal(name, other.name);
+    }
+    return false;
+  }
+  // Hashes the same fields equals() compares, tolerating nulls.
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(id, name);
+  }
+  // Lists every field, including child, so the printed form shows the whole entity.
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("key", key)
+      .add("id", id)
+      .add("name", name)
+      .add("num", num)
+      .add("child", child)
+      .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
new file mode 100644
index 0000000..9a81f86
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
+
+  // Bounds for the generated data set: setup() writes between MIN_ENTRIES and
+  // MIN_ENTRIES + MAX_ENTRIES - 1 regular entries.
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  // Shared generator; setupClass() re-seeds it with a seed that is written to the log.
+  private static final Random RND = new Random();
+
+  // Fixture state shared by all tests; filled in by the first call to setup().
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /**
+     * Reverses the order of this comparator. The arguments are swapped instead of negating the
+     * result, because negating Integer.MIN_VALUE yields Integer.MIN_VALUE again.
+     */
+    default BaseComparator reverse() {
+      return (t1, t2) -> BaseComparator.this.compare(t2, t1);
+    }
+  }
+
+  // One ordering per index declared on CustomType1: natural key, "id", "name", "int", "child".
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  // Integer.compare() rather than subtraction: a difference can overflow for large values.
+  private static final BaseComparator NUMERIC_INDEX_ORDER =
+    (t1, t2) -> Integer.compare(t1.num, t2.num);
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in an @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  /**
+   * Draws a seed from the shared generator, logs it, and re-seeds the generator with it
+   * (presumably so that a run can be reproduced from the logged value).
+   */
+  @BeforeClass
+  public static void setupClass() {
+    final long loggedSeed = RND.nextLong();
+    LOG.info("Random seed: {}", loggedSeed);
+    RND.setSeed(loggedSeed);
+  }
+
+  /**
+   * Clears every static fixture field, including clashingEntries, so none of the generated
+   * test data stays reachable once the suite has finished. The store is only dereferenced
+   * here, not closed.
+   */
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    clashingEntries = null;
+    db = null;
+  }
+
+  /**
+   * Creates the store and fills it with randomized test data the first time it runs. Later
+   * invocations return immediately because the static db field is already set.
+   */
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    // Between MIN_ENTRIES and MIN_ENTRIES + MAX_ENTRIES - 1 regular entries.
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + RND.nextInt(MAX_ENTRIES);
+      t.num = RND.nextInt(MAX_ENTRIES);
+      // At most MIN_ENTRIES distinct child values, so children repeat across entries.
+      t.child = "child" + (i % MIN_ENTRIES);
+      allEntries.add(t);
+    }
+
+    // Shuffle the entries to avoid the insertion order matching the natural ordering. Just in
+    // case.
+    Collections.shuffle(allEntries, RND);
+    for (CustomType1 e : allEntries) {
+      db.write(e);
+    }
+
+    // Pick the first entry of the shuffled list, and forcefully create a few entries that will
+    // clash with its indexed values (id, name, num and child are all copied below), to make
+    // sure the index behaves correctly when multiple entities are indexed by the same value.
+    //
+    // This also serves as a test for the test code itself, to make sure it's sorting indices
+    // the same way the store is expected to.
+    CustomType1 first = allEntries.get(0);
+    clashingEntries = new ArrayList<>();
+
+    // Between 1 and MIN_ENTRIES clashing entries.
+    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
+    for (int i = 0; i < clashCount; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "n-key" + (count + i);
+      t.id = first.id;
+      t.name = first.name;
+      t.num = first.num;
+      t.child = first.child;
+      allEntries.add(t);
+      clashingEntries.add(t);
+      db.write(t);
+    }
+
+    // Create another entry that could cause problems: take the first entry, and make its
+    // indexed name be an extension of the existing ones, to make sure the implementation sorts
+    // these correctly even considering the separator character (shorter strings first).
+    CustomType1 t = new CustomType1();
+    t.key = "extended-key-0";
+    t.id = first.id;
+    t.name = first.name + "a";
+    t.num = first.num;
+    t.child = first.child;
+    allEntries.add(t);
+    db.write(t);
+  }
+
+  /** Runs the shared iteration check on the default view, paired with NATURAL_ORDER. */
+  @Test
+  public void naturalIndex() throws Exception {
+    testIteration(NATURAL_ORDER, view(), null, null);
+  }
+
+  /** Runs the shared iteration check on the "id" index, paired with REF_INDEX_ORDER. */
+  @Test
+  public void refIndex() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
+  }
+
+  /** Runs the shared iteration check on the "name" index, paired with COPY_INDEX_ORDER. */
+  @Test
+  public void copyIndex() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
+  }
+
+  /** Runs the shared iteration check on the "int" index, paired with NUMERIC_INDEX_ORDER. */
+  @Test
+  public void numericIndex() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
+  }
+
+  /**
+   * Runs the shared iteration check on the "child" index, restricted to the parent id of an
+   * entry obtained from pickLimit().
+   */
+  @Test
+  public void childIndex() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id), null, null);
+  }
+
+  /** Runs the shared iteration check on the reversed default view. */
+  @Test
+  public void naturalIndexDescending() throws Exception {
+    testIteration(NATURAL_ORDER, view().reverse(), null, null);
+  }
+
+  /** Runs the shared iteration check on the reversed "id" index. */
+  @Test
+  public void refIndexDescending() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
+  }
+
+  /** Runs the shared iteration check on the reversed "name" index. */
+  @Test
+  public void copyIndexDescending() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
+  }
+
+  /** Runs the shared iteration check on the reversed "int" index. */
+  @Test
+  public void numericIndexDescending() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
+  }
+
+  /** Iterates the reversed "child" index under the parent id of a clashing entry. */
+  @Test
+  public void childIndexDescending() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children = view().index("child").parent(sample.id).reverse();
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the view starting at the key of a clashing entry. */
+  @Test
+  public void naturalIndexWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().first(start.key);
+    testIteration(NATURAL_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the "id" index starting at the id of a clashing entry. */
+  @Test
+  public void refIndexWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().index("id").first(start.id);
+    testIteration(REF_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the "name" index starting at the name of a clashing entry. */
+  @Test
+  public void copyIndexWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().index("name").first(start.name);
+    testIteration(COPY_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the "int" index starting at the num value of a clashing entry. */
+  @Test
+  public void numericIndexWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().index("int").first(start.num);
+    testIteration(NUMERIC_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the "child" index under a parent id, starting at the picked entry's child. */
+  @Test
+  public void childIndexWithStart() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).first(sample.child);
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the reversed view starting at the key of a clashing entry. */
+  @Test
+  public void naturalIndexDescendingWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().reverse().first(start.key);
+    testIteration(NATURAL_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the reversed "id" index starting at the id of a clashing entry. */
+  @Test
+  public void refIndexDescendingWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().reverse().index("id").first(start.id);
+    testIteration(REF_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the reversed "name" index starting at the name of a clashing entry. */
+  @Test
+  public void copyIndexDescendingWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().reverse().index("name").first(start.name);
+    testIteration(COPY_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the reversed "int" index starting at the num value of a clashing entry. */
+  @Test
+  public void numericIndexDescendingWithStart() throws Exception {
+    CustomType1 start = pickLimit();
+    KVStoreView<CustomType1> fromStart = view().reverse().index("int").first(start.num);
+    testIteration(NUMERIC_INDEX_ORDER, fromStart, start, null);
+  }
+
+  /** Iterates the reversed "child" index under a parent id, starting at the picked child. */
+  @Test
+  public void childIndexDescendingWithStart() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).first(sample.child).reverse();
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the view after skipping a random number of entries. */
+  @Test
+  public void naturalIndexWithSkip() throws Exception {
+    KVStoreView<CustomType1> skipping = view().skip(pickCount());
+    testIteration(NATURAL_ORDER, skipping, null, null);
+  }
+
+  /** Iterates the "id" index after skipping a random number of entries. */
+  @Test
+  public void refIndexWithSkip() throws Exception {
+    KVStoreView<CustomType1> skipping = view().index("id").skip(pickCount());
+    testIteration(REF_INDEX_ORDER, skipping, null, null);
+  }
+
+  /** Iterates the "name" index after skipping a random number of entries. */
+  @Test
+  public void copyIndexWithSkip() throws Exception {
+    KVStoreView<CustomType1> skipping = view().index("name").skip(pickCount());
+    testIteration(COPY_INDEX_ORDER, skipping, null, null);
+  }
+
+  /** Iterates the "child" index under a parent id after skipping a random number of entries. */
+  @Test
+  public void childIndexWithSkip() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).skip(pickCount());
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the view capped at a random maximum number of entries. */
+  @Test
+  public void naturalIndexWithMax() throws Exception {
+    KVStoreView<CustomType1> capped = view().max(pickCount());
+    testIteration(NATURAL_ORDER, capped, null, null);
+  }
+
+  /** Iterates the "name" index capped at a random maximum number of entries. */
+  @Test
+  public void copyIndexWithMax() throws Exception {
+    KVStoreView<CustomType1> capped = view().index("name").max(pickCount());
+    testIteration(COPY_INDEX_ORDER, capped, null, null);
+  }
+
+  /** Iterates the "child" index under a parent id, capped at a random maximum. */
+  @Test
+  public void childIndexWithMax() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).max(pickCount());
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the view up to the key of a clashing entry. */
+  @Test
+  public void naturalIndexWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().last(end.key);
+    testIteration(NATURAL_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the "id" index up to the id of a clashing entry. */
+  @Test
+  public void refIndexWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().index("id").last(end.id);
+    testIteration(REF_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the "name" index up to the name of a clashing entry. */
+  @Test
+  public void copyIndexWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().index("name").last(end.name);
+    testIteration(COPY_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the "int" index up to the num value of a clashing entry. */
+  @Test
+  public void numericIndexWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().index("int").last(end.num);
+    testIteration(NUMERIC_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the "child" index under a parent id, ending at the picked entry's child. */
+  @Test
+  public void childIndexWithLast() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).last(sample.child);
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Iterates the reversed view up to the key of a clashing entry. */
+  @Test
+  public void naturalIndexDescendingWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().reverse().last(end.key);
+    testIteration(NATURAL_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the reversed "id" index up to the id of a clashing entry. */
+  @Test
+  public void refIndexDescendingWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().reverse().index("id").last(end.id);
+    testIteration(REF_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the reversed "name" index up to the name of a clashing entry. */
+  @Test
+  public void copyIndexDescendingWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().reverse().index("name").last(end.name);
+    testIteration(COPY_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the reversed "int" index up to the num value of a clashing entry. */
+  @Test
+  public void numericIndexDescendingWithLast() throws Exception {
+    CustomType1 end = pickLimit();
+    KVStoreView<CustomType1> upToEnd = view().reverse().index("int").last(end.num);
+    testIteration(NUMERIC_INDEX_ORDER, upToEnd, null, end);
+  }
+
+  /** Iterates the reversed "child" index under a parent id, ending at the picked child. */
+  @Test
+  public void childIndexDescendingWithLast() throws Exception {
+    CustomType1 sample = pickLimit();
+    KVStoreView<CustomType1> children =
+      view().index("child").parent(sample.id).last(sample.child).reverse();
+    testIteration(CHILD_INDEX_ORDER, children, null, null);
+  }
+
+  /** Writes an IntKeyType entry with key 1 and reads it back through a closeable iterator. */
+  @Test
+  public void testRefWithIntNaturalKey() throws Exception {
+    LevelDBSuite.IntKeyType entry = new LevelDBSuite.IntKeyType();
+    entry.key = 1;
+    entry.id = "1";
+    entry.values = Arrays.asList("1");
+
+    db.write(entry);
+
+    // The iterator is closed by try-with-resources once the first element has been checked.
+    try (KVStoreIterator<?> stored = db.view(entry.getClass()).closeableIterator()) {
+      assertEquals(entry, stored.next());
+    }
+  }
+
+  /** Returns a random member of clashingEntries, i.e. one that clashes with others in an index. */
+  private CustomType1 pickLimit() {
+    int chosen = RND.nextInt(clashingEntries.size());
+    return clashingEntries.get(chosen);
+  }
+
+  /** Returns a random count below half the number of entries, but never less than 1. */
+  private int pickCount() {
+    return Math.max(RND.nextInt(allEntries.size() / 2), 1);
+  }
+
+  /**
+   * Orders two values by their own comparison; on a tie, the natural keys of the given
+   * CustomType1 instances decide, to mimic the behavior of the indexing code.
+   */
+  private <T extends Comparable<T>> int compareWithFallback(
+      T v1,
+      T v2,
+      CustomType1 ct1,
+      CustomType1 ct2) {
+    int byValue = v1.compareTo(v2);
+    return byValue != 0 ? byValue : ct1.key.compareTo(ct2.key);
+  }
+
+  private void testIteration(
+      final BaseComparator order,
+      final KVStoreView<CustomType1> params,
+      final CustomType1 first,
+      final CustomType1 last) throws Exception {
+    List<CustomType1> indexOrder = sortBy(order.fallback());
+    if (!params.ascending) {
+      indexOrder = Lists.reverse(indexOrder);
+    }
+
+    Iterable<CustomType1> expected = indexOrder;
+    BaseComparator expectedOrder = params.ascending ? order : order.reverse();
+
+    if (params.parent != null) {
+      expected = Iterables.filter(expected, v -> params.parent.equals(v.id));
+    }
+
+    if (first != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(first, 
v) <= 0);
+    }
+
+    if (last != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(v, 
last) <= 0);
+    }
+
+    if (params.skip > 0) {
+      expected = Iterables.skip(expected, (int) params.skip);
+    }
+
+    if (params.max != Long.MAX_VALUE) {
+      expected = Iterables.limit(expected, (int) params.max);
+    }
+
+    List<CustomType1> actual = collect(params);
+    compareLists(expected, actual);
+  }
+
+  /**
+   * Compares the two sequences element by element, then checks their lengths and reports any
+   * leftover elements. Plain assertEquals() creates hard to read errors for large lists.
+   */
+  private void compareLists(Iterable<?> expected, List<?> actual) {
+    Iterator<?> want = expected.iterator();
+    Iterator<?> got = actual.iterator();
+
+    int matched = 0;
+    while (want.hasNext() && got.hasNext()) {
+      matched++;
+      assertEquals(want.next(), got.next());
+    }
+
+    // At most one side still has elements: "missing" if expected, "stray" otherwise.
+    boolean missing = want.hasNext();
+    Iterator<?> rest = missing ? want : got;
+    Object[] leftover = Iterators.toArray(rest, Object.class);
+
+    int expectedCount = missing ? matched + leftover.length : matched;
+    int actualCount = missing ? matched : matched + leftover.length;
+    String label = missing ? "missing" : "stray";
+
+    assertEquals(String.format("Found %s elements: %s", label, Arrays.asList(leftover)),
+      expectedCount, actualCount);
+  }
+
+  /** Returns the view of CustomType1 provided by the store under test. */
+  private KVStoreView<CustomType1> view() throws Exception {
+    KVStoreView<CustomType1> entries = db.view(CustomType1.class);
+    return entries;
+  }
+
+  /** Iterates the given view to the end and returns everything it produced as a list. */
+  private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception {
+    CustomType1[] entries = Iterables.toArray(view, CustomType1.class);
+    return Arrays.asList(entries);
+  }
+
+  /** Returns a sorted copy of allEntries; the original list is left untouched. */
+  private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
+    List<CustomType1> sorted = new ArrayList<>(allEntries);
+    sorted.sort(comp);
+    return sorted;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
new file mode 100644
index 0000000..27dde6a
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+public class InMemoryIteratorSuite extends DBIteratorSuite {
+
+  @Override
+  protected KVStore createStore() {
+    return new InMemoryStore();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1bfb49/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
new file mode 100644
index 0000000..510b305
--- /dev/null
+++ 
b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.util.NoSuchElementException;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Unit tests for the basic operations of InMemoryStore. */
+public class InMemoryStoreSuite {
+
+  /** A single object can be written, read, counted and deleted; reads of a missing key throw. */
+  @Test
+  public void testObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    // Nothing has been written yet, so the read must fail.
+    try {
+      store.read(CustomType1.class, t.key);
+      fail("Expected exception for non-existant object.");
+    } catch (NoSuchElementException expected) {
+      // Expected.
+    }
+
+    store.write(t);
+    assertEquals(t, store.read(t.getClass(), t.key));
+    assertEquals(1L, store.count(t.getClass()));
+
+    store.delete(t.getClass(), t.key);
+    try {
+      store.read(t.getClass(), t.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException expected) {
+      // Expected.
+    }
+  }
+
+  /** Two objects sharing an id but not a key are stored, read and deleted independently. */
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.id = "id";
+    t1.name = "name1";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.id = "id";
+    t2.name = "name2";
+
+    store.write(t1);
+    store.write(t2);
+
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    assertEquals(2L, store.count(t1.getClass()));
+
+    // Deleting t1 must leave t2 readable.
+    store.delete(t1.getClass(), t1.key);
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    store.delete(t2.getClass(), t2.key);
+    try {
+      store.read(t2.getClass(), t2.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException expected) {
+      // Expected.
+    }
+  }
+
+  /** Metadata starts out null, can be set, and is cleared again by setting null. */
+  @Test
+  public void testMetadata() throws Exception {
+    KVStore store = new InMemoryStore();
+    assertNull(store.getMetadata(CustomType1.class));
+
+    CustomType1 t = new CustomType1();
+    t.id = "id";
+    t.name = "name";
+
+    store.setMetadata(t);
+    assertEquals(t, store.getMetadata(CustomType1.class));
+
+    store.setMetadata(null);
+    assertNull(store.getMetadata(CustomType1.class));
+  }
+
+  /** Writing the same key twice replaces the entry instead of adding a second one. */
+  @Test
+  public void testUpdate() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t = new CustomType1();
+    t.key = "key";
+    t.id = "id";
+    t.name = "name";
+
+    store.write(t);
+
+    t.name = "anotherName";
+
+    store.write(t);
+    assertEquals(1L, store.count(t.getClass()));
+    assertSame(t, store.read(t.getClass(), t.key));
+  }
+
+  /** Array-valued natural keys and index values can be used for reads and view lookups. */
+  @Test
+  public void testArrayIndices() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 1, 2 };
+    o.id = new String[] { "3", "4" };
+
+    store.write(o);
+    assertEquals(o, store.read(ArrayKeyIndexType.class, o.key));
+    assertEquals(o,
+      store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
+  }
+
+  /** Exercises skip(), max() and first() on the natural index of a two-element store. */
+  @Test
+  public void testBasicIteration() throws Exception {
+    KVStore store = new InMemoryStore();
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "1";
+    t1.id = "id1";
+    t1.name = "name1";
+    store.write(t1);
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "2";
+    t2.id = "id2";
+    t2.name = "name2";
+    store.write(t2);
+
+    assertEquals(t1.id, store.view(t1.getClass()).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).skip(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).skip(1).max(1).iterator().next().id);
+    assertEquals(t1.id,
+      store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id);
+    // Start at t2's natural key (not its id) so that skip(1) is what empties the iterator.
+    assertFalse(
+      store.view(t1.getClass()).first(t2.key).skip(1).iterator().hasNext());
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to