Repository: samza
Updated Branches:
  refs/heads/master f2fd9aaab -> 3a3c278a9


SAMZA-435; remove leveldb


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a3c278a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a3c278a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a3c278a

Branch: refs/heads/master
Commit: 3a3c278a9c16b5cbbedfd11238d3aeb2ef6e61e5
Parents: f2fd9aa
Author: Ruslan Khafizov <[email protected]>
Authored: Wed Feb 4 09:32:53 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Wed Feb 4 09:32:53 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  14 --
 .../versioned/jobs/configuration-table.html     | 105 +--------
 docs/startup/download/index.md                  |   6 -
 gradle/dependency-versions.gradle               |   1 -
 .../kv/KeyValueStorageEngineFactory.scala       |  42 ----
 .../LevelDbKeyValueStorageEngineFactory.scala   |  61 ------
 .../samza/storage/kv/LevelDbKeyValueStore.scala | 218 -------------------
 .../apache/samza/storage/kv/CachedStore.scala   |   8 +-
 .../src/main/config/perf/kv-perf.properties     |   2 +-
 .../samza/storage/kv/TestKeyValueStores.scala   |  15 +-
 .../test/integration/TestStatefulTask.scala     |   2 +-
 settings.gradle                                 |   1 -
 12 files changed, 11 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4f9fb44..4d0b44f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -328,19 +328,6 @@ project(":samza-kv-inmemory_$scalaVersion") {
   }
 }
 
