Hello, I put together a test to examine what happens when a local service
issues an RMI (via JERI) to a remote service. The unit test sets up a
remote service (in a child JVM) and then issues an RMI that invokes
Thread.sleep(long) on the remote service. The thread that issues the
RMI is then interrupted during the sleep. The exception observed when the
local thread is interrupted is provided below.
With reference to the SleepTask below, what I see on the remote service is:
WARN : 08:03:48,137 2861 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:498): Will sleep: millis=6000
…
WARN : 08:03:54,138 8862 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:501): Sleep finished normally.
WARN : 08:03:54,139 8863 com.bigdata.journal.jini.ha.HAJournalTest.executorService1 com.bigdata.journal.jini.ha.TestHAJournalServer$SleepTask.call(TestHAJournalServer.java:508): Did sleep: millis=6000
Thus, it appears that the interrupt of the local thread making the RMI does NOT
interrupt the thread that is executing the behavior on the remote service (in
this case, Thread.sleep()).
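To make the observed behavior concrete, here is a minimal JVM-local sketch of the same shape. It is only an analogy and does not use JERI: a single-thread executor stands in for the remote service, and a caller thread blocked in Future.get() stands in for the thread blocked in the synchronous RMI. The class name InterruptAnalogy and its members are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptAnalogy {

    /** Records which side observed an InterruptedException. */
    public static final class Result {
        public final boolean callerInterrupted;
        public final boolean workerInterrupted;

        Result(final boolean caller, final boolean worker) {
            this.callerInterrupted = caller;
            this.workerInterrupted = worker;
        }
    }

    public static Result run(final long sleepMillis) throws Exception {
        // Hypothetical stand-in for the remote service's executor.
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        final AtomicBoolean callerInterrupted = new AtomicBoolean();
        final AtomicBoolean workerInterrupted = new AtomicBoolean();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch workerDone = new CountDownLatch(1);
        try {
            final Thread caller = new Thread(() -> {
                final Future<?> ft = worker.submit(() -> {
                    started.countDown();
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        workerInterrupted.set(true);
                    } finally {
                        workerDone.countDown();
                    }
                });
                try {
                    ft.get(); // blocks, like the synchronous RMI
                } catch (InterruptedException e) {
                    callerInterrupted.set(true); // only the caller sees this
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            caller.start();
            started.await();    // the task is running on the worker
            caller.interrupt(); // interrupt the blocked caller only
            caller.join();
            workerDone.await(); // the worker sleeps for the full duration
            return new Result(callerInterrupted.get(), workerInterrupted.get());
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(final String[] args) throws Exception {
        final Result r = run(1000L);
        System.out.println("caller interrupted=" + r.callerInterrupted
                + ", worker interrupted=" + r.workerInterrupted);
    }
}
```

In this sketch the caller is interrupted while the worker finishes its sleep normally, which mirrors what the log above shows for the remote SleepTask. Nothing here says how JERI behaves internally; it only shows that an interrupt delivered to a waiting thread does not by itself reach the thread doing the work.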
A snippet of this test is below. It depends on our test harness environment, but
I could isolate it to a River-only test if desired (this would be easier if I
had a pointer to a pattern for starting and killing the child process that I
could use for a River test). But first I wanted to see what people's
expectations were for the remote service when the RMI is interrupted. Should
there be an attempt to propagate the interrupt to the method on the remote
service?
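For comparison, here is a rough sketch of what explicit, application-level propagation could look like. It is a JVM-local analogy only, with no JERI involved: the caller catches its own interrupt and cancels the worker's Future. In an RMI setting this would presumably need a second remote call on some kind of cancel handle, which is purely an assumption here and not something River is claimed to provide. The class name PropagateInterrupt and the method callAndPropagate are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class PropagateInterrupt {

    /** Returns true if the worker thread observed an interrupt. */
    public static boolean callAndPropagate(final long sleepMillis)
            throws Exception {
        // Hypothetical stand-in for the remote service's executor.
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        final AtomicBoolean workerInterrupted = new AtomicBoolean();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch workerDone = new CountDownLatch(1);
        try {
            final Thread caller = new Thread(() -> {
                final Future<?> ft = worker.submit(() -> {
                    started.countDown();
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        workerInterrupted.set(true);
                    } finally {
                        workerDone.countDown();
                    }
                });
                try {
                    ft.get(); // blocks, like the synchronous RMI
                } catch (InterruptedException e) {
                    // Explicit propagation: cancel(true) interrupts the
                    // thread that is currently running the task.
                    ft.cancel(true);
                    Thread.currentThread().interrupt(); // keep the status
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            caller.start();
            started.await();    // the task is running on the worker
            caller.interrupt(); // interrupt the blocked caller
            caller.join();
            workerDone.await(); // returns early: the sleep was interrupted
            return workerInterrupted.get();
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println("worker interrupted="
                + callAndPropagate(6000L));
    }
}
```

Here the worker does see an InterruptedException, but only because the caller forwards the cancellation itself. Whether the RMI layer should do something equivalent on the caller's behalf is exactly the open question.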
Thanks,
Bryan
java.io.IOException: request I/O interrupted
java.rmi.UnmarshalException: exception unmarshalling response; nested exception is:
    java.io.IOException: request I/O interrupted
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:847)
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethod(BasicInvocationHandler.java:659)
    at net.jini.jeri.BasicInvocationHandler.invoke(BasicInvocationHandler.java:528)
    at $Proxy2.submit(Unknown Source)
    at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:399)
    at com.bigdata.journal.jini.ha.TestHAJournalServer$1.call(TestHAJournalServer.java:1)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.IOException: request I/O interrupted
    at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:833)
    at net.jini.jeri.connection.ConnectionManager$Outbound$Input.read(ConnectionManager.java:550)
    at net.jini.jeri.BasicObjectEndpoint.executeCall(BasicObjectEndpoint.java:410)
    at net.jini.jeri.BasicInvocationHandler.invokeRemoteMethodOnce(BasicInvocationHandler.java:806)
    ... 15 more
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at com.sun.jini.jeri.internal.mux.Session$MuxInputStream.read(Session.java:829)
    ... 18 more
/**
 * This test is used to characterize what happens when we interrupt an RMI.
 * Most methods on the {@link HAGlue} interface are synchronous - they block
 * while some behavior is executed. This is even true for some methods that
 * return a {@link Future} in order to avoid overhead associated with the
 * export of a proxy and DGC thread leaks (since fixed in River).
 * <p>
 * This unit test sets up a service and then issues an RMI that invokes a
 * {@link Thread#sleep(long)} method on the service. The thread that issues
 * the RMI is then interrupted during the sleep.
 *
 * @throws Exception
 */
public void test_interruptRMI() throws Exception {
    // Start a service.
    final HAGlue serverA = startA();
    final AtomicReference<Throwable> localCause =
            new AtomicReference<Throwable>();
    final ExecutorService executorService = Executors
            .newSingleThreadScheduledExecutor(DaemonThreadFactory
                    .defaultThreadFactory());
    try {
        final FutureTask<Void> localFuture = new FutureTask<Void>(
                new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            final Future<Void> ft = ((HAGlueTest) serverA)
                                    .submit(new SleepTask(6000/* ms */),
                                            false/* asyncFuture */);
                            return ft.get();
                        } catch (Throwable t) {
                            localCause.set(t);
                            log.error(t, t);
                            throw new RuntimeException(t);
                        } finally {
                            log.warn("Local submit of remote task is done.");
                        }
                    }
                });
        /*
         * Submit task that will execute sleep on A. This task will block
         * until A finishes its sleep. When we cancel this task, the RMI to
         * A will be interrupted.
         */
        executorService.execute(localFuture);
        // Wait a bit to ensure that the task was started on A.
        Thread.sleep(2000/* ms */);
        // Interrupt the local future. This will cause an interrupt of the RMI.
        localFuture.cancel(true/* mayInterruptIfRunning */);
    } finally {
        executorService.shutdownNow();
    }
    /*
     * The local root cause of the RMI failure is an InterruptedException.
     *
     * Note: There is a data race between when the [localCause] is set and
     * when we exit the code block above. This is because we are
     * interrupting the local task and have no means to await the completion
     * of its error handling routine which sets the [localCause].
     */
    {
        assertCondition(new Runnable() {
            @Override
            public void run() {
                final Throwable tmp = localCause.get();
                assertNotNull(tmp);
                assertTrue(InnerCause.isInnerCause(tmp,
                        InterruptedException.class));
            }
        }, 10000/* timeout */, TimeUnit.MILLISECONDS);
    }
    /*
     * Verify the root cause as observed by A for the interrupt. It should
     * also be an InterruptedException.
     *
     * Note: Again, there is a data race.
     *
     * Note: Because we might retry this, we do NOT use the getAndClearXXX()
     * method to recover the remote exception.
     */
    {
        assertCondition(new Runnable() {
            @Override
            public void run() {
                Throwable tmp;
                try {
                    tmp = ((HAGlueTest) serverA).getLastRootCause();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                assertNotNull(tmp);
                log.warn("Received non-null lastRootCause=" + tmp, tmp);
                assertTrue(InnerCause.isInnerCause(tmp,
                        InterruptedException.class));
            }
        }, 10000/* timeout */, TimeUnit.MILLISECONDS);
    }
}
/**
 * Task sleeps for a specified duration.
 *
 * @author <a href="mailto:[email protected]">Bryan Thompson</a>
 */
private static class SleepTask extends IndexManagerCallable<Void> {
    private static final long serialVersionUID = 1L;
    private long millis;
    SleepTask(final long millis) {
        this.millis = millis;
    }
    @Override
    public Void call() throws Exception {
        log.warn("Will sleep: millis=" + millis);
        try {
            Thread.sleep(millis);
            log.warn("Sleep finished normally.");
        } catch (Throwable t) {
            log.error("Exception during sleep: " + t, t);
            ((HAJournalTest) getIndexManager()).getRemoteImpl()
                    .setLastRootCause(t);
            throw new RuntimeException(t);
        } finally {
            log.warn("Did sleep: millis=" + millis);
        }
        return null;
    }
}