Repository: incubator-reef
Updated Branches:
  refs/heads/master 763a6271a -> c41eebde9


[REEF-41] Replace Guava cache usage in core REEF

Add Cache interface to reef-utils and basic CacheImpl.
Cache interface provides get-if-absent-compute and explicit invalidation
and CacheImpl provides an implementation that also includes
expire-after-write timeout.

JIRA:
  REEF-41 https://issues.apache.org/jira/browse/REEF-41

Pull Request:
  This closes #122


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c41eebde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c41eebde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c41eebde

Branch: refs/heads/master
Commit: c41eebde98ddc7d569d140cdff7b4cb600e56c29
Parents: 763a627
Author: Brian Cho <[email protected]>
Authored: Thu Mar 26 17:23:05 2015 +0900
Committer: Markus Weimer <[email protected]>
Committed: Fri Mar 27 09:11:05 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/reef/io/network/Cache.java  |  53 -----
 .../reef/io/network/naming/NameCache.java       |  14 +-
 .../reef/io/network/naming/NameClient.java      |   2 +-
 .../io/network/naming/NameLookupClient.java     |   2 +-
 lang/java/reef-utils/pom.xml                    |   5 +
 .../java/org/apache/reef/util/cache/Cache.java  |  48 +++++
 .../org/apache/reef/util/cache/CacheImpl.java   | 125 ++++++++++++
 .../org/apache/reef/util/cache/CurrentTime.java |  26 +++
 .../org/apache/reef/util/cache/SystemTime.java  |  33 ++++
 .../apache/reef/util/cache/WrappedValue.java    |  92 +++++++++
 .../util/cache/CacheImplConcurrentTest.java     | 198 +++++++++++++++++++
 .../apache/reef/util/cache/CacheImplTest.java   | 131 ++++++++++++
 .../reef/util/cache/ImmediateInteger.java       |  37 ++++
 .../apache/reef/util/cache/SleepingInteger.java |  46 +++++
 .../reef/util/cache/WrappedValueTest.java       |  90 +++++++++
 15 files changed, 839 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
