This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch dry-pipes-tcp-guarantees
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/dry-pipes-tcp-guarantees by 
this push:
     new 55c120b  fixed test and performance is as it was before
55c120b is described below

commit 55c120b564b5f75f0a0d8160a3e49297db058cb3
Author: Tibor Digana <[email protected]>
AuthorDate: Sun Jun 27 17:50:38 2021 +0200

    fixed test and performance is as it was before
---
 .../plugin/surefire/booterclient/ForkStarter.java  |  21 ++-
 .../extensions/InvalidSessionIdException.java      |   2 +-
 .../surefire/extensions/LegacyForkChannel.java     |   4 +-
 .../surefire/extensions/SurefireForkChannel.java   | 148 +++++++---------
 .../maven/plugin/surefire/extensions/E2ETest.java  | 193 ++++++++++++++-------
 .../maven/surefire/extensions/ForkChannelTest.java |   2 +-
 .../AbstractNoninterruptibleReadableChannel.java   |  10 +-
 ...refireMasterProcessChannelProcessorFactory.java |   2 +-
 .../maven/surefire/extensions/ForkChannel.java     |  20 ++-
 .../extensions/util/CountDownLauncher.java         |   5 +-
 10 files changed, 243 insertions(+), 164 deletions(-)

diff --git 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index 2cad2a3..5d85463 100644
--- 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++ 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -636,7 +636,7 @@ public class ForkStarter
             new CountdownCloseable( eventConsumer, 
forkChannel.getCountdownCloseablePermits() );
         try ( CommandlineExecutor exec = new CommandlineExecutor( cli, 
countdownCloseable ) )
         {
-            Completable client = forkChannel.connectToClient();
+            forkChannel.tryConnectToClient();
             CommandlineStreams streams = exec.execute();
             closer.addCloseable( streams );
 
@@ -644,13 +644,8 @@ public class ForkStarter
 
             forkChannel.bindEventHandler( eventConsumer, countdownCloseable, 
streams.getStdOutChannel() );
 
-            EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( 
log );
-            LineConsumerThread stdErr = new LineConsumerThread( "fork-" + 
forkNumber + "-err-thread",
-                streams.getStdErrChannel(), errConsumer, countdownCloseable );
-            err = stdErr;
-            stdErr.start();
+            err = bindErrorStream( forkNumber, countdownCloseable, streams );
 
-            client.complete();
             log.debug( "Fork Channel [" + forkNumber + "] connected to the 
client." );
 
             result = exec.awaitExit();
@@ -759,6 +754,18 @@ public class ForkStarter
         return runResult;
     }
 
+    private Stoppable bindErrorStream( int forkNumber, CountdownCloseable 
countdownCloseable,
+                                       CommandlineStreams streams )
+    {
+        Stoppable err;
+        EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( log 
);
+        LineConsumerThread stdErr = new LineConsumerThread( "fork-" + 
forkNumber + "-err-thread",
+            streams.getStdErrChannel(), errConsumer, countdownCloseable );
+        err = stdErr;
+        stdErr.start();
+        return err;
+    }
+
     private Iterable<Class<?>> getSuitesIterator()
         throws SurefireBooterForkException
     {
diff --git 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
index 2ceaf12..cc1b12c 100644
--- 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
+++ 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/InvalidSessionIdException.java
@@ -25,7 +25,7 @@ import 
org.apache.maven.surefire.extensions.util.CommandlineExecutor;
 import java.io.IOException;
 
 /**
- * After the authentication has failed, {@link ForkChannel#connectToClient()} 
throws {@link InvalidSessionIdException}
+ * After the authentication has failed, {@link 
ForkChannel#tryConnectToClient()} throws {@link InvalidSessionIdException}
  * and {@link org.apache.maven.plugin.surefire.booterclient.ForkStarter} 
should close {@link CommandlineExecutor}.
  *
  * @since 3.0.0-M5
diff --git 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
index 226adaa..b754535 100644
--- 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
+++ 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
@@ -22,7 +22,6 @@ package org.apache.maven.plugin.surefire.extensions;
 import org.apache.maven.surefire.api.event.Event;
 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
 import org.apache.maven.surefire.extensions.CommandReader;
-import org.apache.maven.surefire.extensions.Completable;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
@@ -51,9 +50,8 @@ final class LegacyForkChannel extends ForkChannel
     }
 
     @Override
-    public Completable connectToClient()
+    public void tryConnectToClient()
     {
-        return Completable.EMPTY_COMPLETABLE;
     }
 
     @Override
diff --git 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index b7aeddf..e070679 100644
--- 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++ 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -24,7 +24,6 @@ import org.apache.maven.surefire.api.event.Event;
 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
 import org.apache.maven.surefire.extensions.CommandReader;
-import org.apache.maven.surefire.extensions.Completable;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.extensions.util.CountDownLauncher;
@@ -41,13 +40,12 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousServerSocketChannel;
 import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.CompletionHandler;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Future;
 
 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
 import static java.net.StandardSocketOptions.SO_REUSEADDR;
@@ -60,11 +58,13 @@ import static 
org.apache.maven.surefire.api.util.internal.Channels.newChannel;
 import static 
org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
 import static 
org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
 import static 
org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
+import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
+import static org.apache.maven.surefire.shared.lang3.StringUtils.isNotBlank;
 
 /**
  * The TCP/IP server accepting only one client connection. The forked JVM 
connects to the server using the
  * {@link #getForkNodeConnectionString() connection string}.
- * The main purpose of this class is to {@link #connectToClient() conect with 
tthe client}, bind the
+ * The main purpose of this class is to {@link #tryConnectToClient() connect 
with the client}, bind the
  * {@link #bindCommandReader(CommandReader, WritableByteChannel) command 
reader} to the internal socket's
  * {@link java.io.InputStream}, and bind the
  * {@link #bindEventHandler(EventHandler, CountdownCloseable, 
ReadableByteChannel) event handler} writing the event
@@ -83,8 +83,8 @@ final class SurefireForkChannel extends ForkChannel
     private final String localHost;
     private final int localPort;
     private final String sessionId;
-    private final AtomicReference<AsynchronousSocketChannel> worker = new 
AtomicReference<>();
-    private final Bindings bindings = new Bindings( 3 );
+    private final Bindings bindings = new Bindings( 2 );
+    private volatile Future<AsynchronousSocketChannel> session;
     private volatile LineConsumerThread out;
     private volatile CloseableDaemonThread commandReaderBindings;
     private volatile CloseableDaemonThread eventHandlerBindings;
@@ -105,21 +105,19 @@ final class SurefireForkChannel extends ForkChannel
     }
 
     @Override
-    public Completable connectToClient()
+    public void tryConnectToClient()
     {
-        if ( worker.get() != null )
+        if ( session != null )
         {
             throw new IllegalStateException( "already accepted TCP client 
connection" );
         }
-        AcceptanceHandler result = new AcceptanceHandler();
-        server.accept( null, result );
-        return result;
+        session = server.accept();
     }
 
     @Override
     public String getForkNodeConnectionString()
     {
-        return "tcp://" + localHost + ":" + localPort + "?sessionId=" + 
sessionId;
+        return "tcp://" + localHost + ":" + localPort + ( isBlank( sessionId ) 
? "" : "?sessionId=" + sessionId );
     }
 
     @Override
@@ -130,6 +128,7 @@ final class SurefireForkChannel extends ForkChannel
 
     @Override
     public void bindCommandReader( @Nonnull CommandReader commands, 
WritableByteChannel stdIn )
+        throws IOException, InterruptedException
     {
         commandBindings = new CommandBindings( commands );
 
@@ -140,6 +139,7 @@ final class SurefireForkChannel extends ForkChannel
     public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
                                   @Nonnull CountdownCloseable countdown,
                                   ReadableByteChannel stdOut )
+        throws IOException, InterruptedException
     {
         ForkNodeArguments args = getArguments();
         out = new LineConsumerThread( "fork-" + args.getForkChannelId() + 
"-out-thread", stdOut,
@@ -169,91 +169,57 @@ final class SurefireForkChannel extends ForkChannel
     public void close() throws IOException
     {
         //noinspection unused,EmptyTryBlock,EmptyTryBlock
-        try ( Closeable c1 = worker.get(); Closeable c2 = server; Closeable c3 
= out )
+        try ( Closeable c1 = getChannel(); Closeable c2 = server; Closeable c3 
= out )
         {
             // only close all channels
         }
-    }
-
-    @SafeVarargs
-    private final void setTrueOptions( SocketOption<Boolean>... options )
-        throws IOException
-    {
-        for ( SocketOption<Boolean> option : options )
+        catch ( InterruptedException e )
         {
-            if ( server.supportedOptions().contains( option ) )
-            {
-                server.setOption( option, true );
-            }
+            Throwable cause = e.getCause();
+            throw cause instanceof IOException ? (IOException) cause : new 
IOException( cause );
         }
     }
 
-    private final class AcceptanceHandler
-        implements CompletionHandler<AsynchronousSocketChannel, Void>, 
Completable
+    private void verifySessionId() throws InterruptedException, IOException
     {
-        private final CountDownLatch acceptanceSynchronizer = new 
CountDownLatch( 1 );
-        private final CountDownLatch authSynchronizer = new CountDownLatch( 1 
);
-        private volatile String messageOfIOException;
-        private volatile String messageOfInvalidSessionIdException;
-
-        @Override
-        public void completed( AsynchronousSocketChannel channel, Void 
attachment )
+        try
         {
-            if ( worker.compareAndSet( null, channel ) )
+            ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
+            int read;
+            do
             {
-                acceptanceSynchronizer.countDown();
-                final ByteBuffer buffer = ByteBuffer.allocate( 
sessionId.length() );
-                channel.read( buffer, null, new CompletionHandler<Integer, 
Object>()
-                {
-                    @Override
-                    public void completed( Integer read, Object attachment )
-                    {
-                        if ( read == -1 )
-                        {
-                            messageOfIOException = "Channel closed while 
verifying the client.";
-                        }
-                        ( (Buffer) buffer ).flip();
-                        String clientSessionId = new String( buffer.array(), 
US_ASCII );
-                        if ( !clientSessionId.equals( sessionId ) )
-                        {
-                            messageOfInvalidSessionIdException = "The actual 
sessionId '" + clientSessionId
-                                + "' does not match '" + sessionId + "'.";
-                        }
-                        authSynchronizer.countDown();
-
-                        bindings.countDown();
-                    }
+                read = getChannel().read( buffer ).get();
+            } while ( read != -1 && buffer.hasRemaining() );
 
-                    @Override
-                    public void failed( Throwable exception, Object attachment 
)
-                    {
-                        getArguments().dumpStreamException( exception );
-                    }
-                } );
+            if ( read == -1 )
+            {
+                throw new IOException( "Channel closed while verifying the 
client." );
             }
-            else
+
+            ( (Buffer) buffer ).flip();
+            String clientSessionId = new String( buffer.array(), US_ASCII );
+            if ( !clientSessionId.equals( sessionId ) )
             {
-                getArguments().dumpStreamText( "Another TCP client attempts to 
connect." );
+                throw new InvalidSessionIdException( clientSessionId, 
sessionId );
             }
         }
-
-        @Override
-        public void failed( Throwable exception, Void attachment )
+        catch ( ExecutionException e )
         {
-            getArguments().dumpStreamException( exception );
-            acceptanceSynchronizer.countDown();
-        }
-
-        @Override
-        public void complete() throws IOException, InterruptedException
-        {
-            completeAcceptance();
-            authSynchronizer.await();
+            Throwable cause = e.getCause();
+            throw cause instanceof IOException ? (IOException) cause : new 
IOException( cause );
         }
+    }
 
-        void completeAcceptance() throws InterruptedException
+    @SafeVarargs
+    private final void setTrueOptions( SocketOption<Boolean>... options )
+        throws IOException
+    {
+        for ( SocketOption<Boolean> option : options )
         {
-            acceptanceSynchronizer.await();
+            if ( server.supportedOptions().contains( option ) )
+            {
+                server.setOption( option, true );
+            }
         }
     }
 
@@ -289,7 +255,7 @@ final class SurefireForkChannel extends ForkChannel
 
         void bindCommandSender( AsynchronousSocketChannel source )
         {
-            // dont use newBufferedChannel here - may cause the command is not 
sent and the JVM hangs
+            // don't use newBufferedChannel here - may cause the command is 
not sent and the JVM hangs
             // only newChannel flushes the message
             // newBufferedChannel does not flush
             ForkNodeArguments args = getArguments();
@@ -308,11 +274,29 @@ final class SurefireForkChannel extends ForkChannel
         }
 
         @Override
-        protected void job()
+        protected void job() throws IOException, InterruptedException
         {
-            AsynchronousSocketChannel channel = worker.get();
+            AsynchronousSocketChannel channel = getChannel();
+            if ( isNotBlank( sessionId ) )
+            {
+                verifySessionId();
+            }
             eventBindings.bindEventHandler( channel );
             commandBindings.bindCommandSender( channel );
         }
     }
+
+    private AsynchronousSocketChannel getChannel()
+        throws InterruptedException, IOException
+    {
+        try
+        {
+            return session == null ? null : session.get();
+        }
+        catch ( ExecutionException e )
+        {
+            Throwable cause = e.getCause();
+            throw cause instanceof IOException ? (IOException) cause : new 
IOException( cause );
+        }
+    }
 }
diff --git 
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
 
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
index 60928fb..9f37598 100644
--- 
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
+++ 
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
@@ -22,12 +22,14 @@ package org.apache.maven.plugin.surefire.extensions;
 import 
org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
-import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
+import org.apache.maven.surefire.api.booter.Command;
 import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.fork.ForkNodeArguments;
 import org.apache.maven.surefire.api.report.ConsoleOutputReceiver;
+import org.apache.maven.surefire.booter.spi.EventChannelEncoder;
 import 
org.apache.maven.surefire.booter.spi.SurefireMasterProcessChannelProcessorFactory;
+import org.apache.maven.surefire.extensions.CommandReader;
 import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.api.fork.ForkNodeArguments;
 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
 import org.junit.Rule;
 import org.junit.Test;
@@ -36,17 +38,20 @@ import org.junit.rules.ExpectedException;
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.fest.assertions.Assertions.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -68,22 +73,70 @@ public class E2ETest
     public void endToEndTest() throws Exception
     {
         ForkNodeArguments arguments = new Arguments( 
UUID.randomUUID().toString(), 1, new NullConsoleLogger() );
+
         final SurefireForkChannel server = new SurefireForkChannel( arguments 
);
+        server.tryConnectToClient();
 
         final String connection = server.getForkNodeConnectionString();
 
         final SurefireMasterProcessChannelProcessorFactory factory = new 
SurefireMasterProcessChannelProcessorFactory();
         factory.connect( connection );
-        final MasterProcessChannelEncoder encoder = factory.createEncoder( 
arguments );
-
-        System.gc();
-
-        TimeUnit.SECONDS.sleep( 3L );
+        final EventChannelEncoder encoder = (EventChannelEncoder) 
factory.createEncoder( arguments );
 
         final CountDownLatch awaitHandlerFinished = new CountDownLatch( 2 );
 
+        final AtomicLong readTime = new AtomicLong();
+
         final int totalCalls = 400_000; // 400_000; // 1_000_000; // 
10_000_000;
 
+        EventHandler<Event> h = new EventHandler<Event>()
+        {
+            private final AtomicInteger counter = new AtomicInteger();
+            private volatile long t1;
+
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                try
+                {
+                    if ( counter.getAndIncrement() == 0 )
+                    {
+                        t1 = System.currentTimeMillis();
+                    }
+
+                    long t2 = System.currentTimeMillis();
+                    long spent = t2 - t1;
+
+                    if ( counter.get() == totalCalls - 64 * 1024 )
+                    {
+                        readTime.set( spent );
+                        System.out.println( spent + "ms on read" );
+                        awaitHandlerFinished.countDown();
+                    }
+                }
+                catch ( Exception e )
+                {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        EventHandler<Event> queue = /*new EventHandler<Event>() {
+            private int i = 0;
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                System.out.println("read " + i++ + " messages");
+            }
+        };*/new ThreadedStreamConsumer( h );
+
+        System.gc();
+
+        SECONDS.sleep( 3L );
+
+        server.bindEventHandler( queue, new CountdownCloseable( new 
DummyCloseable(), 1 ), new DummyReadableChannel() );
+        server.bindCommandReader( new DummyCommandReader(), null );
+
         Thread t = new Thread()
         {
             @Override
@@ -110,6 +163,12 @@ public class E2ETest
                     {
                         //System.out.println( LONG_STRING );
                         encoder.stdOut( LONG_STRING, true );
+                        /*String expected = 
":maven-surefire-event:\u0017:std-out-stream-new-line:"
+                            + (char) 10 + 
":normal-run:\u0005:UTF-8:\u0000\u0000\u0000\u0064:" + LONG_STRING + ":";
+                        encoder.write( ByteBuffer.wrap( expected.getBytes( 
StandardCharsets.US_ASCII ) ), false );
+                        if ( i % 10_000 == 0 ) {
+                            System.out.println("zapisovanie " + i);
+                        }*/
                     }
                     long t2 = System.currentTimeMillis();
                     long spent = t2 - t1;
@@ -127,57 +186,12 @@ public class E2ETest
         t.setDaemon( true );
         t.start();
 
-        server.connectToClient();
-
-        final AtomicLong readTime = new AtomicLong();
-
-        EventHandler<Event> h = new EventHandler<Event>()
-        {
-            private final AtomicInteger counter = new AtomicInteger();
-            private volatile long t1;
-
-            @Override
-            public void handleEvent( @Nonnull Event event )
-            {
-                try
-                {
-                    if ( counter.getAndIncrement() == 0 )
-                    {
-                        t1 = System.currentTimeMillis();
-                    }
-
-                    long t2 = System.currentTimeMillis();
-                    long spent = t2 - t1;
-
-                    if ( counter.get() % 500_000 == 0 )
-                    {
-                        System.out.println( spent + "ms: " + counter.get() );
-                    }
-
-                    if ( counter.get() == totalCalls - 64 * 1024 )
-                    {
-                        readTime.set( spent );
-                        System.out.println( spent + "ms on read" );
-                        awaitHandlerFinished.countDown();
-                    }
-                }
-                catch ( Exception e )
-                {
-                    e.printStackTrace();
-                }
-            }
-        };
-
-        ThreadedStreamConsumer queue = new ThreadedStreamConsumer( h );
-
-        server.bindEventHandler( queue, new CountdownCloseable( new 
DummyCloseable(), 1 ), new DummyReadableChannel() );
-
-        assertThat( awaitHandlerFinished.await( 30L, TimeUnit.SECONDS ) )
+        assertThat( awaitHandlerFinished.await( 30L, SECONDS ) )
             .isTrue();
 
         factory.close();
         server.close();
-        queue.close();
+        //queue.close();
 
         // 1.0 seconds while using the encoder/decoder
         assertThat( readTime.get() )
@@ -210,8 +224,6 @@ public class E2ETest
             t.setDaemon( true );
             t.start();
 
-            server.connectToClient();
-
             assertThat( task.get() )
                 .isEqualTo( "client connected" );
         }
@@ -247,7 +259,11 @@ public class E2ETest
             e.expectMessage( "The actual sessionId 
'6ba7b812-9dad-11d1-80b4-00c04fd430c8' does not match '"
                 + serverSessionId + "'." );
 
-            server.connectToClient();
+            server.tryConnectToClient();
+            server.bindCommandReader( mock( CommandReader.class ), mock( 
WritableByteChannel.class ) );
+
+            server.bindEventHandler( mock( EventHandler.class ),
+                new CountdownCloseable( mock( Closeable.class ), 1 ), mock( 
ReadableByteChannel.class ) );
 
             fail( task.get() );
         }
@@ -255,21 +271,76 @@ public class E2ETest
 
     private static class DummyReadableChannel implements ReadableByteChannel
     {
+        private volatile Thread t;
+
         @Override
-        public int read( ByteBuffer dst )
+        public int read( ByteBuffer dst ) throws IOException
         {
-            return 0;
+            try
+            {
+                t = Thread.currentThread();
+                HOURS.sleep( 1L );
+                return 0;
+            }
+            catch ( InterruptedException e )
+            {
+                throw new IOException( e.getLocalizedMessage(), e );
+            }
         }
 
         @Override
         public boolean isOpen()
         {
-            return false;
+            return true;
+        }
+
+        @Override
+        public void close()
+        {
+            if ( t != null )
+            {
+                t.interrupt();
+            }
+        }
+    }
+
+    private static class DummyCommandReader implements CommandReader
+    {
+        private volatile Thread t;
+
+        @Override
+        public Command readNextCommand() throws IOException
+        {
+            try
+            {
+                t = Thread.currentThread();
+                HOURS.sleep( 1L );
+                return null;
+            }
+            catch ( InterruptedException e )
+            {
+                throw new IOException( e.getLocalizedMessage(), e );
+            }
         }
 
         @Override
         public void close()
         {
+            if ( t != null )
+            {
+                t.interrupt();
+            }
+        }
+
+        @Override
+        public boolean isClosed()
+        {
+            return false;
+        }
+
+        @Override
+        public void tryFlush()
+        {
         }
     }
 
diff --git 
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
 
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
index b34ecc0..9655c63 100644
--- 
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
+++ 
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
@@ -157,7 +157,7 @@ public class ForkChannelTest
             Client client = new Client( uri.getPort(), sessionId );
             client.start();
 
-            channel.connectToClient();
+            channel.tryConnectToClient();
             channel.bindCommandReader( commandReader, null );
             ReadableByteChannel stdOut = mock( ReadableByteChannel.class );
             when( stdOut.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
diff --git 
a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
 
b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
index 678d643..2b83e0f 100644
--- 
a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
+++ 
b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleReadableChannel.java
@@ -63,7 +63,13 @@ abstract class AbstractNoninterruptibleReadableChannel 
implements ReadableByteCh
     @Override
     public final void close() throws IOException
     {
-        open = false;
-        closeImpl();
+        try
+        {
+            closeImpl();
+        }
+        finally
+        {
+            open = false;
+        }
     }
 }
diff --git 
a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
 
b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
index f6b3e6a..6232209 100644
--- 
a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
+++ 
b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
@@ -132,7 +132,7 @@ public class SurefireMasterProcessChannelProcessorFactory
         {
             if ( clientSocketChannel.supportedOptions().contains( option ) )
             {
-                // clientSocketChannel.setOption( option, true );
+                clientSocketChannel.setOption( option, true );
             }
         }
     }
