[SPARK-20641][CORE] Add key-value store abstraction and LevelDB implementation.

This change adds an abstraction and LevelDB implementation for a key-value
store that will be used to store UI and SHS data.

The interface is described in KVStore.java (see javadoc). Specifics
of the LevelDB implementation are discussed in the javadocs of both
LevelDB.java and LevelDBTypeInfo.java.

Included also are a few small benchmarks just to get some idea of
latency. Because they're too slow for regular unit test runs, they're
disabled by default.

Tested with the included unit tests, and also as part of the overall feature
implementation (including running SHS with hundreds of apps).

Author: Marcelo Vanzin <[email protected]>

Closes #17902 from vanzin/shs-ng/M1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cba4951
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cba4951
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cba4951

Branch: refs/heads/master
Commit: 0cba495120bc5a889ceeb8d66713a053d7561be2
Parents: b61a401
Author: Marcelo Vanzin <[email protected]>
Authored: Tue Jun 6 13:39:10 2017 -0500
Committer: Imran Rashid <[email protected]>
Committed: Tue Jun 6 13:39:10 2017 -0500

----------------------------------------------------------------------
 common/kvstore/pom.xml                          | 101 ++++
 .../java/org/apache/spark/kvstore/KVIndex.java  |  82 +++
 .../java/org/apache/spark/kvstore/KVStore.java  | 129 +++++
 .../apache/spark/kvstore/KVStoreIterator.java   |  47 ++
 .../apache/spark/kvstore/KVStoreSerializer.java |  86 ++++
 .../org/apache/spark/kvstore/KVStoreView.java   | 126 +++++
 .../org/apache/spark/kvstore/KVTypeInfo.java    | 156 ++++++
 .../java/org/apache/spark/kvstore/LevelDB.java  | 308 +++++++++++
 .../apache/spark/kvstore/LevelDBIterator.java   | 278 ++++++++++
 .../apache/spark/kvstore/LevelDBTypeInfo.java   | 516 +++++++++++++++++++
 .../UnsupportedStoreVersionException.java       |  27 +
 .../org/apache/spark/kvstore/CustomType1.java   |  63 +++
 .../apache/spark/kvstore/DBIteratorSuite.java   | 506 ++++++++++++++++++
 .../apache/spark/kvstore/LevelDBBenchmark.java  | 280 ++++++++++
 .../spark/kvstore/LevelDBIteratorSuite.java     |  48 ++
 .../org/apache/spark/kvstore/LevelDBSuite.java  | 312 +++++++++++
 .../spark/kvstore/LevelDBTypeInfoSuite.java     | 207 ++++++++
 .../kvstore/src/test/resources/log4j.properties |  27 +
 pom.xml                                         |  11 +
 project/SparkBuild.scala                        |   6 +-
 20 files changed, 3313 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/pom.xml
