rename
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29231238 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29231238 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29231238 Branch: refs/heads/trunk Commit: 29231238073d15e7ab826a75a7a4ea3faf7960cf Parents: 3d4c2f9 Author: Jonathan Ellis <[email protected]> Authored: Fri Mar 22 10:52:30 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Mar 22 10:52:30 2013 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/net/AsyncOneResponse.java | 99 +++++++++++++++ src/java/org/apache/cassandra/net/AsyncResult.java | 99 --------------- .../org/apache/cassandra/net/MessagingService.java | 4 +- .../apache/cassandra/net/ResponseVerbHandler.java | 2 +- .../service/RangeSliceResponseResolver.java | 4 +- .../apache/cassandra/service/RowDataResolver.java | 6 +- .../apache/cassandra/service/StorageService.java | 2 +- .../org/apache/cassandra/utils/FBUtilities.java | 6 +- 8 files changed, 111 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/net/AsyncOneResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/AsyncOneResponse.java b/src/java/org/apache/cassandra/net/AsyncOneResponse.java new file mode 100644 index 0000000..b5370ff --- /dev/null +++ b/src/java/org/apache/cassandra/net/AsyncOneResponse.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A callback specialized for returning a value from a single target; that is, this is for messages + * that we only send to one recipient. + */ +public class AsyncOneResponse<T> implements IAsyncCallback<T> +{ + private T result; + private final AtomicBoolean done = new AtomicBoolean(false); + private final Lock lock = new ReentrantLock(); + private final Condition condition; + private final long startTime; + + public AsyncOneResponse() + { + condition = lock.newCondition(); + startTime = System.currentTimeMillis(); + } + + public T get(long timeout, TimeUnit tu) throws TimeoutException + { + lock.lock(); + try + { + boolean bVal = true; + try + { + if (!done.get()) + { + timeout = TimeUnit.MILLISECONDS.convert(timeout, tu); + long overall_timeout = timeout - (System.currentTimeMillis() - startTime); + bVal = overall_timeout > 0 && condition.await(overall_timeout, TimeUnit.MILLISECONDS); + } + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + + if (!bVal && !done.get()) + { + throw new TimeoutException("Operation timed out."); + } + } + finally + { + lock.unlock(); + } + return result; + } + + public void response(MessageIn<T> response) + { + try + { + lock.lock(); + if (!done.get()) + { + result = response.payload; + done.set(true); + condition.signal(); + } + } + finally + { + lock.unlock(); + } + } + + public boolean isLatencyForSnitch() + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/net/AsyncResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/AsyncResult.java b/src/java/org/apache/cassandra/net/AsyncResult.java deleted file mode 100644 index f6e2037..0000000 --- a/src/java/org/apache/cassandra/net/AsyncResult.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * A callback specialized for returning a value from a single target; that is, this is for messages - * that we only send to one recipient. - */ -public class AsyncResult<T> implements IAsyncCallback<T> -{ - private T result; - private final AtomicBoolean done = new AtomicBoolean(false); - private final Lock lock = new ReentrantLock(); - private final Condition condition; - private final long startTime; - - public AsyncResult() - { - condition = lock.newCondition(); - startTime = System.currentTimeMillis(); - } - - public T get(long timeout, TimeUnit tu) throws TimeoutException - { - lock.lock(); - try - { - boolean bVal = true; - try - { - if (!done.get()) - { - timeout = TimeUnit.MILLISECONDS.convert(timeout, tu); - long overall_timeout = timeout - (System.currentTimeMillis() - startTime); - bVal = overall_timeout > 0 && condition.await(overall_timeout, TimeUnit.MILLISECONDS); - } - } - catch (InterruptedException ex) - { - throw new AssertionError(ex); - } - - if (!bVal && !done.get()) - { - throw new TimeoutException("Operation timed out."); - } - } - finally - { - lock.unlock(); - } - return result; - } - - public void response(MessageIn<T> response) - { - try - { - lock.lock(); - if (!done.get()) - { - result = response.payload; - done.set(true); - condition.signal(); - } - } - finally - { - lock.unlock(); - } - } - - public boolean isLatencyForSnitch() - { - return false; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 69b640b..f125dcf 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -615,9 +615,9 @@ public final class MessagingService implements MessagingServiceMBean connection.enqueue(processedMessage, id); } - public <T> AsyncResult<T> sendRR(MessageOut message, InetAddress to) + public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to) { - AsyncResult<T> iar = new AsyncResult<T>(); + AsyncOneResponse<T> iar = new AsyncOneResponse<T>(); sendRR(message, to, iar); return iar; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index ce8ab33..8e38218 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -49,7 +49,7 @@ public class ResponseVerbHandler implements IVerbHandler else { Tracing.trace("Processing result from {}", message.from); - ((AsyncResult) cb).response(message); + ((AsyncOneResponse) cb).response(message); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java index e14dd26..ad6a42b 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java +++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java @@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue; import com.google.common.collect.AbstractIterator; import org.apache.cassandra.db.*; -import org.apache.cassandra.net.AsyncResult; +import org.apache.cassandra.net.AsyncOneResponse; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.CloseableIterator; @@ -47,7 +47,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR private final String table; private List<InetAddress> sources; protected final Collection<MessageIn<RangeSliceReply>> responses = new LinkedBlockingQueue<MessageIn<RangeSliceReply>>();; - public final List<AsyncResult> repairResults = new ArrayList<AsyncResult>(); + public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>(); public RangeSliceResponseResolver(String table) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/service/RowDataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java index f992232..1a823e9 100644 --- a/src/java/org/apache/cassandra/service/RowDataResolver.java +++ b/src/java/org/apache/cassandra/service/RowDataResolver.java @@ -37,7 +37,7 @@ import org.apache.cassandra.utils.FBUtilities; public class RowDataResolver extends AbstractRowResolver { private int maxLiveCount = 0; - public List<AsyncResult> repairResults = Collections.emptyList(); + public List<AsyncOneResponse> repairResults = Collections.emptyList(); private final IDiskAtomFilter filter; public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter) @@ -103,9 +103,9 @@ public class RowDataResolver extends AbstractRowResolver * For each row version, compare with resolved (the superset of all row versions); * if it is missing anything, send a mutation to the endpoint it come from. */ - public static List<AsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) + public static List<AsyncOneResponse> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints) { - List<AsyncResult> results = new ArrayList<AsyncResult>(versions.size()); + List<AsyncOneResponse> results = new ArrayList<AsyncOneResponse>(versions.size()); for (int i = 0; i < versions.size(); i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 803fad8..bcfeae6 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1782,7 +1782,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("Notifying " + remote.toString() + " of replication completion\n"); while (failureDetector.isAlive(remote)) { - AsyncResult iar = MessagingService.instance().sendRR(msg, remote); + AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote); try { iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/29231238/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index d6e4dbd..6a9eb88 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -52,7 +52,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.IAllocator; -import org.apache.cassandra.net.AsyncResult; +import org.apache.cassandra.net.AsyncOneResponse; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -383,9 +383,9 @@ public class FBUtilities } } - public static void waitOnFutures(List<AsyncResult> results, long ms) throws TimeoutException + public static void waitOnFutures(List<AsyncOneResponse> results, long ms) throws TimeoutException { - for (AsyncResult result : results) + for (AsyncOneResponse result : results) result.get(ms, TimeUnit.MILLISECONDS); }