diff --git 
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
 
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
index 26145fb..38d3648 100644
--- 
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
+++ 
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
@@ -34,8 +34,9 @@ import java.nio.channels.WritableByteChannel;
  * and communicates with a dedicated forked JVM. It represents a server.
  * <br>
  * <br>
- * It connects to a remote client by {@link #connectToClient()}, provides a 
connection string
+ * It connects to a remote client by {@link #tryConnectToClient()}, provides a 
connection string
  * {@link #getForkNodeConnectionString()} needed by the client in forked JVM, 
binds event handler and command reader.
+ * This object is called in one Thread.
  *
  * @author <a href="mailto:[email protected]">Tibor Digana (tibor17)</a>
  * @since 3.0.0-M5
@@ -53,7 +54,13 @@ public abstract class ForkChannel implements Closeable
         this.arguments = arguments;
     }
 
-    public abstract Completable connectToClient() throws IOException, 
InterruptedException;
+    /**
+     * Asynchronously connects to the client.
+     *
+     * @throws IOException if stream fails
+     * @throws InterruptedException if the thread is interrupted
+     */
+    public abstract void tryConnectToClient() throws IOException, 
InterruptedException;
 
     /**
      * This is server related class, which if binds to a TCP port, determines 
the connection string for the client.
@@ -73,9 +80,10 @@ public abstract class ForkChannel implements Closeable
      * @param commands command reader, see {@link 
CommandReader#readNextCommand()}
      * @param stdIn    optional standard input stream of the JVM to write the 
encoded commands into it
      * @throws IOException if an error in the fork channel
+     * @throws InterruptedException channel interrupted
      */
     public abstract void bindCommandReader( @Nonnull CommandReader commands, 
WritableByteChannel stdIn )
-        throws IOException;
+        throws IOException, InterruptedException;
 
     /**
      * Starts a Thread reading the events.
@@ -84,11 +92,12 @@ public abstract class ForkChannel implements Closeable
      * @param countdownCloseable count down of the final call of {@link 
Closeable#close()}
      * @param stdOut             optional standard output stream of the JVM
      * @throws IOException if an error in the fork channel
+     * @throws InterruptedException channel interrupted
      */
     public abstract void bindEventHandler( @Nonnull EventHandler<Event> 
eventHandler,
                                            @Nonnull CountdownCloseable 
countdownCloseable,
                                            ReadableByteChannel stdOut )
-        throws IOException;
+        throws IOException, InterruptedException;
 
     @Nonnull
     protected ForkNodeArguments getArguments()
@@ -97,4 +106,7 @@ public abstract class ForkChannel implements Closeable
     }
 
     public abstract void disable();
+
+    @Override
+    public abstract void close() throws IOException;
 }
diff --git 
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
 
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
index ec35682..6d0e443 100644
--- 
a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
+++ 
b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
@@ -19,6 +19,7 @@ package org.apache.maven.surefire.extensions.util;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -41,9 +42,9 @@ public abstract class CountDownLauncher
         countDown = new AtomicInteger( count );
     }
 
-    protected abstract void job();
+    protected abstract void job() throws IOException, InterruptedException;
 
-    public void countDown()
+    public void countDown() throws IOException, InterruptedException
     {
         if ( countDown.decrementAndGet() == 0 )
         {

Reply via email to