Author: orudyy
Date: Thu Nov 3 14:07:44 2016
New Revision: 1767886
URL: http://svn.apache.org/viewvc?rev=1767886&view=rev
Log:
QPID-7477: [Java Broker] Propagate the context subject from the thread that
adds a future callback to the task thread that executes it.
Also ensure that future callbacks are always executed by the configured
object's (CO's) task executor by using the explicit three-arg form of
Futures.addCallback(). Previously some callbacks were executed by the thread
that completed the awaited future and not necessarily the desired one.
Merged from trunk using:
svn merge -c r1767825 ^/qpid/java/trunk
Modified:
qpid/java/branches/6.1.x/ (props changed)
qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
Propchange: qpid/java/branches/6.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 3 14:07:44 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514
+/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767825
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
(original)
+++
qpid/java/branches/6.1.x/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
Thu Nov 3 14:07:44 2016
@@ -393,7 +393,7 @@ public class BDBHAVirtualHostNodeImpl ex
final SettableFuture<Void> returnVal = SettableFuture.create();
ListenableFuture<Void> superFuture = super.doStop();
- Futures.addCallback(superFuture, new FutureCallback<Void>()
+ addFutureCallback(superFuture, new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -424,7 +424,7 @@ public class BDBHAVirtualHostNodeImpl ex
}
}
- });
+ }, getTaskExecutor());
return returnVal;
}
@@ -1330,7 +1330,7 @@ public class BDBHAVirtualHostNodeImpl ex
}
});
- Futures.addCallback(future, new FutureCallback<Void>()
+ addFutureCallback(future, new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -1342,7 +1342,7 @@ public class BDBHAVirtualHostNodeImpl ex
{
LOGGER.error("Failed to close children when handling intruder", t);
}
- });
+ }, getTaskExecutor());
}
private abstract class VirtualHostNodeGroupTask implements Task<Void,
RuntimeException>
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Thu Nov 3 14:07:44 2016
@@ -736,7 +736,7 @@ public abstract class AbstractExchange<T
final SettableFuture<Boolean> returnVal =
SettableFuture.create();
- Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+ addFutureCallback(b.createAsync(), new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
Thu Nov 3 14:07:44 2016
@@ -629,7 +629,7 @@ public abstract class AbstractConfigured
{
try
{
- Futures.addCallback(task.execute(), new FutureCallback<T>()
+ addFutureCallback(task.execute(), new FutureCallback<T>()
{
@Override
public void onSuccess(final T result)
@@ -642,7 +642,7 @@ public abstract class AbstractConfigured
{
returnVal.setException(t);
}
- });
+ }, getTaskExecutor());
}
catch(Throwable t)
{
@@ -700,7 +700,7 @@ public abstract class AbstractConfigured
public void performAction(final ConfiguredObject<?> child)
{
ListenableFuture<Void> childCloseFuture = child.closeAsync();
- Futures.addCallback(childCloseFuture, new FutureCallback<Void>()
+ addFutureCallback(childCloseFuture, new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -713,7 +713,7 @@ public abstract class AbstractConfigured
LOGGER.error("Exception occurred while closing {} : {}",
child.getClass().getSimpleName(),
child.getName(), t);
}
- });
+ }, getTaskExecutor());
childCloseFutures.add(childCloseFuture);
}
});
@@ -1025,14 +1025,14 @@ public abstract class AbstractConfigured
ListenableFuture<List<Void>> combinedChildStateFuture =
Futures.allAsList(childStateFutures);
final SettableFuture<Void> returnVal = SettableFuture.create();
- Futures.addCallback(combinedChildStateFuture, new
FutureCallback<List<Void>>()
+ addFutureCallback(combinedChildStateFuture, new
FutureCallback<List<Void>>()
{
@Override
public void onSuccess(final List<Void> result)
{
try
{
- Futures.addCallback(attainState(),
+ addFutureCallback(attainState(),
new FutureCallback<Void>()
{
@Override
@@ -1061,16 +1061,16 @@ public abstract class AbstractConfigured
}
}
}
- }, getTaskExecutor());
+ }, getTaskExecutor());
}
- catch(RuntimeException e)
+ catch (RuntimeException e)
{
try
{
exceptionHandler.handleException(e,
AbstractConfiguredObject.this);
returnVal.set(null);
}
- catch(Throwable t)
+ catch (Throwable t)
{
returnVal.setException(t);
}
@@ -1083,7 +1083,7 @@ public abstract class AbstractConfigured
// One or more children failed to attain state but the error could not be handled by the handler
returnVal.setException(t);
}
- });
+ }, getTaskExecutor());
return returnVal;
}
@@ -1480,7 +1480,7 @@ public abstract class AbstractConfigured
{
final SettableFuture<Void> stateTransitionResult =
SettableFuture.create();
ListenableFuture<Void> stateTransitionFuture =
(ListenableFuture<Void>) stateChangingMethod.invoke(this);
- Futures.addCallback(stateTransitionFuture, new
FutureCallback<Void>()
+ addFutureCallback(stateTransitionFuture, new
FutureCallback<Void>()
{
@Override
public void onSuccess(Void result)
@@ -1511,7 +1511,7 @@ public abstract class AbstractConfigured
_attainStateFuture.set(AbstractConfiguredObject.this);
stateTransitionResult.setException(t);
}
- });
+ }, getTaskExecutor());
returnVal = stateTransitionResult;
}
catch (IllegalAccessException e)
@@ -2372,7 +2372,7 @@ public abstract class AbstractConfigured
protected static <V> ChainedListenableFuture<Void> doAfter(Executor
executor, ListenableFuture<V> first, final Runnable second)
{
final ChainedSettableFuture<Void> returnVal = new
ChainedSettableFuture<Void>(executor);
- Futures.addCallback(first, new FutureCallback<V>()
+ addFutureCallback(first, new FutureCallback<V>()
{
@Override
public void onSuccess(final V result)
@@ -2464,7 +2464,7 @@ public abstract class AbstractConfigured
protected static <V> ChainedListenableFuture<V> doAfter(final Executor
executor, ListenableFuture<V> first, final Callable<ListenableFuture<V>> second)
{
final ChainedSettableFuture<V> returnVal = new
ChainedSettableFuture<V>(executor);
- Futures.addCallback(first, new FutureCallback<V>()
+ addFutureCallback(first, new FutureCallback<V>()
{
@Override
public void onSuccess(final V result)
@@ -2472,7 +2472,7 @@ public abstract class AbstractConfigured
try
{
final ListenableFuture<V> future = second.call();
- Futures.addCallback(future, new FutureCallback<V>()
+ addFutureCallback(future, new FutureCallback<V>()
{
@Override
public void onSuccess(final V result)
@@ -2508,7 +2508,7 @@ public abstract class AbstractConfigured
protected static <V,A> ChainedListenableFuture<V> doAfter(final Executor
executor, ListenableFuture<A> first, final
CallableWithArgument<ListenableFuture<V>,A> second)
{
final ChainedSettableFuture<V> returnVal = new
ChainedSettableFuture<>(executor);
- Futures.addCallback(first, new FutureCallback<A>()
+ addFutureCallback(first, new FutureCallback<A>()
{
@Override
public void onSuccess(final A result)
@@ -2516,7 +2516,7 @@ public abstract class AbstractConfigured
try
{
final ListenableFuture<V> future = second.call(result);
- Futures.addCallback(future, new FutureCallback<V>()
+ addFutureCallback(future, new FutureCallback<V>()
{
@Override
public void onSuccess(final V result)
@@ -2558,7 +2558,7 @@ public abstract class AbstractConfigured
final
Runnable after)
{
final ChainedSettableFuture<Void> returnVal = new
ChainedSettableFuture<Void>(executor);
- Futures.addCallback(future, new FutureCallback<V>()
+ addFutureCallback(future, new FutureCallback<V>()
{
@Override
public void onSuccess(final V result)
@@ -2591,6 +2591,42 @@ public abstract class AbstractConfigured
return returnVal;
}
+ protected static <V> void addFutureCallback(ListenableFuture<V> future, final FutureCallback<V> callback,
+ Executor taskExecutor)
+ {
+ final Subject subject = Subject.getSubject(AccessController.getContext());
+
+ Futures.addCallback(future, new FutureCallback<V>()
+ {
+ @Override
+ public void onSuccess(final V result)
+ {
+ Subject.doAs(subject, new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ callback.onSuccess(result);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ Subject.doAs(subject, new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ callback.onFailure(t);
+ return null;
+ }
+ });
+ }
+ }, taskExecutor);
+ }
@Override
public ListenableFuture<Void> setAttributesAsync(final Map<String, Object>
attributes) throws IllegalStateException, AccessControlException,
IllegalArgumentException
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
Thu Nov 3 14:07:44 2016
@@ -74,7 +74,7 @@ abstract public class AbstractConfigured
final SettableFuture<X> returnVal = SettableFuture.create();
final X instance = createInstance(attributes, parents);
final ListenableFuture<Void> createFuture = instance.createAsync();
- Futures.addCallback(createFuture, new FutureCallback<Void>()
+ AbstractConfiguredObject.addFutureCallback(createFuture, new
FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -87,7 +87,7 @@ abstract public class AbstractConfigured
{
returnVal.setException(t);
}
- },MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
return returnVal;
}
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
Thu Nov 3 14:07:44 2016
@@ -234,7 +234,7 @@ public abstract class AbstractSystemConf
container.setEventLogger(startupLogger);
final SettableFuture<Void> returnVal = SettableFuture.create();
- Futures.addCallback(container.openAsync(), new FutureCallback()
+ addFutureCallback(container.openAsync(), new FutureCallback()
{
@Override
public void onSuccess(final Object result)
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Thu Nov 3 14:07:44 2016
@@ -2020,7 +2020,7 @@ public abstract class AbstractQueue<X ex
ListenableFuture<List<Void>> combinedFuture =
Futures.allAsList(removeBindingFutures);
- Futures.addCallback(combinedFuture, new
FutureCallback<List<Void>>()
+ addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
{
@Override
public void onSuccess(final List<Void> result)
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
Thu Nov 3 14:07:44 2016
@@ -141,7 +141,7 @@ public abstract class AbstractAuthentica
private ListenableFuture<Void> performDelete()
{
final SettableFuture<Void> futureResult = SettableFuture.create();
- Futures.addCallback(closeAsync(), new FutureCallback<Void>()
+ addFutureCallback(closeAsync(), new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -176,7 +176,7 @@ public abstract class AbstractAuthentica
setState(State.DELETED);
_eventLogger.message(AuthenticationProviderMessages.DELETE(getName()));
}
- });
+ }, getTaskExecutor());
return futureResult;
}
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Thu Nov 3 14:07:44 2016
@@ -649,7 +649,7 @@ public abstract class AbstractVirtualHos
attributes.put(Exchange.ID,
UUIDGenerator.generateExchangeUUID(name, getName()));
final ListenableFuture<Exchange<?>> future =
addExchangeAsync(attributes);
final SettableFuture<Void> returnVal = SettableFuture.create();
- Futures.addCallback(future, new FutureCallback<Exchange<?>>()
+ addFutureCallback(future, new FutureCallback<Exchange<?>>()
{
@Override
public void onSuccess(final Exchange<?> result)
@@ -1393,7 +1393,7 @@ public abstract class AbstractVirtualHos
NoFactoryForTypeException
{
final SettableFuture<Exchange<?>> returnVal = SettableFuture.create();
- Futures.addCallback(getObjectFactory().createAsync(Exchange.class,
attributes, this),
+ addFutureCallback(getObjectFactory().createAsync(Exchange.class,
attributes, this),
new FutureCallback<Exchange>()
{
@Override
@@ -1415,7 +1415,7 @@ public abstract class AbstractVirtualHos
returnVal.setException(t);
}
}
- });
+ }, getTaskExecutor());
return returnVal;
}
@@ -2630,7 +2630,7 @@ public abstract class AbstractVirtualHos
final ListenableFuture<Void> childOpenFuture =
child.openAsync();
childOpenFutures.add(childOpenFuture);
- Futures.addCallback(childOpenFuture, new
FutureCallback<Void>()
+ addFutureCallback(childOpenFuture, new
FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -2644,7 +2644,7 @@ public abstract class AbstractVirtualHos
child.getClass().getSimpleName(), child.getName(), t);
}
- });
+ }, getTaskExecutor());
}
});
return null;
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
Thu Nov 3 14:07:44 2016
@@ -164,7 +164,7 @@ public abstract class AbstractVirtualHos
try
{
- Futures.addCallback(activate(),
+ addFutureCallback(activate(),
new FutureCallback<Void>()
{
@Override
@@ -295,12 +295,12 @@ public abstract class AbstractVirtualHos
final SettableFuture<Void> futureResult = SettableFuture.create();
// Delete the node only if deletion of the virtualhost succeeds.
- Futures.addCallback(deleteVirtualHostIfExists(), new
FutureCallback<Void>()
+ addFutureCallback(deleteVirtualHostIfExists(), new
FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
{
- Futures.addCallback(closeAsync(), new FutureCallback<Void>()
+ addFutureCallback(closeAsync(), new FutureCallback<Void>()
{
@Override
public void onSuccess(final Void result)
@@ -339,7 +339,7 @@ public abstract class AbstractVirtualHos
configurationStore.onDelete(AbstractVirtualHostNode.this);
}
}
- });
+ }, getTaskExecutor());
}
@Override
@@ -347,7 +347,7 @@ public abstract class AbstractVirtualHos
{
futureResult.setException(t);
}
- });
+ }, getTaskExecutor());
return futureResult;
}
Modified:
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java?rev=1767886&r1=1767885&r2=1767886&view=diff
==============================================================================
---
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
(original)
+++
qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
Thu Nov 3 14:07:44 2016
@@ -90,7 +90,7 @@ public class RedirectingVirtualHostNodeI
final ListenableFuture<VirtualHost> virtualHostFuture =
getObjectFactory().createAsync(VirtualHost.class, attributes, this);
- Futures.addCallback(virtualHostFuture, new
FutureCallback<VirtualHost>()
+ addFutureCallback(virtualHostFuture, new FutureCallback<VirtualHost>()
{
@Override
public void onSuccess(final VirtualHost virtualHost)
@@ -115,7 +115,7 @@ public class RedirectingVirtualHostNodeI
resultFuture.setException(t);
}
}
- });
+ }, getTaskExecutor());
return resultFuture;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]