----------------------------------------------------------------------
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
new file mode 100644
index 0000000..d00cf27
--- /dev/null
+++ b/common/kvstore/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kvstore_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Local DB</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>kvstore</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--
+      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
+      them will yield errors.
+    -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
new file mode 100644
index 0000000..8b88990
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVIndex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Tags a field to be indexed when storing an object.
+ *
+ * <p>
+ * Types are required to have a natural index that uniquely identifies 
instances in the store.
+ * The default value of the annotation identifies the natural index for the 
type.
+ * </p>
+ *
+ * <p>
+ * Indexes allow for more efficient sorting of data read from the store. By 
annotating a field or
+ * "getter" method with this annotation, an index will be created that will 
provide sorting based on
+ * the string value of that field.
+ * </p>
+ *
+ * <p>
+ * Note that creating indices means more space will be needed, and maintenance 
operations like
+ * updating or deleting a value will become more expensive.
+ * </p>
+ *
+ * <p>
+ * Indices are restricted to String, integral types (byte, short, int, long, 
boolean), and arrays
+ * of those values.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+public @interface KVIndex {
+
+  public static final String NATURAL_INDEX_NAME = "__main__";
+
+  /**
+   * The name of the index to be created for the annotated entity. Must be 
unique within
+   * the class. Index names are not allowed to start with an underscore 
(that's reserved for
+   * internal use). The default value is the natural index name (which is 
always a copy index
+   * regardless of the annotation's values).
+   */
+  String value() default NATURAL_INDEX_NAME;
+
+  /**
+   * The name of the parent index of this index. By default there is no parent 
index, so the
+   * generated data can be retrieved without having to provide a parent value.
+   *
+   * <p>
+   * If a parent index is defined, iterating over the data using the index 
will require providing
+   * a single value for the parent index. This serves as a rudimentary way to 
provide relationships
+   * between entities in the store.
+   * </p>
+   */
+  String parent() default "";
+
+  /**
+   * Whether to copy the instance's data to the index, instead of just storing 
a pointer to the
+   * data. The default behavior is to just store a reference; that saves disk 
space but is slower
+   * to read, since there's a level of indirection.
+   */
+  boolean copy() default false;
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
new file mode 100644
index 0000000..3be4b82
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * <p>
+ * There are two main features provided by the implementations of this 
interface:
+ * </p>
+ *
+ * <h3>Serialization</h3>
+ *
+ * <p>
+ * If the underlying data store requires serialization, data will be 
serialized to and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the 
application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for 
controlling the serialization of
+ * app-defined types.
+ * </p>
+ *
+ * <p>
+ * Data is also automatically compressed to save disk space.
+ * </p>
+ *
+ * <h3>Automatic Key Management</h3>
+ *
+ * <p>
+ * When using the built-in key management, the implementation will 
automatically create unique
+ * keys for each type written to the store. Keys are based on the type name, 
and always start
+ * with the "+" prefix character (so that it's easy to use both manual and 
automatic key
+ * management APIs without conflicts).
+ * </p>
+ *
+ * <p>
+ * Another feature of automatic key management is indexing; by annotating 
fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created to 
sort the data
+ * by the values of those properties. This makes it possible to provide 
sorting without having
+ * to load all instances of those types from the store.
+ * </p>
+ *
+ * <p>
+ * KVStore instances are thread-safe for both reads and writes.
+ * </p>
+ */
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not 
currently set.
+   *
+   * <p>
+   * The metadata type is application-specific. This is a convenience method 
so that applications
+   * don't need to define their own keys for this information.
+   * </p>
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value in the store metadata key.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   *
+   * @param naturalKey The object's "natural key", which uniquely identifies 
it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not 
exist.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
+
+  /**
+   * Writes the given object to the store, including indexed fields. Indices 
are updated based
+   * on the annotated fields of the object's class.
+   *
+   * <p>
+   * Writes may be slower when the object already exists in the store, since 
it will involve
+   * updating existing indices.
+   * </p>
+   *
+   * @param value The object to write.
+   */
+  void write(Object value) throws Exception;
+
+  /**
+   * Removes an object and all data related to it, like index entries, from 
the store.
+   *
+   * @param type The object's type.
+   * @param naturalKey The object's "natural key", which uniquely identifies 
it. Null keys
+   *                   are not allowed.
+   * @throws NoSuchElementException If an element with the given key does not 
exist.
+   */
+  void delete(Class<?> type, Object naturalKey) throws Exception;
+
+  /**
+   * Returns a configurable view for iterating over entities of the given type.
+   */
+  <T> KVStoreView<T> view(Class<T> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type currently in the store.
+   */
+  long count(Class<?> type) throws Exception;
+
+  /**
+   * Returns the number of items of the given type which match the given 
indexed value.
+   */
+  long count(Class<?> type, String index, Object indexedValue) throws 
Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
new file mode 100644
index 0000000..3efdec9
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.List;
+
/**
 * Iterator over the contents of a KVStore.
 *
 * <p>
 * An iterator may hold references to resources that must be released. Users are advised to
 * close iterators explicitly once they are done with them.
 * </p>
 */
public interface KVStoreIterator<T> extends Iterator<T>, AutoCloseable {

  /**
   * Retrieves several elements from the store in a single call.
   *
   * @param max Maximum number of elements to retrieve.
   * @return The retrieved elements.
   */
  List<T> next(int max);

  /**
   * Advances the iterator past a number of elements.
   *
   * @param n Number of elements to skip.
   * @return Whether there are items left after skipping.
   */
  boolean skip(long n);

}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
new file mode 100644
index 0000000..b84ec91
--- /dev/null
+++ 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB 
store.
+ *
+ * <p>
+ * The serializer is based on Jackson, so values are written as JSON. It also 
allows "naked strings"
+ * and integers to be written as values directly, which will be written as 
UTF-8 strings.
+ * </p>
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application 
requires a specific
+   * configuration of the mapper, it can subclass this serializer and add 
custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
+      return ((String) o).getBytes(UTF_8);
+    } else {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      GZIPOutputStream out = new GZIPOutputStream(bytes);
+      try {
+        mapper.writeValue(out, o);
+      } finally {
+        out.close();
+      }
+      return bytes.toByteArray();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception 
{
+    if (klass.equals(String.class)) {
+      return (T) new String(data, UTF_8);
+    } else {
+      GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
+      try {
+        return mapper.readValue(in, klass);
+      } finally {
+        in.close();
+      }
+    }
+  }
+
+  final byte[] serialize(long value) {
+    return String.valueOf(value).getBytes(UTF_8);
+  }
+
+  final long deserializeLong(byte[] data) {
+    return Long.parseLong(new String(data, UTF_8));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
new file mode 100644
index 0000000..b761640
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * <p>
+ * The different methods can be used to configure the behavior of the 
iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * </p>
+ *
+ * <p>
+ * The iterators returned by this view are of type {@link KVStoreIterator}; 
they auto-close
+ * when used in a for loop that exhausts their contents, but when used 
manually, they need
+ * to be closed explicitly unless all elements are read.
+ * </p>
+ */
+public abstract class KVStoreView<T> implements Iterable<T> {
+
+  final Class<T> type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending order.
+   */
+  public KVStoreView<T> reverse() {
+    ascending = !ascending;
+    return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+    this.index = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child index. 
Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * <p>
+   * Required for iterating over child indices, will generate an error if 
iterating over a
+   * parent-less index.
+   * </p>
+   */
+  public KVStoreView<T> parent(Object value) {
+    this.parent = value;
+    return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> first(Object value) {
+    this.first = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index (inclusive).
+   */
+  public KVStoreView<T> last(Object value) {
+    this.last = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration after a number of elements has been retrieved.
+   */
+  public KVStoreView<T> max(long max) {
+    Preconditions.checkArgument(max > 0L, "max must be positive.");
+    this.max = max;
+    return this;
+  }
+
+  /**
+   * Skips a number of elements at the start of iteration. Skipped elements 
are not accounted
+   * when using {@link #max(long)}.
+   */
+  public KVStoreView<T> skip(long n) {
+    this.skip = n;
+    return this;
+  }
+
+  /**
+   * Returns an iterator for the current configuration.
+   */
+  public KVStoreIterator<T> closeableIterator() throws Exception {
+    return (KVStoreIterator<T>) iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
new file mode 100644
index 0000000..90f2ff0
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their 
indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getDeclaredFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        f.setAccessible(true);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getDeclaredMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", 
type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        m.setAccessible(true);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    
Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", 
idx.parent(), idx.value());
+      }
+    }
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || 
idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), 
type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || 
!idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws 
Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+    return index.parent().isEmpty() ? null : getAccessor(index.parent());
+  }
+
+  /**
+   * Abstracts the difference between invoking a Field and a Method.
+   */
+  interface Accessor {
+
+    Object get(Object instance) throws Exception;
+
+  }
+
+  private class FieldAccessor implements Accessor {
+
+    private final Field field;
+
+    FieldAccessor(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return field.get(instance);
+    }
+
+  }
+
+  private class MethodAccessor implements Accessor {
+
+    private final Method method;
+
+    MethodAccessor(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public Object get(Object instance) throws Exception {
+      return method.invoke(instance);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
new file mode 100644
index 0000000..08b22fd
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  /**
+   * Opens the store at the given path, creating it if it does not exist.
+   *
+   * <p>If the store cannot be initialized (for example, because it was written with a different
+   * store version), the underlying LevelDB handle is closed before the exception propagates.
+   * The caller never receives this instance in that case, so it could not close it.</p>
+   *
+   * @throws UnsupportedStoreVersionException if the on-disk store version is not supported.
+   */
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    Map<String, byte[]> aliases;
+    try {
+      byte[] versionData = db().get(STORE_VERSION_KEY);
+      if (versionData != null) {
+        long version = serializer.deserializeLong(versionData);
+        if (version != STORE_VERSION) {
+          throw new UnsupportedStoreVersionException();
+        }
+      } else {
+        db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+      }
+
+      try {
+        aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+      } catch (NoSuchElementException e) {
+        aliases = new HashMap<>();
+      }
+    } catch (Exception e) {
+      // The native handle was opened above; release it instead of leaking it.
+      try {
+        close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    LevelDBTypeInfo ti = getTypeInfo(value.getClass());
+
+    try (WriteBatch batch = db().createWriteBatch()) {
+      byte[] data = serializer.serialize(value);
+      synchronized (ti) {
+        Object existing;
+        try {
+          existing = get(ti.naturalIndex().entityKey(null, value), value.getClass());
+        } catch (NoSuchElementException e) {
+          existing = null;
+        }
+
+        PrefixCache cache = new PrefixCache(value);
+        byte[] naturalKey = ti.naturalIndex().toKey(ti.naturalIndex().getValue(value));
+        for (LevelDBTypeInfo.Index idx : ti.indices()) {
+          byte[] prefix = cache.getPrefix(idx);
+          idx.add(batch, value, existing, data, naturalKey, prefix);
+        }
+        db().write(batch);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch batch = db().createWriteBatch()) {
+      LevelDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (LevelDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(batch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(batch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>(type) {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          return new LevelDBIterator<>(LevelDB.this, this);
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    DB _db = this._db.getAndSet(null);
+    if (_db == null) {
+      return;
+    }
+
+    try {
+      _db.close();
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /** Returns metadata about indices for the given type. */
+  LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    LevelDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  DB db() {
+    DB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {
+
+    private final Object entity;
+    private final Map<LevelDBTypeInfo.Index, byte[]> prefixes;
+
+    PrefixCache(Object entity) {
+      this.entity = entity;
+      this.prefixes = new HashMap<>();
+    }
+
+    byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception {
+      byte[] prefix = null;
+      if (idx.isChild()) {
+        prefix = prefixes.get(idx.parent());
+        if (prefix == null) {
+          prefix = idx.parent().childPrefix(idx.parent().getValue(entity));
+          prefixes.put(idx.parent(), prefix);
+        }
+      }
+      return prefix;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
new file mode 100644
index 0000000..a5d0f9f
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.DBIterator;
+
+class LevelDBIterator<T> implements KVStoreIterator<T> {
+
+  private final LevelDB db;
+  private final boolean ascending;
+  private final DBIterator it;
+  private final Class<T> type;
+  private final LevelDBTypeInfo ti;
+  private final LevelDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.type = params.type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else if (params.last != null) {
+      end = index.start(parent, params.last);
+    }
+    this.end = end;
+
+    // Open the native iterator only after all arguments have been validated, and release it if
+    // positioning fails; otherwise the LevelDB iterator would leak.
+    this.it = db.db().iterator();
+    try {
+      it.seek(firstKey);
+
+      if (!ascending && it.hasNext()) {
+        // When descending, the caller may have set up the start of iteration at a non-existent
+        // entry that is guaranteed to be after the desired entry. For example, if you have a
+        // compound key (a, b) where b is an integer, you may seek to the end of the elements that
+        // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
+        // exist in the database. So need to check here whether the next value actually belongs to
+        // the set being returned by the iterator before advancing.
+        byte[] nextKey = it.peekNext().getKey();
+        if (compare(nextKey, indexKeyPrefix) <= 0) {
+          it.next();
+        }
+      }
+
+      if (params.skip > 0) {
+        skip(params.skip);
+      }
+    } catch (Exception e) {
+      try {
+        close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+
+    try {
+      T ret;
+      if (index == null || index.isCopy()) {
+        ret = db.serializer.deserialize(next, type);
+      } else {
+        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
+        ret = db.get(key, type);
+      }
+      next = null;
+      return ret;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<T> next(int max) {
+    List<T> list = new ArrayList<>(max);
+    while (hasNext() && list.size() < max) {
+      list.add(next());
+    }
+    return list;
+  }
+
+  @Override
+  public boolean skip(long n) {
+    long skipped = 0;
+    while (skipped < n) {
+      if (next != null) {
+        checkedNext = false;
+        next = null;
+        skipped++;
+        continue;
+      }
+
+      boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+      if (!hasNext) {
+        checkedNext = true;
+        return false;
+      }
+
+      Map.Entry<byte[], byte[]> e = ascending ? it.next() : it.prev();
+      if (!isEndMarker(e.getKey())) {
+        skipped++;
+      }
+    }
+
+    return hasNext();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+    }
+  }
+
+  private byte[] loadNext() {
+    if (count >= max) {
+      return null;
+    }
+
+    try {
+      while (true) {
+        boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
+        if (!hasNext) {
+          return null;
+        }
+
+        Map.Entry<byte[], byte[]> nextEntry;
+        try {
+          // Avoid races if another thread is updating the DB.
+          nextEntry = ascending ? it.next() : it.prev();
+        } catch (NoSuchElementException e) {
+          return null;
+        }
+
+        byte[] nextKey = nextEntry.getKey();
+        // Next key is not part of the index, stop.
+        if (!startsWith(nextKey, indexKeyPrefix)) {
+          return null;
+        }
+
+        // If the next key is an end marker, then skip it.
+        if (isEndMarker(nextKey)) {
+          continue;
+        }
+
+        // If there's a known end key and iteration has gone past it, stop.
+        if (end != null) {
+          int comp = compare(nextKey, end) * (ascending ? 1 : -1);
+          if (comp > 0) {
+            return null;
+          }
+        }
+
+        count++;
+
+        // Next element is part of the iteration, return it.
+        return nextEntry.getValue();
+      }
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @VisibleForTesting
+  static boolean startsWith(byte[] key, byte[] prefix) {
+    if (key.length < prefix.length) {
+      return false;
+    }
+
+    for (int i = 0; i < prefix.length; i++) {
+      if (key[i] != prefix[i]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private boolean isEndMarker(byte[] key) {
+    return (key.length > 2 &&
+        key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
+        key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
+  }
+
+  /**
+   * Compares two keys the same way LevelDB's default comparator does: lexicographically, treating
+   * each byte as unsigned, with a shorter key ordering before any longer key it is a prefix of.
+   * Bytes are masked with 0xFF because Java bytes are signed. Without the mask, bytes >= 0x80
+   * (e.g. non-ASCII UTF-8) would sort before ASCII, disagreeing with the order in which the
+   * LevelDB iterator returns keys.
+   */
+  static int compare(byte[] a, byte[] b) {
+    int minLen = Math.min(a.length, b.length);
+    for (int i = 0; i < minLen; i++) {
+      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return a.length - b.length;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java 
b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
new file mode 100644
index 0000000..3ab17db
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * <p>
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices trade extra disk space (one value per key) for not having to keep
+ * lists of pointers, which would be more expensive to update at runtime.
+ * </p>
+ *
+ * <p>
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, 
this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * </p>
+ *
+ * <pre>
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * </pre>
+ *
+ * <p>
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+<something>". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-<something>"). A count is also kept at 
the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * </p>
+ *
+ * <p>
+ * To illustrate, given a type "Foo", with a natural index and a second index 
called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural 
key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * </p>
+ *
+ * <pre>
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * </pre>
+ *
+ * <p>
+ * Note that all indexed values are prepended with "+", even if the index 
itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index 
by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and 
comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of 
the entity.
+ * </p>
+ *
+ * <p>
+ * Child indices are stored after their parent index. In the example above, 
let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value 
"no" for this field,
+ * the data in the store would look something like the following:
+ * </p>
+ *
+ * <pre>
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index 
type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index 
type]
+ * ...
+ * </pre>
+ */
+class LevelDBTypeInfo {
+
+  // Byte markers used to build LevelDB keys (see the class javadoc for the key layout).
+  // TRUE and FALSE are now final: as mutable statics, any code could change key encoding globally.
+  static final byte[] END_MARKER = new byte[] { '-' };
+  static final byte ENTRY_PREFIX = (byte) '+';
+  static final byte KEY_SEPARATOR = 0x0;
+  static final byte TRUE = (byte) '1';
+  static final byte FALSE = (byte) '0';
+
+  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
+  private static final byte POSITIVE_MARKER = (byte) '=';
+  private static final byte NEGATIVE_MARKER = (byte) '*';
+  private static final byte[] HEX_BYTES = new byte[] {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  /** Owning store; its serializer is used to encode index counts. */
+  private final LevelDB db;
+  /** The entity class described by this metadata. */
+  private final Class<?> type;
+  /** The type's indices, keyed by index name. */
+  private final Map<String, Index> indices;
+  /** Store-assigned alias for the type; the leading component of keys built by buildKey. */
+  private final byte[] typePrefix;
+
+  LevelDBTypeInfo(LevelDB db, Class<?> type, byte[] alias) throws Exception {
+    this.db = db;
+    this.type = type;
+    this.typePrefix = alias;
+    this.indices = new HashMap<>();
+
+    KVTypeInfo typeInfo = new KVTypeInfo(type);
+
+    // Top-level indices are registered first, because a child Index needs its parent's Index
+    // instance at construction time.
+    typeInfo.indices()
+      .filter(idx -> idx.parent().isEmpty())
+      .forEach(idx -> indices.put(idx.value(),
+        new Index(idx, typeInfo.getAccessor(idx.value()), null)));
+    typeInfo.indices()
+      .filter(idx -> !idx.parent().isEmpty())
+      .forEach(idx -> indices.put(idx.value(),
+        new Index(idx, typeInfo.getAccessor(idx.value()), indices.get(idx.parent()))));
+  }
+
+  /** Returns the entity class described by this metadata. */
+  Class<?> type() {
+    return this.type;
+  }
+
+  /** Returns the store-assigned type alias used as the first component of this type's keys. */
+  byte[] keyPrefix() {
+    return this.typePrefix;
+  }
+
+  /** Returns the type's natural (primary key) index. */
+  Index naturalIndex() {
+    String naturalName = KVIndex.NATURAL_INDEX_NAME;
+    return index(naturalName);
+  }
+
+  /**
+   * Looks up an index by name.
+   *
+   * @throws IllegalArgumentException if the type has no such index.
+   */
+  Index index(String name) {
+    Index found = indices.get(name);
+    Preconditions.checkArgument(found != null, "Index %s does not exist for type %s.", name,
+      type.getName());
+    return found;
+  }
+
+  /** Returns a live view of all indices registered for the type. */
+  Collection<Index> indices() {
+    return this.indices.values();
+  }
+
+  byte[] buildKey(byte[]... components) {
+    return buildKey(true, components);
+  }
+
+  /**
+   * Concatenates key parts, placing {@link #KEY_SEPARATOR} between each pair of adjacent parts.
+   *
+   * @param addTypePrefix whether the type prefix is the first part of the key.
+   * @param components the remaining key parts, in order.
+   * @return the assembled key. It is empty when there are no parts. With only the type prefix, it
+   *     is a copy of that prefix; previously both of these cases threw.
+   */
+  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
+    int parts = components.length + (addTypePrefix ? 1 : 0);
+    // One separator between each pair of adjacent parts; none when there are 0 or 1 parts.
+    int len = Math.max(parts - 1, 0);
+    if (addTypePrefix) {
+      len += typePrefix.length;
+    }
+    for (byte[] comp : components) {
+      len += comp.length;
+    }
+
+    byte[] dest = new byte[len];
+    int written = 0;
+
+    if (addTypePrefix) {
+      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
+      written = typePrefix.length;
+    }
+
+    boolean needSeparator = addTypePrefix;
+    for (byte[] comp : components) {
+      if (needSeparator) {
+        dest[written++] = KEY_SEPARATOR;
+      }
+      System.arraycopy(comp, 0, dest, written, comp.length);
+      written += comp.length;
+      needSeparator = true;
+    }
+
+    return dest;
+  }
+
+  /**
+   * Models a single index in LevelDB. See top-level class's javadoc for a 
description of how the
+   * keys are generated.
+   */
+  class Index {
+
+    /** Whether entries store the serialized entity itself rather than its natural key. */
+    private final boolean copy;
+    /** Whether this is the type's natural index. */
+    private final boolean isNatural;
+    /** UTF-8 encoded index name, used as a key component. */
+    private final byte[] name;
+    /** Reads this index's value from an entity. */
+    private final KVTypeInfo.Accessor accessor;
+    /** Parent index, or null for a top-level index. */
+    private final Index parent;
+
+    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
+      byte[] name = self.value().getBytes(UTF_8);
+      if (parent != null) {
+        // Child index names are stored with a '.' prefix, matching the ".child" component in the
+        // key layout documented on this class. Before this fix, the prefixed array was built
+        // but never used.
+        byte[] child = new byte[name.length + 1];
+        child[0] = SECONDARY_IDX_PREFIX;
+        System.arraycopy(name, 0, child, 1, name.length);
+        name = child;
+      }
+
+      this.name = name;
+      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
+      // The natural index always stores full copies, since it is where entity data lives.
+      this.copy = isNatural || self.copy();
+      this.accessor = accessor;
+      this.parent = parent;
+    }
+
+    /** Whether index entries hold a full copy of the entity instead of its natural key. */
+    boolean isCopy() {
+      return this.copy;
+    }
+
+    /** Whether this index is nested under a parent index. */
+    boolean isChild() {
+      return this.parent != null;
+    }
+
+    /** Returns the parent index, or null for a top-level index. */
+    Index parent() {
+      return this.parent;
+    }
+
+    /**
+     * Builds the key prefix shared by this index's child indices for the given parent value.
+     * Callers can compute it once and reuse it across all children of the same parent.
+     *
+     * @throws IllegalStateException if this index is itself a child index.
+     */
+    byte[] childPrefix(Object value) throws Exception {
+      Preconditions.checkState(parent == null, "Not a parent index.");
+      byte[] parentKey = toParentKey(value);
+      return buildKey(name, parentKey);
+    }
+
+    /**
+     * Reads this index's value from an entity, i.e. the value of the field or method carrying the
+     * index annotation. That value becomes part of the LevelDB key under which the entity (or its
+     * natural key) is stored.
+     */
+    Object getValue(Object entity) throws Exception {
+      KVTypeInfo.Accessor reader = this.accessor;
+      return reader.get(entity);
+    }
+
+    /** Ensures a parent prefix is supplied exactly when this index is a child index. */
+    private void checkParent(byte[] prefix) {
+      boolean hasPrefix = prefix != null;
+      if (!hasPrefix) {
+        Preconditions.checkState(!isChild(), "Parent prefix missing for child index.");
+        return;
+      }
+      Preconditions.checkState(isChild(), "Parent prefix provided for parent index.");
+    }
+
+    /** Returns the prefix shared by every key belonging to this index. */
+    byte[] keyPrefix(byte[] prefix) {
+      checkParent(prefix);
+      if (parent == null) {
+        return buildKey(name);
+      }
+      return buildKey(false, prefix, name);
+    }
+
+    /**
+     * Returns the key at which ascending iteration begins for entities whose indexed value equals
+     * {@code value}.
+     */
+    byte[] start(byte[] prefix, Object value) {
+      checkParent(prefix);
+      byte[] valueKey = toKey(value);
+      if (isChild()) {
+        return buildKey(false, prefix, name, valueKey);
+      }
+      return buildKey(name, valueKey);
+    }
+
+    /** Returns the key of this index's end marker. */
+    byte[] end(byte[] prefix) {
+      checkParent(prefix);
+      if (isChild()) {
+        return buildKey(false, prefix, name, END_MARKER);
+      }
+      return buildKey(name, END_MARKER);
+    }
+
+    /** Returns the key of the end marker for entries whose indexed value equals {@code value}. */
+    byte[] end(byte[] prefix, Object value) throws Exception {
+      checkParent(prefix);
+      byte[] valueKey = toKey(value);
+      if (isChild()) {
+        return buildKey(false, prefix, name, valueKey, END_MARKER);
+      }
+      return buildKey(name, valueKey, END_MARKER);
+    }
+
+    /**
+     * Returns the full key in this index that identifies the given entity. For non-natural
+     * indices, the entity's natural key is appended after the indexed value.
+     *
+     * @throws NullPointerException if the entity's value for this index is null.
+     */
+    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
+      Object indexValue = getValue(entity);
+      if (indexValue == null) {
+        // Decode the name for the message; formatting the raw byte[] would only print "[B@...".
+        throw new NullPointerException(String.format("Null index value for %s in type %s.",
+          new String(name, UTF_8), type.getName()));
+      }
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
+      }
+      return entityKey;
+    }
+
+    /**
+     * Adds {@code delta} to the count stored at {@code key}. The key is deleted from the batch
+     * once the count is no longer positive.
+     */
+    private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception {
+      long newCount = getCount(key) + delta;
+      if (newCount <= 0) {
+        batch.delete(key);
+        return;
+      }
+      batch.put(key, db.serializer.serialize(newCount));
+    }
+
+    /**
+     * Shared implementation of add and remove. When {@code data} is non-null, the entity's entry is
+     * written; otherwise it is deleted. Top-level index counts are adjusted as needed.
+     *
+     * @throws NullPointerException if the entity's value for this index is null.
+     */
+    private void addOrRemove(
+        WriteBatch batch,
+        Object entity,
+        Object existing,
+        byte[] data,
+        byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      Object indexValue = getValue(entity);
+      if (indexValue == null) {
+        // Decode the name for the message; formatting the raw byte[] would only print "[B@...".
+        throw new NullPointerException(String.format("Null index value for %s in type %s.",
+          new String(name, UTF_8), type.getName()));
+      }
+
+      byte[] entityKey = start(prefix, indexValue);
+      if (!isNatural) {
+        entityKey = buildKey(false, entityKey, naturalKey);
+      }
+
+      boolean needCountUpdate = (existing == null);
+
+      // Check whether there's a need to update the index. The index needs to be updated in two
+      // cases:
+      //
+      // - There is no existing value for the entity, so a new index value will be added.
+      // - If there is a previously stored value for the entity, and the index value for the
+      //   current index does not match the new value, the old entry needs to be deleted and
+      //   the new one added.
+      //
+      // Natural indices don't need to be checked, because by definition both old and new entities
+      // will have the same key. The put() call is all that's needed in that case.
+      //
+      // Also check whether we need to update the counts. If the indexed value is changing, we
+      // need to decrement the count at the old index value, and the new indexed value count needs
+      // to be incremented.
+      if (existing != null && !isNatural) {
+        byte[] oldPrefix = null;
+        Object oldIndexedValue = getValue(existing);
+        boolean removeExisting = !indexValue.equals(oldIndexedValue);
+        if (!removeExisting && isChild()) {
+          oldPrefix = parent().childPrefix(parent().getValue(existing));
+          removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0;
+        }
+
+        if (removeExisting) {
+          if (oldPrefix == null && isChild()) {
+            oldPrefix = parent().childPrefix(parent().getValue(existing));
+          }
+
+          byte[] oldKey = entityKey(oldPrefix, existing);
+          batch.delete(oldKey);
+
+          // If the indexed value has changed, we need to update the counts at the old and new
+          // end markers for the indexed value.
+          if (!isChild()) {
+            byte[] oldCountKey = end(null, oldIndexedValue);
+            updateCount(batch, oldCountKey, -1L);
+            needCountUpdate = true;
+          }
+        }
+      }
+
+      if (data != null) {
+        byte[] stored = copy ? data : naturalKey;
+        batch.put(entityKey, stored);
+      } else {
+        batch.delete(entityKey);
+      }
+
+      if (needCountUpdate && !isChild()) {
+        long delta = data != null ? 1L : -1L;
+        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
+        updateCount(batch, countKey, delta);
+      }
+    }
+
+    /**
+     * Adds (or replaces) this index's entry for an entity.
+     *
+     * @param batch Write batch that collects this change along with related ones.
+     * @param entity The entity to index.
+     * @param existing Previous version of the entity being replaced, or null for a new entity.
+     * @param data Serialized entity; stored as-is only by copying indices, which otherwise
+     *             store the natural key.
+     * @param naturalKey Precomputed natural key of the entity, shared by every index.
+     * @param prefix Prefix of the parent index entry, when this is a child index.
+     */
+    void add(WriteBatch batch, Object entity, Object existing, byte[] data, byte[] naturalKey,
+        byte[] prefix) throws Exception {
+      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
+    }
+
+    /**
+     * Deletes this index's entry for an entity.
+     *
+     * @param batch Write batch that collects this change along with related ones.
+     * @param entity The entity being deleted; its indexed value locates the entry to drop.
+     * @param naturalKey Precomputed natural key of the entity, shared by every index.
+     * @param prefix Prefix of the parent index entry, when this is a child index.
+     */
+    void remove(WriteBatch batch, Object entity, byte[] naturalKey, byte[] prefix)
+        throws Exception {
+      // No previous version and no data: the entry is deleted and, for non-child indices,
+      // the count is decremented.
+      Object noExisting = null;
+      byte[] noData = null;
+      addOrRemove(batch, entity, noExisting, noData, naturalKey, prefix);
+    }
+
+    /** Reads the counter stored under {@code key}, or 0 if no counter has been written. */
+    long getCount(byte[] key) throws Exception {
+      byte[] stored = db.db().get(key);
+      if (stored == null) {
+        return 0L;
+      }
+      return db.serializer.deserializeLong(stored);
+    }
+
+    byte[] toParentKey(Object value) {
+      return toKey(value, SECONDARY_IDX_PREFIX);
+    }
+
+    byte[] toKey(Object value) {
+      return toKey(value, ENTRY_PREFIX);
+    }
+
+    /**
+     * Translates a value to be used as part of the store key.
+     *
+     * Strings are stored as their UTF-8 bytes, and booleans as a single TRUE / FALSE byte; both
+     * follow the prefix byte.
+     *
+     * Integral numbers are encoded as a string in a way that preserves lexicographical
+     * ordering. The string is prepended with a marker telling whether the number is negative
+     * or non-negative ("*" for negative and "=" for zero and positive are used since "-" and
+     * "+" have the opposite of the desired order), and then the number is encoded into a hex
+     * string (so it occupies twice the number of bytes as the original type).
+     *
+     * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
+     *
+     * @param value The value to encode: a String, Boolean, Integer, Long, Short, Byte, or an
+     *              array whose elements are themselves encodable.
+     * @param prefix Marker byte written first in string, boolean and numeric keys.
+     * @return The encoded key.
+     * @throws IllegalArgumentException If the value's type cannot be used as a key.
+     */
+    byte[] toKey(Object value, byte prefix) {
+      final byte[] result;
+
+      if (value instanceof String) {
+        byte[] str = ((String) value).getBytes(UTF_8);
+        result = new byte[str.length + 1];
+        result[0] = prefix;
+        System.arraycopy(str, 0, result, 1, str.length);
+      } else if (value instanceof Boolean) {
+        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
+      } else if (value.getClass().isArray()) {
+        // NOTE(review): elements are encoded with the single-argument toKey(), so `prefix` is
+        // not applied in this branch; confirm that is intended.
+        int length = Array.getLength(value);
+        byte[][] components = new byte[length][];
+        for (int i = 0; i < length; i++) {
+          components[i] = toKey(Array.get(value, i));
+        }
+        result = buildKey(false, components);
+      } else {
+        int bytes;
+
+        if (value instanceof Integer) {
+          bytes = Integer.SIZE;
+        } else if (value instanceof Long) {
+          bytes = Long.SIZE;
+        } else if (value instanceof Short) {
+          bytes = Short.SIZE;
+        } else if (value instanceof Byte) {
+          bytes = Byte.SIZE;
+        } else {
+          throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
+            value.getClass().getName()));
+        }
+
+        // The SIZE constants are expressed in bits.
+        bytes = bytes / Byte.SIZE;
+
+        // Layout: prefix byte, sign marker, then two hex digits per byte of the value.
+        byte[] key = new byte[bytes * 2 + 2];
+        long longValue = ((Number) value).longValue();
+        key[0] = prefix;
+        // Zero must share the marker of the positive numbers: its digits are all zeros, so with
+        // the negative marker it would sort before every negative value.
+        key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+
+        // Emit nibbles from least to most significant, filling the key from its end. Negative
+        // values are written as two's complement over the type's width, which keeps them in
+        // ascending order among themselves (assuming HEX_BYTES lists digits in ascending order).
+        for (int i = 0; i < key.length - 2; i++) {
+          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
+          key[key.length - i - 1] = HEX_BYTES[masked];
+        }
+
+        result = key;
+      }
+
+      return result;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
new file mode 100644
index 0000000..2ed246e
--- /dev/null
+++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/UnsupportedStoreVersionException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when the store implementation is not compatible with the 
underlying data.
+ */
+public class UnsupportedStoreVersionException extends IOException {
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
new file mode 100644
index 0000000..afb72b8
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/CustomType1.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import com.google.common.base.Objects;
+
+/**
+ * Test entity with one field per kind of index: natural, reference, copying, numeric and
+ * child. Equality (and hashing) only consider {@code id} and {@code name}.
+ */
+public class CustomType1 {
+
+  // Natural key (annotation without a name).
+  @KVIndex
+  public String key;
+
+  @KVIndex("id")
+  public String id;
+
+  // Copying index: the index entry stores the serialized entity itself.
+  @KVIndex(value = "name", copy = true)
+  public String name;
+
+  @KVIndex("int")
+  public int num;
+
+  // Child index nested under the "id" index.
+  @KVIndex(value = "child", parent = "id")
+  public String child;
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof CustomType1)) {
+      return false;
+    }
+    CustomType1 that = (CustomType1) o;
+    return id.equals(that.id) && name.equals(that.name);
+  }
+
+  @Override
+  public int hashCode() {
+    // Only "id" is hashed; this is consistent with equals(), which also requires equal ids.
+    return id.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("key", key)
+      .add("id", id)
+      .add("name", name)
+      .add("num", num)
+      .toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba4951/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
new file mode 100644
index 0000000..8549712
--- /dev/null
+++ b/common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * Iteration tests shared by store implementations. A randomized data set is written once and
+ * then iterated through every index, in both directions, with start / end bounds, skips and
+ * limits; results are compared against an in-memory sort that mimics the index ordering.
+ */
+public abstract class DBIteratorSuite {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DBIteratorSuite.class);
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  // Shared by all tests of the class: populated by the first setup() call and cleared by
+  // cleanupData().
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /**
+     * Reverses the order of this comparator. The arguments are swapped rather than negating
+     * the result, since negating Integer.MIN_VALUE overflows.
+     */
+    default BaseComparator reverse() {
+      return (t1, t2) -> BaseComparator.this.compare(t2, t1);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER =
+    (t1, t2) -> t1.name.compareTo(t2.name);
+  // Integer.compare instead of subtraction, which can overflow for arbitrary ints.
+  private static final BaseComparator NUMERIC_INDEX_ORDER =
+    (t1, t2) -> Integer.compare(t1.num, t2.num);
+  private static final BaseComparator CHILD_INDEX_ORDER =
+    (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Creates the store under test. It is called only once, before all tests are run. Any state
+   * can be safely stored in static variables and cleaned up in a @AfterClass handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @BeforeClass
+  public static void setupClass() {
+    // Log the seed so that a failing run can be reproduced.
+    long seed = RND.nextLong();
+    LOG.info("Random seed: {}", seed);
+    RND.setSeed(seed);
+  }
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    clashingEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // The data set is built once and shared by every test in the class.
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "key" + i;
+      t.id = "id" + i;
+      t.name = "name" + RND.nextInt(MAX_ENTRIES);
+      t.num = RND.nextInt(MAX_ENTRIES);
+      t.child = "child" + (i % MIN_ENTRIES);
+      allEntries.add(t);
+    }
+
+    // Shuffle the entries to avoid the insertion order matching the natural ordering.
+    // Just in case.
+    Collections.shuffle(allEntries, RND);
+    for (CustomType1 e : allEntries) {
+      db.write(e);
+    }
+
+    // Pick the first generated value, and forcefully create a few entries that will clash
+    // with the indexed values (id and name), to make sure the index behaves correctly when
+    // multiple entities are indexed by the same value.
+    //
+    // This also serves as a test for the test code itself, to make sure it's sorting indices
+    // the same way the store is expected to.
+    CustomType1 first = allEntries.get(0);
+    clashingEntries = new ArrayList<>();
+
+    int clashCount = RND.nextInt(MIN_ENTRIES) + 1;
+    for (int i = 0; i < clashCount; i++) {
+      CustomType1 t = new CustomType1();
+      t.key = "n-key" + (count + i);
+      t.id = first.id;
+      t.name = first.name;
+      t.num = first.num;
+      t.child = first.child;
+      allEntries.add(t);
+      clashingEntries.add(t);
+      db.write(t);
+    }
+
+    // Create another entry that could cause problems: take the first entry, and make its
+    // indexed name be an extension of the existing ones, to make sure the implementation sorts
+    // these correctly even considering the separator character (shorter strings first).
+    CustomType1 t = new CustomType1();
+    t.key = "extended-key-0";
+    t.id = first.id;
+    t.name = first.name + "a";
+    t.num = first.num;
+    t.child = first.child;
+    allEntries.add(t);
+    db.write(t);
+  }
+
+  @Test
+  public void naturalIndex() throws Exception {
+    testIteration(NATURAL_ORDER, view(), null, null);
+  }
+
+  @Test
+  public void refIndex() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
+  }
+
+  @Test
+  public void copyIndex() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
+  }
+
+  @Test
+  public void numericIndex() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
+  }
+
+  @Test
+  public void childIndex() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id), null, null);
+  }
+
+  @Test
+  public void naturalIndexDescending() throws Exception {
+    testIteration(NATURAL_ORDER, view().reverse(), null, null);
+  }
+
+  @Test
+  public void refIndexDescending() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
+  }
+
+  @Test
+  public void copyIndexDescending() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
+  }
+
+  @Test
+  public void numericIndexDescending() throws Exception {
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
+  }
+
+  @Test
+  public void childIndexDescending() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).reverse(),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
+  }
+
+  @Test
+  public void copyIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
+  }
+
+  @Test
+  public void numericIndexWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
+  }
+
+  @Test
+  public void childIndexWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).first(any.child),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
+  }
+
+  @Test
+  public void refIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first,
+      null);
+  }
+
+  @Test
+  public void copyIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name),
+      first, null);
+  }
+
+  @Test
+  public void numericIndexDescendingWithStart() throws Exception {
+    CustomType1 first = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num),
+      first, null);
+  }
+
+  @Test
+  public void childIndexDescendingWithStart() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER,
+      view().index("child").parent(any.id).first(any.child).reverse(), null, null);
+  }
+
+  @Test
+  public void naturalIndexWithSkip() throws Exception {
+    testIteration(NATURAL_ORDER, view().skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void refIndexWithSkip() throws Exception {
+    testIteration(REF_INDEX_ORDER, view().index("id").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithSkip() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").skip(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithSkip() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).skip(pickCount()),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexWithMax() throws Exception {
+    testIteration(NATURAL_ORDER, view().max(pickCount()), null, null);
+  }
+
+  @Test
+  public void copyIndexWithMax() throws Exception {
+    testIteration(COPY_INDEX_ORDER, view().index("name").max(pickCount()), null, null);
+  }
+
+  @Test
+  public void childIndexWithMax() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).max(pickCount()),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
+  }
+
+  @Test
+  public void numericIndexWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
+  }
+
+  @Test
+  public void childIndexWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER, view().index("child").parent(any.id).last(any.child),
+      null, null);
+  }
+
+  @Test
+  public void naturalIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
+  }
+
+  @Test
+  public void refIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
+  }
+
+  @Test
+  public void copyIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
+      null, last);
+  }
+
+  @Test
+  public void numericIndexDescendingWithLast() throws Exception {
+    CustomType1 last = pickLimit();
+    testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
+      null, last);
+  }
+
+  @Test
+  public void childIndexDescendingWithLast() throws Exception {
+    CustomType1 any = pickLimit();
+    testIteration(CHILD_INDEX_ORDER,
+      view().index("child").parent(any.id).last(any.child).reverse(), null, null);
+  }
+
+  @Test
+  public void testRefWithIntNaturalKey() throws Exception {
+    LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
+    i.key = 1;
+    i.id = "1";
+    i.values = Arrays.asList("1");
+
+    db.write(i);
+
+    try (KVStoreIterator<?> it = db.view(i.getClass()).closeableIterator()) {
+      Object read = it.next();
+      assertEquals(i, read);
+    }
+  }
+
+  /** Picks an element that has clashes with other elements in the indices. */
+  private CustomType1 pickLimit() {
+    return clashingEntries.get(RND.nextInt(clashingEntries.size()));
+  }
+
+  /** Picks a random count in [1, allEntries.size() / 2) for skip / max tests. */
+  private int pickCount() {
+    int count = RND.nextInt(allEntries.size() / 2);
+    return Math.max(count, 1);
+  }
+
+  /**
+   * Iterates over {@code params} and checks the result against the expected ordering of all
+   * entries, applying the same parent, first / last, skip and max restrictions in memory.
+   */
+  private void testIteration(
+      final BaseComparator order,
+      final KVStoreView<CustomType1> params,
+      final CustomType1 first,
+      final CustomType1 last) throws Exception {
+    List<CustomType1> indexOrder = sortBy(order.fallback());
+    if (!params.ascending) {
+      indexOrder = Lists.reverse(indexOrder);
+    }
+
+    Iterable<CustomType1> expected = indexOrder;
+    BaseComparator expectedOrder = params.ascending ? order : order.reverse();
+
+    if (params.parent != null) {
+      expected = Iterables.filter(expected, v -> params.parent.equals(v.id));
+    }
+
+    if (first != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
+    }
+
+    if (last != null) {
+      expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
+    }
+
+    if (params.skip > 0) {
+      expected = Iterables.skip(expected, (int) params.skip);
+    }
+
+    if (params.max != Long.MAX_VALUE) {
+      expected = Iterables.limit(expected, (int) params.max);
+    }
+
+    List<CustomType1> actual = collect(params);
+    compareLists(expected, actual);
+  }
+
+  /** Could use assertEquals(), but that creates hard to read errors for large lists. */
+  private void compareLists(Iterable<?> expected, List<?> actual) {
+    Iterator<?> expectedIt = expected.iterator();
+    Iterator<?> actualIt = actual.iterator();
+
+    int count = 0;
+    while (expectedIt.hasNext()) {
+      if (!actualIt.hasNext()) {
+        break;
+      }
+      count++;
+      assertEquals(expectedIt.next(), actualIt.next());
+    }
+
+    String message;
+    Object[] remaining;
+    int expectedCount = count;
+    int actualCount = count;
+
+    if (expectedIt.hasNext()) {
+      remaining = Iterators.toArray(expectedIt, Object.class);
+      expectedCount += remaining.length;
+      message = "missing";
+    } else {
+      remaining = Iterators.toArray(actualIt, Object.class);
+      actualCount += remaining.length;
+      message = "stray";
+    }
+
+    assertEquals(String.format("Found %s elements: %s", message, Arrays.asList(remaining)),
+      expectedCount, actualCount);
+  }
+
+  private KVStoreView<CustomType1> view() throws Exception {
+    return db.view(CustomType1.class);
+  }
+
+  private List<CustomType1> collect(KVStoreView<CustomType1> view) throws Exception {
+    return Arrays.asList(Iterables.toArray(view, CustomType1.class));
+  }
+
+  private List<CustomType1> sortBy(Comparator<CustomType1> comp) {
+    List<CustomType1> copy = new ArrayList<>(allEntries);
+    Collections.sort(copy, comp);
+    return copy;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to