Author: jbellis
Date: Wed May 12 00:11:45 2010
New Revision: 943344
URL: http://svn.apache.org/viewvc?rev=943344&view=rev
Log:
replace ExpiringMap in ConsistencyChecker with ScheduledExecutorService
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1077
Removed:
cassandra/trunk/src/java/org/apache/cassandra/cache/ICacheExpungeHook.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=943344&r1=943343&r2=943344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Wed May 12 00:11:45 2010
@@ -26,10 +26,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.cache.ICacheExpungeHook;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
@@ -40,9 +42,9 @@ import org.apache.cassandra.io.util.Data
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +52,8 @@ import org.slf4j.LoggerFactory;
class ConsistencyChecker implements Runnable
{
private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class);
- private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
+
+ private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1); // TODO add JMX
private final String table_;
private final Row row_;
@@ -140,7 +143,7 @@ class ConsistencyChecker implements Runn
}
}
- static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+ static class DataRepairHandler implements IAsyncCallback
{
private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
private final IResponseResolver<Row> readResponseResolver_;
@@ -172,20 +175,15 @@ class ConsistencyChecker implements Runn
responses_.add(message);
if (responses_.size() == majority_)
{
- String messageId = message.getMessageId();
- readRepairTable_.put(messageId, messageId, this);
- }
- }
-
- public void callMe(String key, String value)
- {
- try
- {
- readResponseResolver_.resolve(responses_);
- }
- catch (Exception ex)
- {
- throw new RuntimeException(ex);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException, DigestMismatchException
+ {
+ readResponseResolver_.resolve(responses_);
+ }
+ };
+ // give remaining replicas until timeout to reply and get added to responses_
+ executor_.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=943344&r1=943343&r2=943344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed May 12 00:11:45 2010
@@ -19,15 +19,14 @@
package org.apache.cassandra.utils;
import java.util.*;
-import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cache.ICacheExpungeHook;
-
public class ExpiringMap<K, V>
{
+ private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+
private class CacheableObject
{
private V value_;
@@ -74,7 +73,6 @@ public class ExpiringMap<K, V>
@Override
public void run()
{
- Map<K, V> expungedValues = new HashMap<K, V>();
synchronized (cache_)
{
Enumeration<K> e = cache_.keys();
@@ -84,37 +82,16 @@ public class ExpiringMap<K, V>
CacheableObject co = cache_.get(key);
if (co != null && co.isReadyToDie(expiration_))
{
- V v = co.getValue();
- if (null != v)
- {
- expungedValues.put(key, v);
- }
cache_.remove(key);
}
}
}
-
- /* Calling the hooks on the keys that have been expunged */
- for (Entry<K, V> entry : expungedValues.entrySet())
- {
- K key = entry.getKey();
- V value = entry.getValue();
-
- ICacheExpungeHook<K, V> hook = hooks_.remove(key);
- if (hook != null)
- {
- hook.callMe(key, value);
- }
- }
- expungedValues.clear();
}
}
private Hashtable<K, CacheableObject> cache_;
- private Map<K, ICacheExpungeHook<K, V>> hooks_;
private Timer timer_;
private static int counter_ = 0;
- private static final Logger LOGGER = LoggerFactory.getLogger(ExpiringMap.class);
private void init(long expiration)
{
@@ -124,7 +101,6 @@ public class ExpiringMap<K, V>
}
cache_ = new Hashtable<K, CacheableObject>();
- hooks_ = new Hashtable<K, ICacheExpungeHook<K, V>>();
timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true);
timer_.schedule(new CacheMonitor(expiration), expiration, expiration);
}
@@ -148,12 +124,6 @@ public class ExpiringMap<K, V>
cache_.put(key, new CacheableObject(value));
}
- public void put(K key, V value, ICacheExpungeHook<K, V> hook)
- {
- put(key, value);
- hooks_.put(key, hook);
- }
-
public V get(K key)
{
V result = null;