-project(":samza-kv-leveldb_$scalaVersion") {
-  apply plugin: 'scala'
-
-  dependencies {
-    compile project(':samza-api')
-    compile project(":samza-core_$scalaVersion")
-    compile project(":samza-kv_$scalaVersion")
-    compile "org.scala-lang:scala-library:$scalaLibVersion"
-    compile "org.fusesource.leveldbjni:leveldbjni-all:$leveldbVersion"
-    testCompile "junit:junit:$junitVersion"
-  }
-}
-
 project(":samza-kv-rocksdb_$scalaVersion") {
   apply plugin: 'scala'
 
@@ -368,7 +355,6 @@ project(":samza-test_$scalaVersion") {
   dependencies {
     compile project(':samza-api')
     compile project(":samza-kv-inmemory_$scalaVersion")
-    compile project(":samza-kv-leveldb_$scalaVersion")
     compile project(":samza-kv-rocksdb_$scalaVersion")
     compile project(":samza-core_$scalaVersion")
     runtime project(":samza-log4j")

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 008fc78..ec12874 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -832,13 +832,8 @@
                         <a 
href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config,
 org.apache.samza.task.TaskContext)">init()</a>
                         method). The value of this property is the 
fully-qualified name of a Java class that implements
                         <a 
href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>.
-                        Samza currently ships with two storage engine 
implementations:
+                        Samza currently ships with one storage engine 
implementation:
                         <dl>
-                            
<dt><code>org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code></dt>
-                            <dd>An on-disk storage engine with a key-value 
interface, implemented using
-                                <a 
href="https://code.google.com/p/leveldb/">LevelDB</a>. It supports fast 
random-access
-                                reads and writes, as well as range queries on 
keys. LevelDB can be configured with
-                                various <a href="#keyvalue-leveldb">additional 
tuning parameters</a>.</dd>
                             
<dt><code>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code></dt>
                             <dd>An on-disk storage engine with a key-value 
interface, implemented using
                                 <a href="http://rocksdb.org/">RocksDB</a>. It 
supports fast random-access
@@ -1004,7 +999,7 @@
                           <dt><code>fifo</code></dt>
                           <dd>Use <a 
href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> 
compaction.</dd>
                           <dt><code>level</code></dt>
-                          <dd>Use LevelDB's standard leveled compaction.</dd>
+                          <dd>Use RocksDB's standard leveled compaction.</dd>
                       </dl>
                     </td>
                 </tr>
@@ -1018,102 +1013,6 @@
                 </tr>
 
                 <tr>
-                    <th colspan="3" class="section" id="keyvalue-leveldb">
-                        Using LevelDB for key-value storage<br>
-                        <span class="subtitle">
-                            (This section applies if you have set
-                            <a href="#stores-factory" 
class="property">stores.*.factory</a>
-                            <code>= 
org.apache.samza.storage.kv.LevelDbKeyValueStorageEngineFactory</code>)
-                        </span>
-                    </th>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-write-batch-size">stores.<span 
class="store">store-name</span>.<br>write.batch.size</td>
-                    <td class="default">500</td>
-                    <td class="description">
-                        For better write performance, the storage engine 
buffers writes and applies them
-                        to the underlying store in a batch. If the same key is 
written multiple times
-                        in quick succession, this buffer also deduplicates 
writes to the same key. This
-                        property is set to the number of key/value pairs that 
should be kept in this
-                        in-memory buffer, per task instance. The number cannot 
be greater than
-                        <a href="#stores-leveldb-object-cache-size" 
class="property">stores.*.object.cache.size</a>.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-object-cache-size">stores.<span 
class="store">store-name</span>.<br>object.cache.size</td>
-                    <td class="default">1000</td>
-                    <td class="description">
-                        Samza maintains an additional cache in front of 
LevelDB for frequently-accessed
-                        objects. This cache contains deserialized objects 
(avoiding the deserialization
-                        overhead on cache hits), in contrast to the LevelDB 
block cache
-                        (<a href="#stores-leveldb-container-cache-size-bytes" 
class="property">stores.*.container.cache.size.bytes</a>),
-                        which caches serialized objects. This property 
determines the number of objects
-                        to keep in Samza's cache, per task instance. This same 
cache is also used for
-                        write buffering (see <a 
href="#stores-leveldb-write-batch-size" 
class="property">stores.*.write.batch.size</a>).
-                        A value of 0 disables all caching and batching.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-container-cache-size-bytes">stores.<span 
class="store">store-name</span>.container.<br>cache.size.bytes</td>
-                    <td class="default">104857600</td>
-                    <td class="description">
-                        The size of LevelDB's block cache in bytes, per 
container. If there are several
-                        task instances within one container, each is given a 
proportional share of this cache.
-                        Note that this is an off-heap memory allocation, so 
the container's total memory use
-                        is the maximum JVM heap size <em>plus</em> the size of 
this cache.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-container-write-buffer-size-bytes">stores.<span 
class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
-                    <td class="default">33554432</td>
-                    <td class="description">
-                        The amount of memory (in bytes) that LevelDB uses for 
buffering writes before they are
-                        written to disk, per container. If there are several 
task instances within one
-                        container, each is given a proportional share of this 
buffer. This setting also
-                        determines the size of LevelDB's segment files.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-compaction-delete-threshold">stores.<span 
class="store">store-name</span>.<br>compaction.delete.threshold</td>
-                    <td class="default">-1</td>
-                    <td class="description">
-                        Setting this property forces a LevelDB compaction to 
be performed after a certain
-                        number of keys have been deleted from the store. This 
is used to work around
-                        <a 
href="https://issues.apache.org/jira/browse/SAMZA-254">performance issues</a>
-                        in certain workloads.
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-compression">stores.<span 
class="store">store-name</span>.<br>leveldb.compression</td>
-                    <td class="default">snappy</td>
-                    <td class="description">
-                        This property controls whether LevelDB should compress 
data on disk and in the
-                        block cache. The following values are valid:
-                        <dl>
-                            <dt><code>snappy</code></dt>
-                            <dd>Compress data using the <a 
href="https://code.google.com/p/snappy/">Snappy</a> codec.</dd>
-                            <dt><code>none</code></dt>
-                            <dd>Do not compress data.</dd>
-                        </dl>
-                    </td>
-                </tr>
-
-                <tr>
-                    <td class="property" 
id="stores-leveldb-block-size-bytes">stores.<span 
class="store">store-name</span>.<br>leveldb.block.size.bytes</td>
-                    <td class="default">4096</td>
-                    <td class="description">
-                        If compression is enabled, LevelDB groups 
approximately this many uncompressed bytes
-                        into one compressed block. You probably don't need to 
change this property.
-                    </td>
-                </tr>
-
-                <tr>
                     <th colspan="3" class="section" id="yarn">
                         Running your job on a <a 
href="../jobs/yarn-jobs.html">YARN</a> cluster<br>
                         <span class="subtitle">

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/docs/startup/download/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/download/index.md b/docs/startup/download/index.md
index e358453..329969e 100644
--- a/docs/startup/download/index.md
+++ b/docs/startup/download/index.md
@@ -75,12 +75,6 @@ A Maven-based Samza project can pull in all required 
dependencies Samza dependen
 </dependency>
 <dependency>
   <groupId>org.apache.samza</groupId>
-  <artifactId>samza-kv-leveldb_2.10</artifactId>
-  <version>0.8.0</version>
-  <scope>runtime</scope>
-</dependency>
-<dependency>
-  <groupId>org.apache.samza</groupId>
   <artifactId>samza-kv-rocksdb_2.10</artifactId>
   <version>0.8.0</version>
   <scope>runtime</scope>

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index 7bbaa41..12b324d 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -28,7 +28,6 @@
   metricsVersion = "2.2.0"
   kafkaVersion = "0.8.2-beta"
   commonsHttpClientVersion = "3.1"
-  leveldbVersion = "1.8"
   rocksdbVersion = "3.5.1"
   yarnVersion = "2.4.0"
   slf4jVersion = "1.6.2"

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
 
b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
deleted file mode 100644
index 0ba4f8a..0000000
--- 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamPartition
-
-/**
- * A backwards compatible factory that points to LevelDb. This exists for all 
the old Samza jobs
- * that still refer to KeyValueStorageEngineFactory.
- */
-class KeyValueStorageEngineFactory[K, V] extends 
BaseKeyValueStorageEngineFactory[K, V] {
-
-  override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: 
SystemStreamPartition,
-                          containerContext: SamzaContainerContext): 
KeyValueStore[Array[Byte], Array[Byte]] = {
-    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(storeName, storeDir, 
registry, changeLogSystemStreamPartition, containerContext)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
 
b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
deleted file mode 100644
index 9642823..0000000
--- 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStorageEngineFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import java.io.File
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.serializers._
-import org.apache.samza.SamzaException
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.StorageEngine
-
-object LevelDbKeyValueStorageEngineFactory {
-  def getKeyValueStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: 
SystemStreamPartition,
-                          containerContext: SamzaContainerContext): 
KeyValueStore[Array[Byte], Array[Byte]] = {
-    val storageConfig = containerContext.config.subset("stores." + storeName + 
".", true)
-    val deleteCompactionThreshold = 
storageConfig.getInt("compaction.delete.threshold", -1)
-
-    val levelDbMetrics = new KeyValueStoreMetrics(storeName, registry)
-    val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, 
containerContext)
-    val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, 
deleteCompactionThreshold, levelDbMetrics)
-
-    levelDb
-  }
-
-}
-
-class LevelDbKeyValueStorageEngineFactory[K, V] extends 
BaseKeyValueStorageEngineFactory[K, V] {
-
-  override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: 
SystemStreamPartition,
-                          containerContext: SamzaContainerContext): 
KeyValueStore[Array[Byte], Array[Byte]] = {
-    LevelDbKeyValueStorageEngineFactory.getKeyValueStore(storeName, storeDir, 
registry, changeLogSystemStreamPartition, containerContext)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
 
b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
deleted file mode 100644
index f4a021a..0000000
--- 
a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv
-
-import org.iq80.leveldb._
-import org.fusesource.leveldbjni.JniDBFactory._
-import java.io._
-import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
-import org.apache.samza.util.{LexicographicComparator, Logging}
-
-object LevelDbKeyValueStore extends Logging {
-
-  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
-    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 
1024 * 1024L)
-    val writeBufSize = 
storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options
-
-    // Cache size and write buffer size are specified on a per-container basis.
-    options.cacheSize(cacheSize / containerContext.taskNames.size)
-    options.writeBufferSize((writeBufSize / 
containerContext.taskNames.size).toInt)
-    options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
-    options.compressionType(
-      storeConfig.get("leveldb.compression", "snappy") match {
-        case "snappy" => CompressionType.SNAPPY
-        case "none" => CompressionType.NONE
-        case _ =>
-          warn("Unknown leveldb.compression codec %s, defaulting to Snappy" 
format storeConfig.get("leveldb.compression", "snappy"))
-          CompressionType.SNAPPY
-      })
-    options.createIfMissing(true)
-    options.errorIfExists(true)
-    options
-  }
-}
-
-class LevelDbKeyValueStore(
-  val dir: File,
-  val options: Options,
-
-  /**
-   * How many deletes must occur before we will force a compaction. This is to
-   * get around performance issues discovered in SAMZA-254. A value of -1 
-   * disables this feature.
-   */
-  val deleteCompactionThreshold: Int = -1,
-  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends 
KeyValueStore[Array[Byte], Array[Byte]] with Logging {
-
-  private lazy val db = factory.open(dir, options)
-  private val lexicographic = new LevelDBLexicographicComparator()
-  private var deletesSinceLastCompaction = 0
-
-  def get(key: Array[Byte]): Array[Byte] = {
-    maybeCompact
-    metrics.gets.inc
-    require(key != null, "Null key not allowed.")
-    val found = db.get(key)
-    if (found != null) {
-      metrics.bytesRead.inc(found.size)
-    }
-    found
-  }
-
-  def put(key: Array[Byte], value: Array[Byte]) {
-    metrics.puts.inc
-    require(key != null, "Null key not allowed.")
-    if (value == null) {
-      db.delete(key)
-      deletesSinceLastCompaction += 1
-    } else {
-      metrics.bytesWritten.inc(key.size + value.size)
-      db.put(key, value)
-    }
-  }
-
-  def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
-    val batch = db.createWriteBatch()
-    val iter = entries.iterator
-    var wrote = 0
-    var deletes = 0
-    while (iter.hasNext) {
-      wrote += 1
-      val curr = iter.next()
-      if (curr.getValue == null) {
-        deletes += 1
-        batch.delete(curr.getKey)
-      } else {
-        val key = curr.getKey
-        val value = curr.getValue
-        metrics.bytesWritten.inc(key.size + value.size)
-        batch.put(key, value)
-      }
-    }
-    db.write(batch)
-    batch.close
-    metrics.puts.inc(wrote)
-    metrics.deletes.inc(deletes)
-    deletesSinceLastCompaction += deletes
-  }
-
-  def delete(key: Array[Byte]) {
-    metrics.deletes.inc
-    put(key, null)
-  }
-
-  def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], 
Array[Byte]] = {
-    maybeCompact
-    metrics.ranges.inc
-    require(from != null && to != null, "Null bound not allowed.")
-    new LevelDbRangeIterator(db.iterator, from, to)
-  }
-
-  def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
-    maybeCompact
-    metrics.alls.inc
-    val iter = db.iterator()
-    iter.seekToFirst()
-    new LevelDbIterator(iter)
-  }
-
-  /**
-   * Trigger a complete compaction on the LevelDB store if there have been at
-   * least deleteCompactionThreshold deletes since the last compaction.
-   */
-  def maybeCompact = {
-    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= 
deleteCompactionThreshold) {
-      compact
-    }
-  }
-
-  /**
-   * Trigger a complete compaction of the LevelDB store.
-   */
-  def compact {
-    // According to LevelDB's docs:
-    // begin==NULL is treated as a key before all keys in the database.
-    // end==NULL is treated as a key after all keys in the database.
-    db.compactRange(null, null)
-    deletesSinceLastCompaction = 0
-  }
-
-  def flush {
-    metrics.flushes.inc
-    // TODO can't find a flush for leveldb
-    trace("Flushing, but flush in LevelDbKeyValueStore doesn't do anything.")
-  }
-
-  def close() {
-    trace("Closing.")
-
-    db.close()
-  }
-
-  class LevelDbIterator(iter: DBIterator) extends 
KeyValueIterator[Array[Byte], Array[Byte]] {
-    private var open = true
-    def close() = {
-      open = false
-      iter.close()
-    }
-    def remove() = iter.remove()
-    def hasNext() = iter.hasNext()
-    def next() = {
-      if (!hasNext()) {
-        throw new NoSuchElementException
-      }
-
-      val curr = iter.next
-      val key = curr.getKey
-      val value = curr.getValue
-      metrics.bytesRead.inc(key.size)
-      if (value != null) {
-        metrics.bytesRead.inc(value.size)
-      }
-      new Entry(key, value)
-    }
-    override def finalize() {
-      if (open) {
-        System.err.println("Leaked reference to level db iterator, forcing 
close.")
-        close()
-      }
-    }
-  }
-
-  class LevelDbRangeIterator(iter: DBIterator, from: Array[Byte], to: 
Array[Byte]) extends LevelDbIterator(iter) {
-    val comparator = if (options.comparator == null) lexicographic else 
options.comparator
-    iter.seek(from)
-    override def hasNext() = {
-      iter.hasNext() && comparator.compare(iter.peekNext.getKey, to) < 0
-    }
-  }
-
-  /**
-   * Compare two array lexicographically using unsigned byte arithmetic
-   */
-  class LevelDBLexicographicComparator extends LexicographicComparator with 
DBComparator {
-    def name(): String = "lexicographic"
-    def findShortestSeparator(start: Array[Byte], limit: Array[Byte]) = start
-    def findShortSuccessor(key: Array[Byte]) = key
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 1971b1f..84cf6db 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -24,14 +24,14 @@ import scala.collection._
 import java.util.Arrays
 
 /**
- * A write-behind caching layer around the leveldb store. The purpose of this 
cache is three-fold:
- * 1. Batch together writes to leveldb, this turns out to be a great 
optimization
+ * A write-behind caching layer around the rocksdb store. The purpose of this 
cache is three-fold:
+ * 1. Batch together writes to rocksdb, this turns out to be a great 
optimization
  * 2. Avoid duplicate writes and duplicate log entries within a commit 
interval. i.e. if there are two updates to the same key, log only the later.
  * 3. Avoid deserialization cost for gets on very common keys
  *
  * This caching does introduce a few odd corner cases :-(
- * 1. Items in the cache have pass-by-reference semantics but items in leveldb 
have pass-by-value semantics. Modifying items after a put is a bad idea.
- * 2. Range queries require flushing the cache (as the ordering comes from 
leveldb)
+ * 1. Items in the cache have pass-by-reference semantics but items in rocksdb 
have pass-by-value semantics. Modifying items after a put is a bad idea.
+ * 2. Range queries require flushing the cache (as the ordering comes from 
rocksdb)
  *
  * In implementation this cache is just an LRU hash map that discards the 
oldest entry when full. There is an accompanying "dirty list" that references 
keys
  * that have not yet been written to disk. All writes go to the dirty list and 
when the list is long enough we flush out all those values at once. Dirty items

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties 
b/samza-test/src/main/config/perf/kv-perf.properties
index dcc223f..0d487b1 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+stores.test.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.test.compaction.delete.threshold=1000
 test.partition.count=4
 test.num.loops=1000

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index f592d8e..50dfc10 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -25,7 +25,6 @@ import java.util.Random
 
 import org.apache.samza.serializers.Serde
 import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
-import org.iq80.leveldb.Options
 import org.junit.After
 import org.junit.Assert._
 import org.junit.Before
@@ -40,7 +39,7 @@ import scala.collection.mutable.ArrayBuffer
 
 /**
  * Test suite to check different key value store operations
- * @param typeOfStore Defines type of key-value store (Eg: "leveldb" / 
"inmemory")
+ * @param typeOfStore Defines type of key-value store (Eg: "rocksdb" / 
"inmemory")
  * @param storeConfig Defines whether we're using caching / serde / both / or 
none in front of the store
  */
 @RunWith(value = classOf[Parameterized])
@@ -48,7 +47,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
   import TestKeyValueStores._
 
   val letters = "abcdefghijklmnopqrstuvwxyz".map(_.toString)
-  val dir = new File(System.getProperty("java.io.tmpdir"), "leveldb-test-" + 
new Random().nextInt(Int.MaxValue))
+  val dir = new File(System.getProperty("java.io.tmpdir"), "rocksdb-test-" + 
new Random().nextInt(Int.MaxValue))
   var store: KeyValueStore[Array[Byte], Array[Byte]] = null
   var cache = false
   var serde = false
@@ -56,9 +55,6 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
   @Before
   def setup() {
     val kvStore : KeyValueStore[Array[Byte], Array[Byte]] = typeOfStore match {
-      case "leveldb" =>
-        dir.mkdirs ()
-        new LevelDbKeyValueStore (dir, new Options)
       case "inmemory" =>
         new InMemoryKeyValueStore
       case "rocksdb" =>
@@ -151,7 +147,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
     // out. Our check (below) uses == for cached entries, and using 
     // numEntires >= CacheSize would result in the LRU cache dropping some 
     // entries. The result would be that we get the correct byte array back 
-    // from the cache's underlying store (leveldb), but that == would fail.
+    // from the cache's underlying store (rocksdb), but that == would fail.
     val numEntries = CacheSize - 1
     val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + 
i)))
     store.putAll(entries)
@@ -350,11 +346,6 @@ object TestKeyValueStores {
   val BatchSize = 1000000
   @Parameters
   def parameters: java.util.Collection[Array[String]] = Arrays.asList(
-      //LevelDB
-      Array("leveldb", "cache"),
-      Array("leveldb", "serde"),
-      Array("leveldb", "cache-and-serde"),
-      Array("leveldb", "none"),
       //Inmemory
       Array("inmemory", "cache"),
       Array("inmemory", "serde"),

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 6f0eb21..4c08d6b 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -205,7 +205,7 @@ class TestStatefulTask {
     "task.class" -> "org.apache.samza.test.integration.TestTask",
     "task.inputs" -> "kafka.input",
     "serializers.registry.string.class" -> 
"org.apache.samza.serializers.StringSerdeFactory",
-    "stores.mystore.factory" -> 
"org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
+    "stores.mystore.factory" -> 
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
     "stores.mystore.changelog" -> "kafka.mystoreChangelog",

http://git-wip-us.apache.org/repos/asf/samza/blob/3a3c278a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3a01fd6..bb07a3b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,7 +22,6 @@ include \
   'samza-kafka',
   'samza-kv',
   'samza-kv-inmemory',
-  'samza-kv-leveldb',
   'samza-kv-rocksdb',
   'samza-log4j',
   'samza-shell',

Reply via email to