This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch dry-pipes-tcp-guarantees
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/dry-pipes-tcp-guarantees by
this push:
new 55c120b fixed test and performance is as it was before
55c120b is described below
commit 55c120b564b5f75f0a0d8160a3e49297db058cb3
Author: Tibor Digana <[email protected]>
AuthorDate: Sun Jun 27 17:50:38 2021 +0200
fixed test and performance is as it was before
---
.../plugin/surefire/booterclient/ForkStarter.java | 21 ++-
.../extensions/InvalidSessionIdException.java | 2 +-
.../surefire/extensions/LegacyForkChannel.java | 4 +-
.../surefire/extensions/SurefireForkChannel.java | 148 +++++++---------
.../maven/plugin/surefire/extensions/E2ETest.java | 193 ++++++++++++++-------
.../maven/surefire/extensions/ForkChannelTest.java | 2 +-
.../AbstractNoninterruptibleReadableChannel.java | 10 +-
...refireMasterProcessChannelProcessorFactory.java | 2 +-
.../maven/surefire/extensions/ForkChannel.java | 20 ++-
.../extensions/util/CountDownLauncher.java | 5 +-
10 files changed, 243 insertions(+), 164 deletions(-)
diff --git
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index 2cad2a3..5d85463 100644
---
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -636,7 +636,7 @@ public class ForkStarter
new CountdownCloseable( eventConsumer,
forkChannel.getCountdownCloseablePermits() );
try ( CommandlineExecutor exec = new CommandlineExecutor( cli,
countdownCloseable ) )
{
- Completable client = forkChannel.connectToClient();
+ forkChannel.tryConnectToClient();
CommandlineStreams streams = exec.execute();
closer.addCloseable( streams );
@@ -644,13 +644,8 @@ public class ForkStarter
forkChannel.bindEventHandler( eventConsumer, countdownCloseable,
streams.getStdOutChannel() );
- EventHandler<String> errConsumer = new NativeStdErrStreamConsumer(
log );
- LineConsumerThread stdErr = new LineConsumerThread( "fork-" +
forkNumber + "-err-thread",
- streams.getStdErrChannel(), errConsumer, countdownCloseable );
- err = stdErr;
- stdErr.start();
+ err = bindErrorStream( forkNumber, countdownCloseable, streams );
- client.complete();
log.debug( "Fork Channel [" + forkNumber + "] connected to the
client." );
result = exec.awaitExit();
@@ -759,6 +754,18 @@ public class ForkStarter
return runResult;
}
+ private Stoppable bindErrorStream( int forkNumber, CountdownCloseable
countdownCloseable,
+ CommandlineStreams streams )
+ {
+ Stoppable err;
+ EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( log
);
+ LineConsumerThread stdErr = new LineConsumerThread( "fork-" +
forkNumber + "-err-thread",
+ streams.getStdErrChannel(), errConsumer, countdownCloseable );
+ err = stdErr;
+ stdErr.start();
+ return err;
+ }
+
private Iterable<Class<?>> getSuitesIterator()
throws SurefireBooterForkException
{
diff --git
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
index 2ceaf12..cc1b12c 100644
---
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
+++
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
@@ -25,7 +25,7 @@ import
org.apache.maven.surefire.extensions.util.CommandlineExecutor;
import java.io.IOException;
/**
- * After the authentication has failed, {@link ForkChannel#connectToClient()}
throws {@link InvalidSessionIdException}
+ * After the authentication has failed, {@link
ForkChannel#tryConnectToClient()} throws {@link InvalidSessionIdException}
* and {@link org.apache.maven.plugin.surefire.booterclient.ForkStarter}
should close {@link CommandlineExecutor}.
*
* @since 3.0.0-M5
diff --git
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
index 226adaa..b754535 100644
---
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
+++
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
@@ -22,7 +22,6 @@ package org.apache.maven.plugin.surefire.extensions;
import org.apache.maven.surefire.api.event.Event;
import org.apache.maven.surefire.extensions.CloseableDaemonThread;
import org.apache.maven.surefire.extensions.CommandReader;
-import org.apache.maven.surefire.extensions.Completable;
import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.extensions.ForkChannel;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;
@@ -51,9 +50,8 @@ final class LegacyForkChannel extends ForkChannel
}
@Override
- public Completable connectToClient()
+ public void tryConnectToClient()
{
- return Completable.EMPTY_COMPLETABLE;
}
@Override
diff --git
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index b7aeddf..e070679 100644
---
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -24,7 +24,6 @@ import org.apache.maven.surefire.api.event.Event;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;
import org.apache.maven.surefire.extensions.CloseableDaemonThread;
import org.apache.maven.surefire.extensions.CommandReader;
-import org.apache.maven.surefire.extensions.Completable;
import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.extensions.ForkChannel;
import org.apache.maven.surefire.extensions.util.CountDownLauncher;
@@ -41,13 +40,12 @@ import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Future;
import static java.net.StandardSocketOptions.SO_KEEPALIVE;
import static java.net.StandardSocketOptions.SO_REUSEADDR;
@@ -60,11 +58,13 @@ import static
org.apache.maven.surefire.api.util.internal.Channels.newChannel;
import static
org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
import static
org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
import static
org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
+import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
+import static org.apache.maven.surefire.shared.lang3.StringUtils.isNotBlank;
/**
* The TCP/IP server accepting only one client connection. The forked JVM
connects to the server using the
* {@link #getForkNodeConnectionString() connection string}.
- * The main purpose of this class is to {@link #connectToClient() conect with
tthe client}, bind the
+ * The main purpose of this class is to {@link #tryConnectToClient() connect
with the client}, bind the
* {@link #bindCommandReader(CommandReader, WritableByteChannel) command
reader} to the internal socket's
* {@link java.io.InputStream}, and bind the
* {@link #bindEventHandler(EventHandler, CountdownCloseable,
ReadableByteChannel) event handler} writing the event
@@ -83,8 +83,8 @@ final class SurefireForkChannel extends ForkChannel
private final String localHost;
private final int localPort;
private final String sessionId;
- private final AtomicReference<AsynchronousSocketChannel> worker = new
AtomicReference<>();
- private final Bindings bindings = new Bindings( 3 );
+ private final Bindings bindings = new Bindings( 2 );
+ private volatile Future<AsynchronousSocketChannel> session;
private volatile LineConsumerThread out;
private volatile CloseableDaemonThread commandReaderBindings;
private volatile CloseableDaemonThread eventHandlerBindings;
@@ -105,21 +105,19 @@ final class SurefireForkChannel extends ForkChannel
}
@Override
- public Completable connectToClient()
+ public void tryConnectToClient()
{
- if ( worker.get() != null )
+ if ( session != null )
{
throw new IllegalStateException( "already accepted TCP client
connection" );
}
- AcceptanceHandler result = new AcceptanceHandler();
- server.accept( null, result );
- return result;
+ session = server.accept();
}
@Override
public String getForkNodeConnectionString()
{
- return "tcp://" + localHost + ":" + localPort + "?sessionId=" +
sessionId;
+ return "tcp://" + localHost + ":" + localPort + ( isBlank( sessionId )
? "" : "?sessionId=" + sessionId );
}
@Override
@@ -130,6 +128,7 @@ final class SurefireForkChannel extends ForkChannel
@Override
public void bindCommandReader( @Nonnull CommandReader commands,
WritableByteChannel stdIn )
+ throws IOException, InterruptedException
{
commandBindings = new CommandBindings( commands );
@@ -140,6 +139,7 @@ final class SurefireForkChannel extends ForkChannel
public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
@Nonnull CountdownCloseable countdown,
ReadableByteChannel stdOut )
+ throws IOException, InterruptedException
{
ForkNodeArguments args = getArguments();
out = new LineConsumerThread( "fork-" + args.getForkChannelId() +
"-out-thread", stdOut,
@@ -169,91 +169,57 @@ final class SurefireForkChannel extends ForkChannel
public void close() throws IOException
{
//noinspection unused,EmptyTryBlock,EmptyTryBlock
- try ( Closeable c1 = worker.get(); Closeable c2 = server; Closeable c3
= out )
+ try ( Closeable c1 = getChannel(); Closeable c2 = server; Closeable c3
= out )
{
// only close all channels
}
- }
-
- @SafeVarargs
- private final void setTrueOptions( SocketOption<Boolean>... options )
- throws IOException
- {
- for ( SocketOption<Boolean> option : options )
+ catch ( InterruptedException e )
{
- if ( server.supportedOptions().contains( option ) )
- {
- server.setOption( option, true );
- }
+ Throwable cause = e.getCause();
+ throw cause instanceof IOException ? (IOException) cause : new
IOException( cause );
}
}
- private final class AcceptanceHandler
- implements CompletionHandler<AsynchronousSocketChannel, Void>,
Completable
+ private void verifySessionId() throws InterruptedException, IOException
{
- private final CountDownLatch acceptanceSynchronizer = new
CountDownLatch( 1 );
- private final CountDownLatch authSynchronizer = new CountDownLatch( 1
);
- private volatile String messageOfIOException;
- private volatile String messageOfInvalidSessionIdException;
-
- @Override
- public void completed( AsynchronousSocketChannel channel, Void
attachment )
+ try
{
- if ( worker.compareAndSet( null, channel ) )
+ ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
+ int read;
+ do
{
- acceptanceSynchronizer.countDown();
- final ByteBuffer buffer = ByteBuffer.allocate(
sessionId.length() );
- channel.read( buffer, null, new CompletionHandler<Integer,
Object>()
- {
- @Override
- public void completed( Integer read, Object attachment )
- {
- if ( read == -1 )
- {
- messageOfIOException = "Channel closed while
verifying the client.";
- }
- ( (Buffer) buffer ).flip();
- String clientSessionId = new String( buffer.array(),
US_ASCII );
- if ( !clientSessionId.equals( sessionId ) )
- {
- messageOfInvalidSessionIdException = "The actual
sessionId '" + clientSessionId
- + "' does not match '" + sessionId + "'.";
- }
- authSynchronizer.countDown();
-
- bindings.countDown();
- }
+ read = getChannel().read( buffer ).get();
+ } while ( read != -1 && buffer.hasRemaining() );
- @Override
- public void failed( Throwable exception, Object attachment
)
- {
- getArguments().dumpStreamException( exception );
- }
- } );
+ if ( read == -1 )
+ {
+ throw new IOException( "Channel closed while verifying the
client." );
}
- else
+
+ ( (Buffer) buffer ).flip();
+ String clientSessionId = new String( buffer.array(), US_ASCII );
+ if ( !clientSessionId.equals( sessionId ) )
{
- getArguments().dumpStreamText( "Another TCP client attempts to
connect." );
+ throw new InvalidSessionIdException( clientSessionId,
sessionId );
}
}
-
- @Override
- public void failed( Throwable exception, Void attachment )
+ catch ( ExecutionException e )
{
- getArguments().dumpStreamException( exception );
- acceptanceSynchronizer.countDown();
- }
-
- @Override
- public void complete() throws IOException, InterruptedException
- {
- completeAcceptance();
- authSynchronizer.await();
+ Throwable cause = e.getCause();
+ throw cause instanceof IOException ? (IOException) cause : new
IOException( cause );
}
+ }
- void completeAcceptance() throws InterruptedException
+ @SafeVarargs
+ private final void setTrueOptions( SocketOption<Boolean>... options )
+ throws IOException
+ {
+ for ( SocketOption<Boolean> option : options )
{
- acceptanceSynchronizer.await();
+ if ( server.supportedOptions().contains( option ) )
+ {
+ server.setOption( option, true );
+ }
}
}
@@ -289,7 +255,7 @@ final class SurefireForkChannel extends ForkChannel
void bindCommandSender( AsynchronousSocketChannel source )
{
- // dont use newBufferedChannel here - may cause the command is not
sent and the JVM hangs
+ // don't use newBufferedChannel here - it may cause the command
not to be sent and the JVM to hang
// only newChannel flushes the message
// newBufferedChannel does not flush
ForkNodeArguments args = getArguments();
@@ -308,11 +274,29 @@ final class SurefireForkChannel extends ForkChannel
}
@Override
- protected void job()
+ protected void job() throws IOException, InterruptedException
{
- AsynchronousSocketChannel channel = worker.get();
+ AsynchronousSocketChannel channel = getChannel();
+ if ( isNotBlank( sessionId ) )
+ {
+ verifySessionId();
+ }
eventBindings.bindEventHandler( channel );
commandBindings.bindCommandSender( channel );
}
}
+
+ private AsynchronousSocketChannel getChannel()
+ throws InterruptedException, IOException
+ {
+ try
+ {
+ return session == null ? null : session.get();
+ }
+ catch ( ExecutionException e )
+ {
+ Throwable cause = e.getCause();
+ throw cause instanceof IOException ? (IOException) cause : new
IOException( cause );
+ }
+ }
}
diff --git
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
index 60928fb..9f37598 100644
---
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
+++
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
@@ -22,12 +22,14 @@ package org.apache.maven.plugin.surefire.extensions;
import
org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
-import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
+import org.apache.maven.surefire.api.booter.Command;
import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.fork.ForkNodeArguments;
import org.apache.maven.surefire.api.report.ConsoleOutputReceiver;
+import org.apache.maven.surefire.booter.spi.EventChannelEncoder;
import
org.apache.maven.surefire.booter.spi.SurefireMasterProcessChannelProcessorFactory;
+import org.apache.maven.surefire.extensions.CommandReader;
import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.api.fork.ForkNodeArguments;
import org.apache.maven.surefire.extensions.util.CountdownCloseable;
import org.junit.Rule;
import org.junit.Test;
@@ -36,17 +38,20 @@ import org.junit.rules.ExpectedException;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -68,22 +73,70 @@ public class E2ETest
public void endToEndTest() throws Exception
{
ForkNodeArguments arguments = new Arguments(
UUID.randomUUID().toString(), 1, new NullConsoleLogger() );
+
final SurefireForkChannel server = new SurefireForkChannel( arguments
);
+ server.tryConnectToClient();
final String connection = server.getForkNodeConnectionString();
final SurefireMasterProcessChannelProcessorFactory factory = new
SurefireMasterProcessChannelProcessorFactory();
factory.connect( connection );
- final MasterProcessChannelEncoder encoder = factory.createEncoder(
arguments );
-
- System.gc();
-
- TimeUnit.SECONDS.sleep( 3L );
+ final EventChannelEncoder encoder = (EventChannelEncoder)
factory.createEncoder( arguments );
final CountDownLatch awaitHandlerFinished = new CountDownLatch( 2 );
+ final AtomicLong readTime = new AtomicLong();
+
final int totalCalls = 400_000; // 400_000; // 1_000_000; //
10_000_000;
+ EventHandler<Event> h = new EventHandler<Event>()
+ {
+ private final AtomicInteger counter = new AtomicInteger();
+ private volatile long t1;
+
+ @Override
+ public void handleEvent( @Nonnull Event event )
+ {
+ try
+ {
+ if ( counter.getAndIncrement() == 0 )
+ {
+ t1 = System.currentTimeMillis();
+ }
+
+ long t2 = System.currentTimeMillis();
+ long spent = t2 - t1;
+
+ if ( counter.get() == totalCalls - 64 * 1024 )
+ {
+ readTime.set( spent );
+ System.out.println( spent + "ms on read" );
+ awaitHandlerFinished.countDown();
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ EventHandler<Event> queue = /*new EventHandler<Event>() {
+ private int i = 0;
+ @Override
+ public void handleEvent( @Nonnull Event event )
+ {
+ System.out.println("read " + i++ + " messages");
+ }
+ };*/new ThreadedStreamConsumer( h );
+
+ System.gc();
+
+ SECONDS.sleep( 3L );
+
+ server.bindEventHandler( queue, new CountdownCloseable( new
DummyCloseable(), 1 ), new DummyReadableChannel() );
+ server.bindCommandReader( new DummyCommandReader(), null );
+
Thread t = new Thread()
{
@Override
@@ -110,6 +163,12 @@ public class E2ETest
{
//System.out.println( LONG_STRING );
encoder.stdOut( LONG_STRING, true );
+ /*String expected =
":maven-surefire-event:\u0017:std-out-stream-new-line:"
+ + (char) 10 +
":normal-run:\u0005:UTF-8:\u0000\u0000\u0000\u0064:" + LONG_STRING + ":";
+ encoder.write( ByteBuffer.wrap( expected.getBytes(
StandardCharsets.US_ASCII ) ), false );
+ if ( i % 10_000 == 0 ) {
+ System.out.println("zapisovanie " + i);
+ }*/
}
long t2 = System.currentTimeMillis();
long spent = t2 - t1;
@@ -127,57 +186,12 @@ public class E2ETest
t.setDaemon( true );
t.start();
- server.connectToClient();
-
- final AtomicLong readTime = new AtomicLong();
-
- EventHandler<Event> h = new EventHandler<Event>()
- {
- private final AtomicInteger counter = new AtomicInteger();
- private volatile long t1;
-
- @Override
- public void handleEvent( @Nonnull Event event )
- {
- try
- {
- if ( counter.getAndIncrement() == 0 )
- {
- t1 = System.currentTimeMillis();
- }
-
- long t2 = System.currentTimeMillis();
- long spent = t2 - t1;
-
- if ( counter.get() % 500_000 == 0 )
- {
- System.out.println( spent + "ms: " + counter.get() );
- }
-
- if ( counter.get() == totalCalls - 64 * 1024 )
- {
- readTime.set( spent );
- System.out.println( spent + "ms on read" );
- awaitHandlerFinished.countDown();
- }
- }
- catch ( Exception e )
- {
- e.printStackTrace();
- }
- }
- };
-
- ThreadedStreamConsumer queue = new ThreadedStreamConsumer( h );
-
- server.bindEventHandler( queue, new CountdownCloseable( new
DummyCloseable(), 1 ), new DummyReadableChannel() );
-
- assertThat( awaitHandlerFinished.await( 30L, TimeUnit.SECONDS ) )
+ assertThat( awaitHandlerFinished.await( 30L, SECONDS ) )
.isTrue();
factory.close();
server.close();
- queue.close();
+ //queue.close();
// 1.0 seconds while using the encoder/decoder
assertThat( readTime.get() )
@@ -210,8 +224,6 @@ public class E2ETest
t.setDaemon( true );
t.start();
- server.connectToClient();
-
assertThat( task.get() )
.isEqualTo( "client connected" );
}
@@ -247,7 +259,11 @@ public class E2ETest
e.expectMessage( "The actual sessionId
'6ba7b812-9dad-11d1-80b4-00c04fd430c8' does not match '"
+ serverSessionId + "'." );
- server.connectToClient();
+ server.tryConnectToClient();
+ server.bindCommandReader( mock( CommandReader.class ), mock(
WritableByteChannel.class ) );
+
+ server.bindEventHandler( mock( EventHandler.class ),
+ new CountdownCloseable( mock( Closeable.class ), 1 ), mock(
ReadableByteChannel.class ) );
fail( task.get() );
}
@@ -255,21 +271,76 @@ public class E2ETest
private static class DummyReadableChannel implements ReadableByteChannel
{
+ private volatile Thread t;
+
@Override
- public int read( ByteBuffer dst )
+ public int read( ByteBuffer dst ) throws IOException
{
- return 0;
+ try
+ {
+ t = Thread.currentThread();
+ HOURS.sleep( 1L );
+ return 0;
+ }
+ catch ( InterruptedException e )
+ {
+ throw new IOException( e.getLocalizedMessage(), e );
+ }
}
@Override
public boolean isOpen()
{
- return false;
+ return true;
+ }
+
+ @Override
+ public void close()
+ {
+ if ( t != null )
+ {
+ t.interrupt();
+ }
+ }
+ }
+
+ private static class DummyCommandReader implements CommandReader
+ {
+ private volatile Thread t;
+
+ @Override
+ public Command readNextCommand() throws IOException
+ {
+ try
+ {
+ t = Thread.currentThread();
+ HOURS.sleep( 1L );
+ return null;
+ }
+ catch ( InterruptedException e )
+ {
+ throw new IOException( e.getLocalizedMessage(), e );
+ }
}
@Override
public void close()
{
+ if ( t != null )
+ {
+ t.interrupt();
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return false;
+ }
+
+ @Override
+ public void tryFlush()
+ {
}
}
diff --git
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
index b34ecc0..9655c63 100644
---
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
+++
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
@@ -157,7 +157,7 @@ public class ForkChannelTest
Client client = new Client( uri.getPort(), sessionId );
client.start();
- channel.connectToClient();
+ channel.tryConnectToClient();
channel.bindCommandReader( commandReader, null );
ReadableByteChannel stdOut = mock( ReadableByteChannel.class );
when( stdOut.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
diff --git
a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
index 678d643..2b83e0f 100644
---
a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
+++
b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
@@ -63,7 +63,13 @@ abstract class AbstractNoninterruptibleReadableChannel
implements ReadableByteCh
@Override
public final void close() throws IOException
{
- open = false;
- closeImpl();
+ try
+ {
+ closeImpl();
+ }
+ finally
+ {
+ open = false;
+ }
}
}
diff --git
a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
index f6b3e6a..6232209 100644
---
a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
+++
b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
@@ -132,7 +132,7 @@ public class SurefireMasterProcessChannelProcessorFactory
{
if ( clientSocketChannel.supportedOptions().contains( option ) )
{
- // clientSocketChannel.setOption( option, true );
+ clientSocketChannel.setOption( option, true );
}
}
}
diff --git
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
index 26145fb..38d3648 100644
---
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
+++
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
@@ -34,8 +34,9 @@ import java.nio.channels.WritableByteChannel;
* and communicates with a dedicated forked JVM. It represents a server.
* <br>
* <br>
- * It connects to a remote client by {@link #connectToClient()}, provides a
connection string
+ * It connects to a remote client by {@link #tryConnectToClient()}, provides a
connection string
* {@link #getForkNodeConnectionString()} needed by the client in forked JVM,
binds event handler and command reader.
+ * This object is called from a single thread.
*
* @author <a href="mailto:[email protected]">Tibor Digana (tibor17)</a>
* @since 3.0.0-M5
@@ -53,7 +54,13 @@ public abstract class ForkChannel implements Closeable
this.arguments = arguments;
}
- public abstract Completable connectToClient() throws IOException,
InterruptedException;
+ /**
+ * Asynchronously connects to the client.
+ *
+ * @throws IOException if the stream fails
+ * @throws InterruptedException if the thread is interrupted
+ */
+ public abstract void tryConnectToClient() throws IOException,
InterruptedException;
/**
* This is server related class, which if binds to a TCP port, determines
the connection string for the client.
@@ -73,9 +80,10 @@ public abstract class ForkChannel implements Closeable
* @param commands command reader, see {@link
CommandReader#readNextCommand()}
* @param stdIn optional standard input stream of the JVM to write the
encoded commands into it
* @throws IOException if an error in the fork channel
+ * @throws InterruptedException channel interrupted
*/
public abstract void bindCommandReader( @Nonnull CommandReader commands,
WritableByteChannel stdIn )
- throws IOException;
+ throws IOException, InterruptedException;
/**
* Starts a Thread reading the events.
@@ -84,11 +92,12 @@ public abstract class ForkChannel implements Closeable
* @param countdownCloseable count down of the final call of {@link
Closeable#close()}
* @param stdOut optional standard output stream of the JVM
* @throws IOException if an error in the fork channel
+ * @throws InterruptedException channel interrupted
*/
public abstract void bindEventHandler( @Nonnull EventHandler<Event>
eventHandler,
@Nonnull CountdownCloseable
countdownCloseable,
ReadableByteChannel stdOut )
- throws IOException;
+ throws IOException, InterruptedException;
@Nonnull
protected ForkNodeArguments getArguments()
@@ -97,4 +106,7 @@ public abstract class ForkChannel implements Closeable
}
public abstract void disable();
+
+ @Override
+ public abstract void close() throws IOException;
}
diff --git
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
index ec35682..6d0e443 100644
---
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
+++
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
@@ -19,6 +19,7 @@ package org.apache.maven.surefire.extensions.util;
* under the License.
*/
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -41,9 +42,9 @@ public abstract class CountDownLauncher
countDown = new AtomicInteger( count );
}
- protected abstract void job();
+ protected abstract void job() throws IOException, InterruptedException;
- public void countDown()
+ public void countDown() throws IOException, InterruptedException
{
if ( countDown.decrementAndGet() == 0 )
{