deleted file mode 100644
index 718a576..0000000
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.io.network;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Cache for network and naming services
- */
-public interface Cache<K, V> {
-  /**
-   *  Constructs with timeout
-   *  key is evicted when it's not used for timeout milli-seconds
-   */
-
-  /**
-   * Returns a value for the key if cached; otherwise creates, caches and 
returns
-   * When it creates a value for a key, only one callable for the key is 
executed
-   *
-   * @param key      a key
-   * @param callable a value fetcher
-   * @return a value
-   * @throws NetworkException
-   */
-  public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
-
-  /**
-   * Invalidates a key from the cache
-   *
-   * @param key a key
-   */
-  public void invalidate(K key);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
index 384bfe4..e32b144 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameCache.java
@@ -18,31 +18,29 @@
  */
 package org.apache.reef.io.network.naming;
 
-import com.google.common.cache.CacheBuilder;
-import org.apache.reef.io.network.Cache;
+import org.apache.reef.util.cache.Cache;
+import org.apache.reef.util.cache.CacheImpl;
+import org.apache.reef.util.cache.SystemTime;
 import org.apache.reef.wake.Identifier;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Naming cache implementation
  */
 public class NameCache implements Cache<Identifier, InetSocketAddress> {
 
-  private final com.google.common.cache.Cache<Identifier, InetSocketAddress> 
cache;
+  private final Cache<Identifier, InetSocketAddress> cache;
 
   /**
    * Constructs a naming cache
    *
-   * @param timeout a cache entry timeout after access
+   * @param timeout a cache entry timeout after write
    */
   public NameCache(long timeout) {
-    cache = CacheBuilder.newBuilder()
-        .expireAfterWrite(timeout, TimeUnit.MILLISECONDS)
-        .build();
+    cache = new CacheImpl<>(new SystemTime(), timeout);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
index 79b4a92..966ac94 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -19,7 +19,7 @@
 package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.Naming;
-import org.apache.reef.io.network.Cache;
+import org.apache.reef.util.cache.Cache;
 import org.apache.reef.io.network.naming.exception.NamingRuntimeException;
 import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;
 import org.apache.reef.io.network.naming.serialization.NamingMessage;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
index 31b7fca..3a19b99 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -20,7 +20,7 @@ package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.NameAssignment;
 import org.apache.reef.io.naming.NamingLookup;
-import org.apache.reef.io.network.Cache;
+import org.apache.reef.util.cache.Cache;
 import org.apache.reef.io.network.naming.exception.NamingException;
 import org.apache.reef.io.network.naming.serialization.NamingLookupRequest;
 import org.apache.reef.io.network.naming.serialization.NamingLookupResponse;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-utils/pom.xml b/lang/java/reef-utils/pom.xml
index f553be8..6d9c621 100644
--- a/lang/java/reef-utils/pom.xml
+++ b/lang/java/reef-utils/pom.xml
@@ -34,6 +34,11 @@ under the License.
     <!-- This module shouldn't have many dependencies to make sure it is 
broadly usable across reef subprojects -->
     <dependencies>
         <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+            <version>1</version>
+        </dependency>
+        <dependency>
             <groupId>net.jcip</groupId>
             <artifactId>jcip-annotations</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java
new file mode 100644
index 0000000..ef78753
--- /dev/null
+++ b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/Cache.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Cache with get-if-absent-compute semantics.
+ * Supports explicit invalidation.
+ * Implementation may add other features, e.g. eviction on expire-after-write
+ */
+public interface Cache<K, V> {
+  /**
+   * Returns a value for the key if cached; otherwise creates, caches and 
returns
+   * When it creates a value for a key, only one callable for the key is 
executed
+   *
+   * @param key          a key
+   * @param valueFetcher a value fetcher
+   * @return a value
+   * @throws ExecutionException
+   */
+  public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
+
+  /**
+   * Invalidates a key from the cache
+   *
+   * @param key a key
+   */
+  public void invalidate(K key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java
new file mode 100644
index 0000000..778fcb3
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CacheImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implementation that supports expire-after-write.
+ * Entries that have expired are collected and invalidated on get calls.
+ * This obviates the need for a separate thread to invalidate expired entries, 
at
+ * the cost of some increase in get call latency.
+ * The invalidation sweep is only initiated after an interval 
(expireCheckInterval)
+ * has passed, and at most one invalidation sweep is run at a time.
+ *
+ * Operations on a single key are linearizable. The argument is:
+ * 1. The putIfAbsent call in get guarantees that loadAndGet is called exactly 
once
+ *    for a WrappedValue instance that is put into the map: All putIfAbsent 
calls
+ *    that return the WrappedValue instance will return the value loaded by 
loadAndGet.
+ * 2. Concurrent putIfAbsent and remove calls on a key have an ordering: if 
putIfAbsent
+ *    returns null then it happened after the remove (and a new value will be 
loaded);
+ *    else if it returns non-null then it happened before the remove
+ *    (and the previous value will be returned).
+ */
+public final class CacheImpl<K, V> implements Cache<K, V> {
+  private final ConcurrentMap<K, WrappedValue<V>> internalMap;
+  private final CurrentTime currentTime;
+  private final long timeoutMillis;
+  private final long expireCheckInterval;
+  private final AtomicBoolean expireInProgress;
+
+  private long expireCheckedTime;
+
+  /**
+   * Construct an expire-after-write cache
+   *
+   * @param currentTime   class that returns the current time for timeout 
purposes
+   * @param timeoutMillis a cache entry timeout after write
+   */
+  @Inject
+  public CacheImpl(final CurrentTime currentTime,
+                   final long timeoutMillis) {
+    this.internalMap = new ConcurrentHashMap<>();
+    this.currentTime = currentTime;
+    this.timeoutMillis = timeoutMillis;
+    this.expireCheckInterval = timeoutMillis / 2;
+    this.expireInProgress = new AtomicBoolean(false);
+
+    this.expireCheckedTime = currentTime.now();
+  }
+
+  @Override
+  public V get(final K key, final Callable<V> valueFetcher) throws 
ExecutionException {
+    // Before get, try to invalidate as many expired as possible
+    expireEntries();
+
+    final WrappedValue<V> newWrappedValue = new WrappedValue<>(valueFetcher, 
currentTime);
+    final WrappedValue<V> existingWrappedValue = internalMap.putIfAbsent(key, 
newWrappedValue);
+
+    if (existingWrappedValue == null) {
+      // If absent, compute and return
+      return newWrappedValue.loadAndGet();
+    } else {
+      final Optional<V> existingValue = existingWrappedValue.getValue();
+      if (existingValue.isPresent()) {
+        // If value already exists, get (without locking) and return
+        return existingValue.get();
+      } else {
+        // If value is being computed, wait for computation to complete
+        return existingWrappedValue.waitAndGet();
+      }
+    }
+  }
+
+  private void expireEntries() {
+    if (expireInProgress.compareAndSet(false, true)) {
+      final long now = currentTime.now();
+      if (expireCheckedTime + expireCheckInterval < now) {
+        expireEntriesAtTime(now);
+        expireCheckedTime = now;
+      }
+      expireInProgress.compareAndSet(true, false);
+    }
+  }
+
+  private void expireEntriesAtTime(final long now) {
+    for (final K key : internalMap.keySet()) {
+      final WrappedValue<V> wrappedValue = internalMap.get(key);
+      if (wrappedValue != null) {
+        final Optional<Long> writeTime = wrappedValue.getWriteTime();
+        if (writeTime.isPresent() &&
+                writeTime.get() + timeoutMillis < now) {
+          invalidate(key);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void invalidate(final K key) {
+    internalMap.remove(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java
 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java
new file mode 100644
index 0000000..6157210
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/CurrentTime.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+/**
+ * Return the current time
+ */
+public interface CurrentTime {
+  long now();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java
new file mode 100644
index 0000000..c1d19ce
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/SystemTime.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+/**
+ * Return the system time
+ */
+public final class SystemTime implements CurrentTime {
+
+  /**
+   * @return System time in milliseconds
+   */
+  @Override
+  public long now() {
+    return System.currentTimeMillis();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java
 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java
new file mode 100644
index 0000000..abc2a07
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/cache/WrappedValue.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import org.apache.reef.util.Optional;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A representation of a cached entry.
+ * Contains a value and when it was written.
+ */
+final class WrappedValue<V> {
+  private final Callable<V> valueFetcher;
+  private final CurrentTime currentTime;
+
+  private Optional<V> value;
+  private Optional<Long> writeTime;
+
+  /**
+   * Construct a representation of a cached entry.
+   * The value is written after computing valueFetcher and the time
+   * it was written is recorded using currentTime.
+   *
+   * @param valueFetcher method used to fetch the value
+   * @param currentTime  class that returns the current time
+   */
+  public WrappedValue(final Callable<V> valueFetcher,
+                      final CurrentTime currentTime) {
+    this.valueFetcher = valueFetcher;
+    this.currentTime = currentTime;
+
+    this.value = Optional.empty();
+    this.writeTime = Optional.empty();
+  }
+
+  public Optional<Long> getWriteTime() {
+    return writeTime;
+  }
+
+  public Optional<V> getValue() {
+    return value;
+  }
+
+  /**
+   * Must only be called once, by the thread that created this WrappedValue
+   * @return The value returned by valueFetcher
+   */
+  public synchronized V loadAndGet() throws ExecutionException {
+    try {
+      value = Optional.ofNullable(valueFetcher.call());
+    } catch (Exception e) {
+      throw new ExecutionException(e);
+    } finally {
+      writeTime = Optional.of(currentTime.now());
+      this.notifyAll();
+    }
+    if (!value.isPresent()) {
+      throw new ExecutionException("valueFetcher returned null", new 
NullPointerException());
+    } else {
+      return value.get();
+    }
+  }
+
+  public synchronized V waitAndGet() {
+    while (!value.isPresent()) {
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        // Ignore, as while loop will be re-entered
+      }
+    }
+    return value.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java
new file mode 100644
index 0000000..153a526
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplConcurrentTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test concurrent access of CacheImpl
+ */
+public final class CacheImplConcurrentTest {
+
+  private Cache<String, Integer> cache;
+  private final CurrentTime currentTime = new SystemTime();
+  private final long timeoutMillis = 4000;
+  private final long computationMillis = 2000;
+  private final int numConcurrentCalls = 10;
+
+  @Before
+  public void setUp() {
+    cache = new CacheImpl<>(currentTime, timeoutMillis);
+  }
+
+  /**
+   * Test that the value computed on a get is returned for that key
+   * on the first and subsequent concurrent calls.
+   * In particular, for this test the first call takes awhile to compute.
+   */
+  @Test
+  public void testGetReturnsFirstValue() throws ExecutionException, 
InterruptedException {
+    final String key = "testGetReturnsFirstValue";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final ExecutorService es = 
Executors.newFixedThreadPool(numConcurrentCalls);
+    es.submit(new Runnable() {
+      @Override
+      public void run() {
+        final int getFirstValue1;
+        try {
+          // Assert that firstValue is returned, even when other gets are 
called during the Callable execution
+          getFirstValue1 = cache.get(key, new SleepingInteger(firstValue, 
computationMillis));
+          assertEquals(firstValue, getFirstValue1);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    Thread.sleep(500);
+
+    for (int i = 1; i < numConcurrentCalls; i++) {
+      final int index = i;
+      es.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // The original cached value should be retrieved
+            final int getFirstValue2 = cache.get(key, new 
ImmediateInteger(secondValue));
+            assertEquals(firstValue, getFirstValue2);
+          } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    es.shutdown();
+    assertTrue("Tasks should finish before timeout", 
es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Test that the same value computed on a get is returned for that key
+   * on all concurrent calls.
+   * In particular, for this test each thread would have computed a distinct 
value,
+   * but only one thread "comes first" and all other threads return this value.
+   */
+  @Test
+  public void testGetReturnsSameValue() throws InterruptedException {
+    final String key = "testGetReturnsSameValue";
+    final int[] values = new int[numConcurrentCalls];
+    final int[] getValues = new int[numConcurrentCalls];
+    final int nullValue = -1;
+    for (int i = 0; i < numConcurrentCalls; i++) {
+      values[i] = i;
+      getValues[i] = nullValue;
+    }
+
+    final ExecutorService es = 
Executors.newFixedThreadPool(numConcurrentCalls);
+
+    for (int i = 0; i < numConcurrentCalls; i++) {
+      final int index = i;
+      es.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            getValues[index] = cache.get(key, new 
ImmediateInteger(values[index]));
+          } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    es.shutdown();
+    assertTrue("Tasks should finish before timeout", 
es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
+
+    assertNotEquals("The value should be set", nullValue, getValues[0]);
+    for (int i = 1; i < numConcurrentCalls; i++) {
+      assertEquals(getValues[i-1], getValues[i]);
+    }
+  }
+
+  /**
+   * Test that all gets called before an invalidate returns the same value, 
and all
+   * gets called after the invalidate returns a newly computed value.
+   * In particular, for this test the computation for the initial get is still
+   * running while the subsequent gets and invalidate are called.
+   */
+  @Test
+  public void testInvalidateDuringCallableExecution() throws 
ExecutionException, InterruptedException {
+    final String key = "testGet";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final ExecutorService es = Executors.newSingleThreadExecutor();
+    es.submit(new Runnable() {
+      @Override
+      public void run() {
+        final int getFirstValue1;
+        try {
+          // Assert that firstValue is returned, even when it is invalidated 
during the Callable execution
+          getFirstValue1 = cache.get(key, new SleepingInteger(firstValue, 
computationMillis));
+          assertEquals(firstValue, getFirstValue1);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    Thread.sleep(500);
+
+    // In this test, the calls are sequential, but we still run the same 
number of calls
+    final int numSequentialCalls = numConcurrentCalls;
+
+    final int indexToInvalidateOn = numSequentialCalls / 2;
+    for (int i = 1; i < numSequentialCalls; i++) {
+      final int index = i;
+      if (index == indexToInvalidateOn) {
+        cache.invalidate(key);
+      } else if (index < indexToInvalidateOn) {
+        try {
+          // The original cached value should be retrieved, even when it is 
invalidated during the Callable execution
+          final int getFirstValue2 = cache.get(key, new 
ImmediateInteger(secondValue));
+          assertEquals(firstValue, getFirstValue2);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        try {
+          // The second value should be retrieved, because the cache has been 
invalidated
+          final int getFirstValue2 = cache.get(key, new 
ImmediateInteger(secondValue));
+          assertEquals(secondValue, getFirstValue2);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    es.shutdown();
+    assertTrue("Tasks should finish before timeout", 
es.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java
new file mode 100644
index 0000000..bb244cf
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/CacheImplTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test basic access of CacheImpl
+ */
+public final class CacheImplTest {
+
+  private Cache<String, Integer> cache;
+  private final CurrentTime currentTime = new SystemTime();
+  private final long timeoutMillis = 3000;
+
+  @Before
+  public void setUp() {
+    cache = new CacheImpl<>(currentTime, timeoutMillis);
+  }
+
+  /**
+   * Test that an immediate get on the same key returns the cached value 
instead of triggering a new computation
+   */
+  @Test
+  public void testGet() throws ExecutionException, InterruptedException {
+    final String key = "testGet";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final int getFirstValue1 = cache.get(key, new 
ImmediateInteger(firstValue));
+    assertEquals(firstValue, getFirstValue1);
+
+    // The original cached value should be retrieved if called immediately 
(before a timeout)
+    final int getFirstValue2 = cache.get(key, new 
ImmediateInteger(secondValue));
+    assertEquals(firstValue, getFirstValue2);
+
+  }
+
+  /**
+   * Test that an invalidate clears the cached value, so the next access 
triggers a new computation
+   */
+  @Test
+  public void testInvalidate() throws ExecutionException {
+    final String key = "testGet";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final int getValue = cache.get(key, new ImmediateInteger(firstValue));
+    assertEquals(firstValue, getValue);
+
+    cache.invalidate(key);
+
+    // The second cached value should be retrieved after invalidation
+    final int getSecondValue = cache.get(key, new 
ImmediateInteger(secondValue));
+    assertEquals(secondValue, getSecondValue);
+  }
+
+  /**
+   * Test expire-after-write by sleeping beyond the timeout.
+   * Also, the test is designed to fail if the cache is actually 
expire-after-access.
+   */
+  @Test
+  public void testExpireOnWrite() throws ExecutionException, 
InterruptedException {
+    final String key = "testExpireOnWrite";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final int getFirstValue1 = cache.get(key, new 
ImmediateInteger(firstValue));
+    assertEquals(firstValue, getFirstValue1);
+
+    // Sleep less than timeout and do another access; value should be the same
+    Thread.sleep(timeoutMillis/2);
+    final int getFirstValue2 = cache.get(key, new 
ImmediateInteger(firstValue));
+    assertEquals(firstValue, getFirstValue2);
+
+    // Sleep enough to trigger expire-after-write timeout
+    Thread.sleep(timeoutMillis + timeoutMillis/4);
+    // The next cached value should be retrieved after timeout
+    final int getSecondValue = cache.get(key, new 
ImmediateInteger(secondValue));
+    assertEquals(secondValue, getSecondValue);
+  }
+
+  /**
+   * Test expire-after-write is implemented _per-key_.
+   * The test is designed to fail if the cache actually resets the timer on a 
write to a different key.
+   */
+  @Test
+  public void testExpireOnWritePerKey() throws ExecutionException, 
InterruptedException {
+    final String key = "testExpireOnWritePerKey";
+    final String differentKey = "differentKey";
+    final int firstValue = 20;
+    final int secondValue = 40;
+
+    final int getFirstValue = cache.get(key, new ImmediateInteger(firstValue));
+    assertEquals(firstValue, getFirstValue);
+
+    // Sleep less than timeout and do a write on a different key; it should 
not affect
+    // the expiration of the original key
+    Thread.sleep(timeoutMillis/2);
+    final int getFirstValueForDifferentKey = cache.get(differentKey, new 
ImmediateInteger(firstValue));
+    assertEquals(firstValue, getFirstValueForDifferentKey);
+
+    // Sleep enough to trigger timeout
+    Thread.sleep(timeoutMillis + timeoutMillis/4);
+
+    // The next cached value should be retrieved after timeout
+    final int getSecondValue = cache.get(key, new 
ImmediateInteger(secondValue));
+    assertEquals(secondValue, getSecondValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java
new file mode 100644
index 0000000..3b09373
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/ImmediateInteger.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A mock computation that immediately returns an integer
+ */
+class ImmediateInteger implements Callable<Integer> {
+  private final int returnValue;
+
+  public ImmediateInteger(final int returnValue) {
+    this.returnValue = returnValue;
+  }
+
+  @Override
+  public Integer call() throws Exception {
+    return returnValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java
new file mode 100644
index 0000000..223e123
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/SleepingInteger.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A mock computation that simulates computation time by sleeping and returns 
an integer
+ */
+class SleepingInteger implements Callable<Integer> {
+  private final int returnValue;
+  private final long sleepMillis;
+
+  /**
+   * Construct the mock value fetcher
+   *
+   * @param returnValue value to return
+   * @param sleepMillis amount to sleep
+   */
+  public SleepingInteger(final int returnValue, final long sleepMillis) {
+    this.returnValue = returnValue;
+    this.sleepMillis = sleepMillis;
+  }
+
+  @Override
+  public Integer call() throws Exception {
+    Thread.sleep(sleepMillis);
+    return returnValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c41eebde/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java
 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java
new file mode 100644
index 0000000..35ff26a
--- /dev/null
+++ 
b/lang/java/reef-utils/src/test/java/org/apache/reef/util/cache/WrappedValueTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.cache;
+
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.*;
+
+public final class WrappedValueTest {
+  private static final CurrentTime systemTime = new SystemTime();
+  private static final int numThreads = 10;
+
+  @Test
+  public void testLoadAndGet() throws ExecutionException {
+    final Integer value = 5;
+    final WrappedValue<Integer> wrappedValue = new WrappedValue<>(new 
ImmediateInteger(value), systemTime);
+
+    assertFalse(wrappedValue.getValue().isPresent());
+
+    final Integer loadedValue = wrappedValue.loadAndGet();
+    assertEquals(value, loadedValue);
+    assertEquals(value, wrappedValue.getValue().get());
+    assertTrue(value == loadedValue);
+  }
+
+  @Test
+  public void testWaitAndGetOnPreviouslyLoadedValue() throws 
ExecutionException {
+    final Integer value = 5;
+    final WrappedValue<Integer> wrappedValue = new WrappedValue<>(new 
ImmediateInteger(value), systemTime);
+    final Integer loadedValue = wrappedValue.loadAndGet();
+    final Integer waitedValue = wrappedValue.waitAndGet();
+
+    assertEquals(value, waitedValue);
+    assertTrue(value == waitedValue);
+
+    assertEquals(loadedValue, waitedValue);
+    assertTrue(loadedValue == waitedValue);
+  }
+
+  @Test
+  public void testConcurrentLoadWaitAndGet() throws ExecutionException, 
InterruptedException {
+    final Integer value = 5;
+    final long sleepMillis = 2000;
+    final ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads);
+
+    final WrappedValue<Integer> wrappedValue = new WrappedValue<>(
+            new SleepingInteger(value, sleepMillis), systemTime);
+    final Integer loadedValue = wrappedValue.loadAndGet();
+
+    final Future<?>[] futures = new Future<?>[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      futures[i] = executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          final Integer valueAfterWait = wrappedValue.waitAndGet();
+          assertEquals(value, valueAfterWait);
+          assertTrue(value == valueAfterWait);
+        }
+      });
+    }
+    for (int i = 0; i < numThreads; i++) {
+      futures[i].get();
+    }
+
+    assertEquals(value, loadedValue);
+    assertTrue(value == wrappedValue.getValue().get());
+    assertTrue(value == loadedValue);
+  }
+}

Reply via email to