Author: rgodfrey
Date: Wed Oct 21 16:29:37 2015
New Revision: 1709878
URL: http://svn.apache.org/viewvc?rev=1709878&view=rev
Log:
QPID-6750 : Simplify the implementation of the futures, avoid using futures
when they are not necessary
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -760,21 +760,36 @@ public abstract class AbstractBDBMessage
*
* @throws org.apache.qpid.server.store.StoreException If the operation
fails for any reason.
*/
- private ListenableFuture<Void> commitTranImpl(final Transaction tx,
boolean syncCommit) throws StoreException
+ private void commitTranImpl(final Transaction tx, boolean syncCommit)
throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is
null at commitTran");
}
- ListenableFuture<Void> result = getEnvironmentFacade().commit(tx,
syncCommit);
+ getEnvironmentFacade().commit(tx, syncCommit);
getLogger().debug("commitTranImpl completed {} transaction {}",
syncCommit ? "synchronous" : "asynchronous", tx);
+
+ }
+
+ private <X> ListenableFuture<X> commitTranAsyncImpl(final Transaction tx,
X val) throws StoreException
+ {
+ if (tx == null)
+ {
+ throw new StoreException("Fatal internal error: transactional is
null at commitTran");
+ }
+
+ ListenableFuture<X> result = getEnvironmentFacade().commitAsync(tx,
val);
+
+ getLogger().debug("commitTranAsynImpl completed transaction {}", tx);
+
return result;
}
+
/**
* Abandons all operations performed within a given transaction.
*
@@ -1148,7 +1163,7 @@ public abstract class AbstractBDBMessage
}
}
- synchronized ListenableFuture<Void> flushToStore()
+ synchronized void flushToStore()
{
if (_messageDataRef != null)
{
@@ -1166,11 +1181,10 @@ public abstract class AbstractBDBMessage
throw
getEnvironmentFacade().handleDatabaseException("failed to begin transaction",
e);
}
store(txn);
- getEnvironmentFacade().commit(txn, true);
+ getEnvironmentFacade().commit(txn, false);
}
}
- return Futures.immediateFuture(null);
}
@Override
@@ -1312,12 +1326,12 @@ public abstract class AbstractBDBMessage
}
@Override
- public ListenableFuture<Void> commitTranAsync() throws StoreException
+ public <X> ListenableFuture<X> commitTranAsync(final X val) throws
StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- ListenableFuture<Void> futureResult =
AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ ListenableFuture<X> futureResult =
AbstractBDBMessageStore.this.commitTranAsyncImpl(_txn, val);
doPostCommitActions();
return futureResult;
}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
Wed Oct 21 16:29:37 2015
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -30,15 +32,10 @@ import java.util.concurrent.atomic.Atomi
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.store.StoreException;
-
public class CoalescingCommiter implements Committer
{
private final CommitThread _commitThread;
@@ -73,98 +70,55 @@ public class CoalescingCommiter implemen
}
@Override
- public ListenableFuture<Void> commit(Transaction tx, boolean syncCommit)
+ public void commit(Transaction tx, boolean syncCommit)
{
- ThreadNotifyingSettableFuture future = new
ThreadNotifyingSettableFuture();
- BDBCommitFutureResult commitFuture = new
BDBCommitFutureResult(_commitThread, tx, syncCommit, future);
- commitFuture.commit();
+ if(syncCommit)
+ {
+ SynchronousCommitThreadJob job = new SynchronousCommitThreadJob();
+ _commitThread.addJob(job, true);
+ job.awaitCompletion();
+ }
+
+ }
+
+ @Override
+ public <X> ListenableFuture<X> commitAsync(Transaction tx, X val)
+ {
+ ThreadNotifyingSettableFuture<X> future = new
ThreadNotifyingSettableFuture<X>();
+ BDBCommitFutureResult<X> commitFuture = new
BDBCommitFutureResult<X>(val, future);
+ _commitThread.addJob(commitFuture, false);
return future;
}
- private static final class BDBCommitFutureResult
+
+ private static final class BDBCommitFutureResult<X> implements
CommitThreadJob
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(BDBCommitFutureResult.class);
+ private final X _value;
+ private final ThreadNotifyingSettableFuture<X> _future;
- private final CommitThread _commitThread;
- private final Transaction _tx;
- private final boolean _syncCommit;
- private final ThreadNotifyingSettableFuture _future;
-
- public BDBCommitFutureResult(CommitThread commitThread,
- Transaction tx,
- boolean syncCommit,
- final ThreadNotifyingSettableFuture
future)
- {
- _commitThread = commitThread;
- _tx = tx;
- _syncCommit = syncCommit;
+ public BDBCommitFutureResult(X value,
+ final ThreadNotifyingSettableFuture<X>
future)
+ {
+ _value = value;
_future = future;
}
public void complete()
{
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("complete() called for transaction " + _tx);
- }
- _future.set(null);
+ _future.set(_value);
}
public void abort(RuntimeException databaseException)
{
_future.setException(databaseException);
}
+ }
- public void commit() throws DatabaseException
- {
- _commitThread.addJob(this, _syncCommit);
-
- if(!_syncCommit)
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("CommitAsync was requested, returning
immediately.");
- }
- return;
- }
-
- boolean interrupted = false;
- try
- {
- while (true)
- {
- try
- {
- _future.get();
- break;
- }
- catch (InterruptedException e)
- {
- interrupted = true;
- }
- }
- if (interrupted)
- {
- Thread.currentThread().interrupt();
- }
+ private interface CommitThreadJob
+ {
+ void complete();
- }
- catch (ExecutionException e)
- {
- if(e.getCause() instanceof RuntimeException)
- {
- throw (RuntimeException)e.getCause();
- }
- else if(e.getCause() instanceof Error)
- {
- throw (Error)e.getCause();
- }
- else
- {
- throw new StoreException(e.getCause());
- }
- }
- }
+ void abort(RuntimeException e);
}
/**
@@ -178,12 +132,15 @@ public class CoalescingCommiter implemen
private static class CommitThread extends Thread
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CommitThread.class);
+ private static final int JOB_QUEUE_NOTIFY_THRESHOLD = 8;
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFutureResult> _jobQueue = new
ConcurrentLinkedQueue<BDBCommitFutureResult>();
+ private final Queue<CommitThreadJob> _jobQueue = new
ConcurrentLinkedQueue<>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
+ private final List<CommitThreadJob> _inProcessJobs = new
ArrayList<>(256);
+
public CommitThread(String name, EnvironmentFacade environmentFacade)
{
super(name);
@@ -210,7 +167,7 @@ public class CoalescingCommiter implemen
{
// Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the
broker hard.
- _lock.wait(1000);
+ _lock.wait(500);
}
catch (InterruptedException e)
{
@@ -223,8 +180,13 @@ public class CoalescingCommiter implemen
private void processJobs()
{
- int size = _jobQueue.size();
+ CommitThreadJob job;
+ while((job = _jobQueue.poll()) != null)
+ {
+ _inProcessJobs.add(job);
+ }
+ int completedJobsIndex = 0;
try
{
long startTime = 0;
@@ -241,14 +203,10 @@ public class CoalescingCommiter implemen
LOGGER.debug("flushLog completed in " + duration + " ms");
}
- for(int i = 0; i < size; i++)
+ while(completedJobsIndex < _inProcessJobs.size())
{
- BDBCommitFutureResult commit = _jobQueue.poll();
- if (commit == null)
- {
- break;
- }
- commit.complete();
+ _inProcessJobs.get(completedJobsIndex).complete();
+ completedJobsIndex++;
}
}
@@ -258,13 +216,9 @@ public class CoalescingCommiter implemen
{
LOGGER.error("Exception during environment log flush", e);
- for(int i = 0; i < size; i++)
+ for(; completedJobsIndex < _inProcessJobs.size();
completedJobsIndex++)
{
- BDBCommitFutureResult commit = _jobQueue.poll();
- if (commit == null)
- {
- break;
- }
+ CommitThreadJob commit =
_inProcessJobs.get(completedJobsIndex);
commit.abort(e);
}
}
@@ -282,6 +236,10 @@ public class CoalescingCommiter implemen
}
}
}
+ finally
+ {
+ _inProcessJobs.clear();
+ }
}
private boolean hasJobs()
@@ -289,14 +247,14 @@ public class CoalescingCommiter implemen
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFutureResult commit, final boolean sync)
+ public void addJob(CommitThreadJob commit, final boolean sync)
{
if (_stopped.get())
{
throw new IllegalStateException("Commit thread is stopped");
}
_jobQueue.add(commit);
- if(sync)
+ if(sync || _jobQueue.size() >= JOB_QUEUE_NOTIFY_THRESHOLD)
{
synchronized (_lock)
{
@@ -310,7 +268,7 @@ public class CoalescingCommiter implemen
synchronized (_lock)
{
_stopped.set(true);
- BDBCommitFutureResult commit;
+ CommitThreadJob commit;
try
{
@@ -340,25 +298,31 @@ public class CoalescingCommiter implemen
}
}
- private final class ThreadNotifyingSettableFuture extends
AbstractFuture<Void>
+ private final class ThreadNotifyingSettableFuture<X> extends
AbstractFuture<X>
{
@Override
- public Void get(final long timeout, final TimeUnit unit)
+ public X get(final long timeout, final TimeUnit unit)
throws InterruptedException, TimeoutException,
ExecutionException
{
- _commitThread.explicitNotify();
+ if(!isDone())
+ {
+ _commitThread.explicitNotify();
+ }
return super.get(timeout, unit);
}
@Override
- public Void get() throws InterruptedException, ExecutionException
+ public X get() throws InterruptedException, ExecutionException
{
- _commitThread.explicitNotify();
+ if(!isDone())
+ {
+ _commitThread.explicitNotify();
+ }
return super.get();
}
@Override
- protected boolean set(final Void value)
+ protected boolean set(final X value)
{
return super.set(value);
}
@@ -376,4 +340,51 @@ public class CoalescingCommiter implemen
_commitThread.explicitNotify();
}
}
+
+ private class SynchronousCommitThreadJob implements CommitThreadJob
+ {
+ private boolean _done;
+ private RuntimeException _exception;
+
+ @Override
+ public synchronized void complete()
+ {
+ _done = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void abort(final RuntimeException e)
+ {
+ _done = true;
+ _exception = e;
+ notifyAll();
+ }
+
+
+ public synchronized void awaitCompletion()
+ {
+ boolean interrupted = false;
+ while(!_done)
+ {
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ if(interrupted)
+ {
+ Thread.currentThread().interrupt();
+ }
+ if(_exception != null)
+ {
+ throw _exception;
+ }
+ }
+
+ }
}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
Wed Oct 21 16:29:37 2015
@@ -27,7 +27,8 @@ public interface Committer
{
void start();
- ListenableFuture<Void> commit(Transaction tx, boolean syncCommit);
+ void commit(Transaction tx, boolean syncCommit);
+ <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
void stop();
}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -55,7 +55,8 @@ public interface EnvironmentFacade
Transaction beginTransaction(TransactionConfig transactionConfig);
- ListenableFuture<Void> commit(com.sleepycat.je.Transaction tx, boolean
sync);
+ void commit(Transaction tx, boolean sync);
+ <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
RuntimeException handleDatabaseException(String contextMessage,
RuntimeException e);
@@ -69,4 +70,5 @@ public interface EnvironmentFacade
void flushLog();
void setCacheSize(long cacheSize);
+
}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -134,7 +134,7 @@ public class StandardEnvironmentFacade i
}
@Override
- public ListenableFuture<Void> commit(com.sleepycat.je.Transaction tx,
boolean syncCommit)
+ public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
@@ -148,7 +148,25 @@ public class StandardEnvironmentFacade i
throw handleDatabaseException("Got DatabaseException on commit",
de);
}
- return _committer.commit(tx, syncCommit);
+ _committer.commit(tx, syncCommit);
+ }
+
+ @Override
+ public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+ {
+ try
+ {
+ tx.commitNoSync();
+ }
+ catch (DatabaseException de)
+ {
+ LOGGER.error("Got DatabaseException on commit, closing
environment", de);
+
+ closeEnvironmentSafely();
+
+ throw handleDatabaseException("Got DatabaseException on commit",
de);
+ }
+ return _committer.commitAsync(tx, val);
}
@Override
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Wed Oct 21 16:29:37 2015
@@ -271,7 +271,7 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public ListenableFuture<Void> commit(final Transaction tx, boolean
syncCommit)
+ public void commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -287,9 +287,31 @@ public class ReplicatedEnvironmentFacade
if (_coalescingCommiter != null &&
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
&& _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
{
- return _coalescingCommiter.commit(tx, syncCommit);
+ _coalescingCommiter.commit(tx, syncCommit);
}
- return Futures.immediateFuture(null);
+
+ }
+
+ @Override
+ public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X
val)
+ {
+ try
+ {
+ // Using commit() instead of commitNoSync() for the HA store to
allow
+ // the HA durability configuration to influence resulting
behaviour.
+ tx.commit(_realMessageStoreDurability);
+ }
+ catch (DatabaseException de)
+ {
+ throw handleDatabaseException("Got DatabaseException on commit,
closing environment", de);
+ }
+
+ if (_coalescingCommiter != null &&
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+ {
+ return _coalescingCommiter.commitAsync(tx, val);
+ }
+ return Futures.immediateFuture(val);
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -797,9 +797,9 @@ public abstract class AbstractJDBCMessag
}
}
- private ListenableFuture<Void> commitTranAsync(final ConnectionWrapper
connWrapper) throws StoreException
+ private <X> ListenableFuture<X> commitTranAsync(final ConnectionWrapper
connWrapper, final X val) throws StoreException
{
- final SettableFuture<Void> future = SettableFuture.create();
+ final SettableFuture<X> future = SettableFuture.create();
_executor.submit(new Runnable()
{
@Override
@@ -808,7 +808,7 @@ public abstract class AbstractJDBCMessag
try
{
commitTran(connWrapper);
- future.set(null);
+ future.set(val);
}
catch (RuntimeException e)
{
@@ -1172,11 +1172,11 @@ public abstract class AbstractJDBCMessag
}
@Override
- public ListenableFuture<Void> commitTranAsync()
+ public <X> ListenableFuture<X> commitTranAsync(final X val)
{
checkMessageStoreOpen();
doPreCommitActions();
- ListenableFuture<Void> futureResult =
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
+ ListenableFuture<X> futureResult =
AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper, val);
storedSizeChange(_storeSizeIncrease);
doPostCommitActions();
return futureResult;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -62,9 +62,9 @@ public class MemoryMessageStore implemen
private Set<Xid> _localDistributedTransactionsRemoves = new
HashSet<Xid>();
@Override
- public ListenableFuture<Void> commitTranAsync()
+ public <X> ListenableFuture<X> commitTranAsync(final X val)
{
- return Futures.immediateFuture(null);
+ return Futures.immediateFuture(val);
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
Wed Oct 21 16:29:37 2015
@@ -47,8 +47,9 @@ public interface Transaction
/**
* Commits all operations performed within a given transactional context.
*
+ * @param val
*/
- ListenableFuture<Void> commitTranAsync();
+ <X> ListenableFuture<X> commitTranAsync(final X val);
/**
* Abandons all operations performed within a given transactional context.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Wed Oct 21 16:29:37 2015
@@ -101,7 +101,7 @@ public class AsyncAutoCommitTransaction
txn = _messageStore.newTransaction();
txn.dequeueMessage(record);
- future = txn.commitTranAsync();
+ future = txn.commitTranAsync((Void) null);
txn = null;
}
@@ -177,7 +177,7 @@ public class AsyncAutoCommitTransaction
ListenableFuture<Void> future;
if(txn != null)
{
- future = txn.commitTranAsync();
+ future = txn.commitTranAsync((Void) null);
txn = null;
}
else
@@ -208,7 +208,7 @@ public class AsyncAutoCommitTransaction
txn = _messageStore.newTransaction();
enqueueRecord = txn.enqueueMessage(queue, message);
- future = txn.commitTranAsync();
+ future = txn.commitTranAsync((Void) null);
txn = null;
}
else
@@ -286,7 +286,7 @@ public class AsyncAutoCommitTransaction
ListenableFuture<Void> future;
if (txn != null)
{
- future = txn.commitTranAsync();
+ future = txn.commitTranAsync((Void) null);
txn = null;
}
else
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Wed Oct 21 16:29:37 2015
@@ -26,15 +26,9 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +58,7 @@ public class LocalTransaction implements
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private ListenableFuture<Void> _asyncTran;
+ private ListenableFuture<Runnable> _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -384,50 +378,25 @@ public class LocalTransaction implements
sync();
if(_transaction != null)
{
- final ListenableFuture<Void> underlying =
_transaction.commitTranAsync();
- /*
- Note that this future is not a general purpose future and makes
assumptions about the fact that get() is
- only called once (which is enforced by how sync() works. The
post transaction actions must be performed
- in the connection thread (i.e. the thread that the sync() is
called from - not the commit thread which is
- where the actions would occur if we added a listener to the
underlying future
- */
- _asyncTran = new ForwardingListenableFuture<Void>()
- {
-
- @Override
- protected ListenableFuture<Void> delegate()
- {
- return underlying;
- }
-
- @Override
- public Void get(final long timeout, final TimeUnit unit)
- throws InterruptedException, TimeoutException,
ExecutionException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Void get() throws InterruptedException,
ExecutionException
- {
- final Void rval;
- try
- {
- rval = super.get();
- doPostTransactionActions();
- deferred.run();
- }
- finally
- {
- resetDetails();
- }
- return rval;
- }
-
-
-
- };
+ Runnable action = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ doPostTransactionActions();
+ deferred.run();
+ }
+ finally
+ {
+ resetDetails();
+ }
+
+ }
+ };
+ _asyncTran = _transaction.commitTranAsync(action);
}
else
@@ -491,7 +460,7 @@ public class LocalTransaction implements
{
try
{
- _asyncTran.get();
+ _asyncTran.get().run();
break;
}
catch (InterruptedException e)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Wed Oct 21 16:29:37 2015
@@ -466,7 +466,7 @@ public class AsynchronousMessageStoreRec
+ " is unknown, entry will be discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(record);
- txn.commitTranAsync();
+ txn.commitTranAsync((Void) null);
}
return _continueRecovery.get();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
Wed Oct 21 16:29:37 2015
@@ -207,7 +207,7 @@ public class SynchronousMessageStoreReco
_logger.warn("Message id " + messageId + " referenced in
log as enqueued in queue " + queueName + " is unknown, entry will be
discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(record);
- txn.commitTranAsync();
+ txn.commitTranAsync((Void) null);
}
}
else
@@ -215,7 +215,7 @@ public class SynchronousMessageStoreReco
_logger.warn("Message id " + messageId + " in log references
queue with id " + queueId + " which is not in the configuration, entry will be
discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(record);
- txn.commitTranAsync();
+ txn.commitTranAsync((Void) null);
}
return true;
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
Wed Oct 21 16:29:37 2015
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.*;
import java.util.Collections;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -54,7 +53,7 @@ public class AsyncAutoCommitTransactionT
super.setUp();
when(_messageStore.newTransaction()).thenReturn(_storeTransaction);
- when(_storeTransaction.commitTranAsync()).thenReturn(_future);
+ when(_storeTransaction.commitTranAsync((Void)
null)).thenReturn(_future);
when(_queue.isDurable()).thenReturn(true);
when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
Wed Oct 21 16:29:37 2015
@@ -103,9 +103,9 @@ class MockStoreTransaction implements Tr
_state = TransactionState.COMMITTED;
}
- public ListenableFuture<Void> commitTranAsync()
+ public <X> ListenableFuture<X> commitTranAsync(final X val)
{
- return Futures.immediateFuture(null);
+ return Futures.immediateFuture(val);
}
public void abortTran()
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
Wed Oct 21 16:29:37 2015
@@ -144,7 +144,7 @@ public class SynchronousMessageStoreReco
verify(queue, never()).enqueue(any(ServerMessage.class),
any(Action.class), any(MessageEnqueueRecord.class));
verify(transaction).dequeueMessage(argThat(new
MessageEnqueueRecordMatcher(queue.getId(), messageId)));
- verify(transaction, times(1)).commitTranAsync();
+ verify(transaction, times(1)).commitTranAsync((Void) null);
}
public void testRecoveryOfMessageInstanceForNonExistingQueue()
@@ -182,7 +182,7 @@ public class SynchronousMessageStoreReco
recoverer.recover(_virtualHost);
verify(transaction).dequeueMessage(argThat(new
MessageEnqueueRecordMatcher(queueId,messageId)));
- verify(transaction, times(1)).commitTranAsync();
+ verify(transaction, times(1)).commitTranAsync((Void) null);
}
public void testRecoveryDeletesOrphanMessages()
Modified:
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java?rev=1709878&r1=1709877&r2=1709878&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
Wed Oct 21 16:29:37 2015
@@ -133,11 +133,11 @@ public abstract class GenericAbstractJDB
}
@Override
- public ListenableFuture<Void> commitTranAsync()
+ public <X> ListenableFuture<X> commitTranAsync(final X val)
{
try
{
- return super.commitTranAsync();
+ return super.commitTranAsync(val);
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]