This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d1d7c8  HDDS-1499. OzoneManager Cache. (#798)
0d1d7c8 is described below

commit 0d1d7c86ec34fabc62c0e3844aca3733024bc172
Author: Bharat Viswanadham <bha...@apache.org>
AuthorDate: Sun May 19 19:23:02 2019 -0700

    HDDS-1499. OzoneManager Cache. (#798)
---
 .../java/org/apache/hadoop/utils/db/DBStore.java   |   1 +
 .../java/org/apache/hadoop/utils/db/RDBTable.java  |  10 +-
 .../java/org/apache/hadoop/utils/db/Table.java     |  26 +++-
 .../org/apache/hadoop/utils/db/TypedTable.java     |  78 ++++++++++-
 .../org/apache/hadoop/utils/db/cache/CacheKey.java |  56 ++++++++
 .../apache/hadoop/utils/db/cache/CacheValue.java   |  47 +++++++
 .../apache/hadoop/utils/db/cache/EpochEntry.java   |  74 +++++++++++
 .../hadoop/utils/db/cache/PartialTableCache.java   |  97 ++++++++++++++
 .../apache/hadoop/utils/db/cache/TableCache.java   |  63 +++++++++
 .../apache/hadoop/utils/db/cache/package-info.java |  18 +++
 .../hadoop/utils/db/TestTypedRDBTableStore.java    |  82 +++++++++++-
 .../utils/db/cache/TestPartialTableCache.java      | 142 +++++++++++++++++++++
 .../apache/hadoop/utils/db/cache/package-info.java |  22 ++++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   4 +-
 14 files changed, 709 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index 56166ab..9e0c4a4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -44,6 +44,7 @@ public interface DBStore extends AutoCloseable {
    */
   Table<byte[], byte[]> getTable(String name) throws IOException;
 
+
   /**
    * Gets an existing TableStore with implicit key/value conversion.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
index 88b0411..7bbe9d9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.utils.db;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -33,9 +34,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * RocksDB implementation of ozone metadata store.
+ * RocksDB implementation of ozone metadata store. This class should only be
+ * used as part of TypedTable, as its underlying implementation for accessing
+ * the metadata store content. All other users of Table should use TypedTable.
  */
-public class RDBTable implements Table<byte[], byte[]> {
+@InterfaceAudience.Private
+class RDBTable implements Table<byte[], byte[]> {
 
 
   private static final Logger LOG =
@@ -52,7 +56,7 @@ public class RDBTable implements Table<byte[], byte[]> {
    * @param handle - ColumnFamily Handle.
    * @param writeOptions - RocksDB write Options.
    */
-  public RDBTable(RocksDB db, ColumnFamilyHandle handle,
+  RDBTable(RocksDB db, ColumnFamilyHandle handle,
       WriteOptions writeOptions) {
     this.db = db;
     this.handle = handle;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
index 2f14e77..905a68b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceStability;
-
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
 /**
  * Interface for key-value store that stores ozone metadata. Ozone metadata is
 * stored as key value pairs, both key and value are arbitrary byte arrays. Each
@@ -98,6 +100,28 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
   String getName() throws IOException;
 
   /**
+   * Add an entry to the table cache.
+   *
+   * If the cacheKey already exists, the existing entry is overridden.
+   * @param cacheKey
+   * @param cacheValue
+   */
+  default void addCacheEntry(CacheKey<KEY> cacheKey,
+      CacheValue<VALUE> cacheValue) {
+    throw new NotImplementedException("addCacheEntry is not implemented");
+  }
+
+  /**
+   * Removes all entries from the table cache whose epoch value is less than
+   * or equal to the specified epoch value.
+   * @param epoch
+   */
+  default void cleanupCache(long epoch) {
+    throw new NotImplementedException("cleanupCache is not implemented");
+  }
+
+  /**
    * Class used to represent the key and value pair of a db entry.
    */
   interface KeyValue<KEY, VALUE> {
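For illustration, here is a minimal sketch (not part of this patch) of how a
write path might use the two new default methods, assuming a Table
implementation such as TypedTable that overrides both hooks; the helper names
and the String-typed table are illustrative:

    import com.google.common.base.Optional;
    import org.apache.hadoop.utils.db.Table;
    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheValue;

    public final class TableCacheUsageSketch {

      // Stage a write in the table cache. The epoch is assumed to be the
      // Ratis transaction log index (see CacheValue in this patch).
      static void stageWrite(Table<String, String> table, String key,
          String value, long ratisLogIndex) {
        table.addCacheEntry(new CacheKey<>(key),
            new CacheValue<>(Optional.of(value), ratisLogIndex));
      }

      // Once everything up to flushedIndex has been flushed to RocksDB, the
      // staged entries can be dropped from the cache.
      static void onFlush(Table<String, String> table, long flushedIndex) {
        table.cleanupCache(flushedIndex);
      }
    }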
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
index 667822b..6de6509 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
@@ -20,6 +20,12 @@ package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.utils.db.cache.PartialTableCache;
+import org.apache.hadoop.utils.db.cache.TableCache;
+
 /**
  * Strongly typed table implementation.
  * <p>
@@ -31,13 +37,16 @@ import java.io.IOException;
  */
 public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
-  private Table<byte[], byte[]> rawTable;
+  private final Table<byte[], byte[]> rawTable;
+
+  private final CodecRegistry codecRegistry;
 
-  private CodecRegistry codecRegistry;
+  private final Class<KEY> keyType;
 
-  private Class<KEY> keyType;
+  private final Class<VALUE> valueType;
+
+  private final TableCache<CacheKey<KEY>, CacheValue<VALUE>> cache;
 
-  private Class<VALUE> valueType;
 
   public TypedTable(
       Table<byte[], byte[]> rawTable,
@@ -47,6 +56,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     this.codecRegistry = codecRegistry;
     this.keyType = keyType;
     this.valueType = valueType;
+    cache = new PartialTableCache<>();
   }
 
   @Override
@@ -69,8 +79,34 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     return rawTable.isEmpty();
   }
 
+  /**
+   * Returns the value mapped to the given key, or null if the key is not
+   * found.
+   *
+   * Callers of this method should use a synchronization mechanism when
+   * accessing. The cache is checked first; if it has an entry, that value is
+   * returned, otherwise the value is read from the RocksDB table.
+   *
+   * @param key metadata key
+   * @return VALUE
+   * @throws IOException
+   */
   @Override
   public VALUE get(KEY key) throws IOException {
+    // Here the metadata lock guarantees that the cache is not updated for
+    // the same key during this get.
+    CacheValue<VALUE> cacheValue = cache.get(new CacheKey<>(key));
+    if (cacheValue == null) {
+      // If the table has no cache, or the key is not present in the cache,
+      // read from the RocksDB table.
+      return getFromTable(key);
+    } else {
+      // We have a value in cache, return the value.
+      return cacheValue.getValue();
+    }
+  }
+
+  private VALUE getFromTable(KEY key) throws IOException {
     byte[] keyBytes = codecRegistry.asRawData(key);
     byte[] valueBytes = rawTable.get(keyBytes);
     return codecRegistry.asObject(valueBytes, valueType);
@@ -106,6 +142,40 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   }
 
+  @Override
+  public void addCacheEntry(CacheKey<KEY> cacheKey,
+      CacheValue<VALUE> cacheValue) {
+    // This will override the entry if there is already an entry for this key.
+    cache.put(cacheKey, cacheValue);
+  }
+
+
+  @Override
+  public void cleanupCache(long epoch) {
+    cache.cleanup(epoch);
+  }
+
+  @VisibleForTesting
+  TableCache<CacheKey<KEY>, CacheValue<VALUE>> getCache() {
+    return cache;
+  }
+
+  public Table<byte[], byte[]> getRawTable() {
+    return rawTable;
+  }
+
+  public CodecRegistry getCodecRegistry() {
+    return codecRegistry;
+  }
+
+  public Class<KEY> getKeyType() {
+    return keyType;
+  }
+
+  public Class<VALUE> getValueType() {
+    return valueType;
+  }
+
   /**
    * Key value implementation for strongly typed tables.
    */
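To make the cache-first read concrete, a short sketch (illustrative only; the
String-typed table stands for any TypedTable obtained from DBStore.getTable):

    import java.io.IOException;

    import com.google.common.base.Optional;
    import org.apache.hadoop.utils.db.Table;
    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheValue;

    public final class ReadThroughSketch {
      static String readExample(Table<String, String> table)
          throws IOException {
        // Stage "k1" -> "v1" in the cache at epoch 7; RocksDB is untouched.
        table.addCacheEntry(new CacheKey<>("k1"),
            new CacheValue<>(Optional.of("v1"), 7L));
        // Cache hit: get() returns "v1" without consulting RocksDB.
        return table.get("k1");
      }
    }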
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
new file mode 100644
index 0000000..f928e47
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * CacheKey for the RocksDB table.
+ * @param <KEY>
+ */
+public class CacheKey<KEY> {
+
+  private final KEY key;
+
+  public CacheKey(KEY key) {
+    Objects.requireNonNull(key, "Key should not be null in CacheKey");
+    this.key = key;
+  }
+
+  public KEY getKey() {
+    return key;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CacheKey<?> cacheKey = (CacheKey<?>) o;
+    return Objects.equals(key, cacheKey.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key);
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
new file mode 100644
index 0000000..34f77ae
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import com.google.common.base.Optional;
+
+/**
+ * CacheValue for the RocksDB Table.
+ * @param <VALUE>
+ */
+public class CacheValue<VALUE> {
+
+  private final Optional<VALUE> value;
+  // This value is used to evict entries from the cache.
+  // It is set to the Ratis transaction context log entry index.
+  private final long epoch;
+
+  public CacheValue(Optional<VALUE> value, long epoch) {
+    this.value = value;
+    this.epoch = epoch;
+  }
+
+  public VALUE getValue() {
+    return value.orNull();
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+}
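An absent Optional doubles as a delete marker: getValue() returns null, which
TypedTable.get reports as "not found". A tiny sketch (not part of the patch):

    import com.google.common.base.Optional;
    import org.apache.hadoop.utils.db.cache.CacheValue;

    public final class CacheValueSketch {
      public static void main(String[] args) {
        // A present value staged at epoch 5.
        CacheValue<String> present =
            new CacheValue<>(Optional.of("data"), 5L);
        // An absent value (a delete marker) staged at epoch 6.
        CacheValue<String> deleted =
            new CacheValue<>(Optional.absent(), 6L);

        System.out.println(present.getValue()); // data
        System.out.println(deleted.getValue()); // null
      }
    }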
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
new file mode 100644
index 0000000..6966b3d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/EpochEntry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Objects;
+
+/**
+ * Class that describes an epoch entry. It is used when deleting entries
+ * from the partial table cache.
+ * @param <CACHEKEY>
+ */
+public class EpochEntry<CACHEKEY> implements Comparable<EpochEntry<CACHEKEY>> {
+
+  private long epoch;
+  private CACHEKEY cachekey;
+
+  EpochEntry(long epoch, CACHEKEY cachekey) {
+    this.epoch = epoch;
+    this.cachekey = cachekey;
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+  public CACHEKEY getCachekey() {
+    return cachekey;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    EpochEntry<?> that = (EpochEntry<?>) o;
+    return epoch == that.epoch && Objects.equals(cachekey, that.cachekey);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(epoch, cachekey);
+  }
+
+  @Override
+  public int compareTo(EpochEntry<CACHEKEY> that) {
+    return Long.compare(this.epoch, that.epoch);
+  }
+
+}
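The natural ordering is what PartialTableCache (next file) relies on: it keeps
EpochEntry objects in a TreeSet, so iteration runs in ascending epoch order and
eviction can stop at the first entry newer than the cleanup epoch. A sketch,
placed in the same package since the constructor is package-private:

    package org.apache.hadoop.utils.db.cache;

    import java.util.TreeSet;

    final class EpochOrderingSketch {
      public static void main(String[] args) {
        TreeSet<EpochEntry<String>> entries = new TreeSet<>();
        entries.add(new EpochEntry<>(3, "c"));
        entries.add(new EpochEntry<>(1, "a"));
        entries.add(new EpochEntry<>(2, "b"));
        // Iteration order is ascending by epoch: 1 -> a, 2 -> b, 3 -> c.
        for (EpochEntry<String> e : entries) {
          System.out.println(e.getEpoch() + " -> " + e.getCachekey());
        }
      }
    }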
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
new file mode 100644
index 0000000..4d37112
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Cache implementation for the table. This is a partial cache: entries are
+ * cleaned up after they are flushed to the DB.
+ */
+@Private
+@Evolving
+public class PartialTableCache<CACHEKEY extends CacheKey,
+    CACHEVALUE extends CacheValue> implements TableCache<CACHEKEY, CACHEVALUE> {
+
+  private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache;
+  private final TreeSet<EpochEntry<CACHEKEY>> epochEntries;
+  private final ExecutorService executorService;
+
+  public PartialTableCache() {
+    cache = new ConcurrentHashMap<>();
+    epochEntries = new TreeSet<>();
+    // Create a single-thread executor so that only one cleanup runs at a
+    // time.
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("PartialTableCache Cleanup Thread - %d").build();
+    executorService = Executors.newSingleThreadExecutor(build);
+
+  }
+
+  @Override
+  public CACHEVALUE get(CACHEKEY cachekey) {
+    return cache.get(cachekey);
+  }
+
+  @Override
+  public void put(CACHEKEY cacheKey, CACHEVALUE value) {
+    cache.put(cacheKey, value);
+    epochEntries.add(new EpochEntry<>(value.getEpoch(), cacheKey));
+  }
+
+  @Override
+  public void cleanup(long epoch) {
+    executorService.submit(() -> evictCache(epoch));
+  }
+
+  @Override
+  public int size() {
+    return cache.size();
+  }
+
+  private void evictCache(long epoch) {
+    EpochEntry<CACHEKEY> currentEntry = null;
+    for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
+         iterator.hasNext();) {
+      currentEntry = iterator.next();
+      CACHEKEY cachekey = currentEntry.getCachekey();
+      CacheValue cacheValue = cache.get(cachekey);
+      if (cacheValue.getEpoch() <= epoch) {
+        cache.remove(cachekey);
+        iterator.remove();
+      } else {
+        // If the current entry's epoch is greater than the given epoch, we
+        // have deleted all entries with epoch less than or equal to the
+        // specified epoch, so we can break.
+        break;
+      }
+    }
+  }
+}
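An end-to-end sketch of the class above (not part of the patch; keys and
values illustrative). Note that cleanup() only schedules evictCache on the
single-thread executor, so eviction completes asynchronously; the tests below
poll with GenericTestUtils.waitFor for the same reason:

    import com.google.common.base.Optional;
    import org.apache.hadoop.utils.db.cache.CacheKey;
    import org.apache.hadoop.utils.db.cache.CacheValue;
    import org.apache.hadoop.utils.db.cache.PartialTableCache;
    import org.apache.hadoop.utils.db.cache.TableCache;

    public final class PartialTableCacheSketch {
      public static void main(String[] args) throws InterruptedException {
        TableCache<CacheKey<String>, CacheValue<String>> cache =
            new PartialTableCache<>();

        // Stage three entries at epochs 1..3.
        for (int epoch = 1; epoch <= 3; epoch++) {
          cache.put(new CacheKey<>("key" + epoch),
              new CacheValue<>(Optional.of("value" + epoch), epoch));
        }

        // Evict everything flushed up to epoch 2. This only schedules the
        // eviction; it runs asynchronously on the cleanup thread.
        cache.cleanup(2);

        // Poll until the eviction has run; only epoch 3 should remain.
        while (cache.size() != 1) {
          Thread.sleep(10);
        }
        System.out.println(cache.get(new CacheKey<>("key3")).getValue());
      }
    }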
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
new file mode 100644
index 0000000..70e0b33
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Cache used for RocksDB tables.
+ * @param <CACHEKEY>
+ * @param <CACHEVALUE>
+ */
+
+@Private
+@Evolving
+public interface TableCache<CACHEKEY extends CacheKey,
+    CACHEVALUE extends CacheValue> {
+
+  /**
+   * Return the value for the key if it is present, otherwise return null.
+   * @param cacheKey
+   * @return CACHEVALUE
+   */
+  CACHEVALUE get(CACHEKEY cacheKey);
+
+  /**
+   * Add an entry to the cache; if the key already exists, the entry is
+   * overridden.
+   * @param cacheKey
+   * @param value
+   */
+  void put(CACHEKEY cacheKey, CACHEVALUE value);
+
+  /**
+   * Removes all entries from the cache whose epoch value is less than or
+   * equal to the specified epoch value. For a full table cache this is a
+   * do-nothing operation.
+   * @param epoch
+   */
+  void cleanup(long epoch);
+
+  /**
+   * Return the size of the cache.
+   * @return size
+   */
+  int size();
+}
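The javadoc above anticipates a full table cache whose cleanup is a no-op; no
such class is in this patch, but a hypothetical implementation (illustrative
only) could look like:

    package org.apache.hadoop.utils.db.cache;

    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch, not part of this patch: a cache that retains
    // every entry for the table's lifetime.
    public class FullTableCacheSketch<CACHEKEY extends CacheKey,
        CACHEVALUE extends CacheValue>
        implements TableCache<CACHEKEY, CACHEVALUE> {

      private final ConcurrentHashMap<CACHEKEY, CACHEVALUE> cache =
          new ConcurrentHashMap<>();

      @Override
      public CACHEVALUE get(CACHEKEY cacheKey) {
        return cache.get(cacheKey);
      }

      @Override
      public void put(CACHEKEY cacheKey, CACHEVALUE value) {
        cache.put(cacheKey, value);
      }

      @Override
      public void cleanup(long epoch) {
        // Do nothing: a full cache is never evicted on flush.
      }

      @Override
      public int size() {
        return cache.size();
      }
    }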
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
new file mode 100644
index 0000000..8d2506a
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils.db.cache;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
index 4d3b1bf..adedcaf 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestTypedRDBTableStore.java
@@ -26,10 +26,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.db.Table.KeyValue;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,7 +55,7 @@ public class TestTypedRDBTableStore {
       Arrays.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
           "First", "Second", "Third",
           "Fourth", "Fifth",
-          "Sixth");
+          "Sixth", "Seven");
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
   private RDBStore rdbStore = null;
@@ -236,4 +240,80 @@ public class TestTypedRDBTableStore {
       }
     }
   }
+
+  @Test
+  public void testTypedTableWithCache() throws Exception {
+    int iterCount = 10;
+    try (Table<String, String> testTable = createTypedTable(
+        "Seven")) {
+
+      for (int x = 0; x < iterCount; x++) {
+        String key = Integer.toString(x);
+        String value = Integer.toString(x);
+        testTable.addCacheEntry(new CacheKey<>(key),
+            new CacheValue<>(Optional.of(value),
+            x));
+      }
+
+      // Since the entries were added to the cache, get should return the
+      // value even if it does not exist in the DB.
+      for (int x = 0; x < iterCount; x++) {
+        Assert.assertEquals(Integer.toString(x),
+            testTable.get(Integer.toString(x)));
+      }
+
+    }
+  }
+
+  @Test
+  public void testTypedTableWithCacheWithFewDeletedOperationType()
+      throws Exception {
+    int iterCount = 10;
+    try (Table<String, String> testTable = createTypedTable(
+        "Seven")) {
+
+      for (int x = 0; x < iterCount; x++) {
+        String key = Integer.toString(x);
+        String value = Integer.toString(x);
+        if (x % 2 == 0) {
+          testTable.addCacheEntry(new CacheKey<>(key),
+              new CacheValue<>(Optional.of(value), x));
+        } else {
+          testTable.addCacheEntry(new CacheKey<>(key),
+              new CacheValue<>(Optional.absent(),
+              x));
+        }
+      }
+
+      // Since the entries were added to the cache, get should return the
+      // value even if it does not exist in the DB.
+      for (int x = 0; x < iterCount; x++) {
+        if (x % 2 == 0) {
+          Assert.assertEquals(Integer.toString(x),
+              testTable.get(Integer.toString(x)));
+        } else {
+          Assert.assertNull(testTable.get(Integer.toString(x)));
+        }
+      }
+
+      testTable.cleanupCache(5);
+
+      GenericTestUtils.waitFor(() ->
+          ((TypedTable<String, String>) testTable).getCache().size() == 4,
+          100, 5000);
+
+
+      //Check remaining values
+      for (int x = 6; x < iterCount; x++) {
+        if (x % 2 == 0) {
+          Assert.assertEquals(Integer.toString(x),
+              testTable.get(Integer.toString(x)));
+        } else {
+          Assert.assertNull(testTable.get(Integer.toString(x)));
+        }
+      }
+
+
+    }
+  }
 }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
new file mode 100644
index 0000000..f706659
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db.cache;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Class tests partial table cache.
+ */
+public class TestPartialTableCache {
+  private TableCache<CacheKey<String>, CacheValue<String>> tableCache;
+
+  @Before
+  public void create() {
+    tableCache = new PartialTableCache<>();
+  }
+  @Test
+  public void testPartialTableCache() {
+
+
+    for (int i = 0; i< 10; i++) {
+      tableCache.put(new CacheKey<>(Integer.toString(i)),
+          new CacheValue<>(Optional.of(Integer.toString(i)), i));
+    }
+
+
+    for (int i=0; i < 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    // Clean up entries with epoch <= 4; entries 5..9 should remain.
+    tableCache.cleanup(4);
+
+    for (int i=5; i < 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+  }
+
+
+  @Test
+  public void testPartialTableCacheParallel() throws Exception {
+
+    int totalCount = 0;
+    CompletableFuture<Integer> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return writeToCache(10, 1, 0);
+          } catch (InterruptedException ex) {
+            fail("writeToCache got interrupt exception");
+          }
+          return 0;
+        });
+    int value = future.get();
+    Assert.assertEquals(10, value);
+
+    totalCount += value;
+
+    future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return writeToCache(10, 11, 100);
+          } catch (InterruptedException ex) {
+            fail("writeToCache got interrupt exception");
+          }
+          return 0;
+        });
+
+    // Check we have first 10 entries in cache.
+    for (int i=1; i <= 10; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    int deleted = 5;
+    // Clean up the first 5 entries.
+    tableCache.cleanup(deleted);
+
+    value = future.get();
+    Assert.assertEquals(10, value);
+
+    totalCount += value;
+
+    // We should have totalCount - deleted entries in the cache.
+    final int tc = totalCount;
+    GenericTestUtils.waitFor(() -> (tc - deleted == tableCache.size()), 100,
+        5000);
+
+    // Check if we have remaining entries.
+    for (int i=6; i <= totalCount; i++) {
+      Assert.assertEquals(Integer.toString(i),
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+    }
+
+    tableCache.cleanup(10);
+
+    tableCache.cleanup(totalCount);
+
+    // Cleaned up all entries, so cache size should be zero.
+    GenericTestUtils.waitFor(() -> (0 == tableCache.size()), 100,
+        5000);
+  }
+
+  private int writeToCache(int count, int startVal, long sleep)
+      throws InterruptedException {
+    int counter = 1;
+    while (counter <= count){
+      tableCache.put(new CacheKey<>(Integer.toString(startVal)),
+          new CacheValue<>(Optional.of(Integer.toString(startVal)), startVal));
+      startVal++;
+      counter++;
+      Thread.sleep(sleep);
+    }
+    return count;
+  }
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
new file mode 100644
index 0000000..b46cf61
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Tests for the DB Cache Utilities.
+ */
+package org.apache.hadoop.utils.db.cache;
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 793af66..6987927 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -59,6 +59,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRE
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -247,14 +248,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     userTable =
         this.store.getTable(USER_TABLE, String.class, VolumeList.class);
     checkTableStatus(userTable, USER_TABLE);
-    this.store.getTable(VOLUME_TABLE, String.class,
-        String.class);
     volumeTable =
         this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
     checkTableStatus(volumeTable, VOLUME_TABLE);
 
     bucketTable =
         this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+
     checkTableStatus(bucketTable, BUCKET_TABLE);
 
     keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
