This is an automated email from the ASF dual-hosted git repository. tibordigana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
commit 6d58bc038c3b71dbde0d3f91038919e989aaa247 Author: tibordigana <[email protected]> AuthorDate: Sun Sep 27 19:50:18 2020 +0200 [SUREFIRE-1827] The console output is not flushed --- surefire-api/pom.xml | 5 + .../AbstractNoninterruptibleWritableChannel.java | 15 +-- .../maven/surefire/api/util/internal/Channels.java | 17 +++- .../util/internal/WritableBufferedByteChannel.java | 2 + .../api/util/internal/ChannelsReaderTest.java | 33 +++++++ .../api/util/internal/ChannelsWriterTest.java | 47 ++++++++- ...stractMasterProcessChannelProcessorFactory.java | 106 +++++++++++++++++++++ ...LegacyMasterProcessChannelProcessorFactory.java | 15 ++- ...refireMasterProcessChannelProcessorFactory.java | 10 +- .../surefire/booter/ForkedBooterMockTest.java | 70 ++++++++++++++ .../spi/LegacyMasterProcessChannelEncoderTest.java | 6 ++ 11 files changed, 298 insertions(+), 28 deletions(-) diff --git a/surefire-api/pom.xml b/surefire-api/pom.xml index ff62005..158c88a 100644 --- a/surefire-api/pom.xml +++ b/surefire-api/pom.xml @@ -52,6 +52,11 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <artifactId>powermock-reflect</artifactId> + <groupId>org.powermock</groupId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java index fe998f3..c2641e5 100644 --- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java +++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java @@ -67,17 +67,12 @@ abstract class AbstractNoninterruptibleWritableChannel implements WritableBuffer src.flip(); } - int countWrittenBytes = 0; - - if ( src.hasRemaining() ) + int countWrittenBytes = src.remaining(); + writeImpl( src ); + src.position( src.limit() ); + if ( flush ) { - countWrittenBytes = src.remaining(); - writeImpl( src ); - src.position( src.limit() ); - if ( flush ) - { - flushImpl(); - } + flushImpl(); } return countWrittenBytes; } diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java index 7bd4efc..3938c4a 100644 --- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java +++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java @@ -32,7 +32,9 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import static java.lang.Math.max; import static java.util.Objects.requireNonNull; /** @@ -227,17 +229,28 @@ public final class Channels }; } - private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out, @Nonnegative int bufferSize ) + private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out, + @Nonnegative final int bufferSize ) { requireNonNull( out, "the stream should not be null" ); final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize ); return new AbstractNoninterruptibleWritableChannel() { + private final AtomicLong bytesCounter = new AtomicLong(); + + @Override + public long countBufferOverflows() + { + return bufferSize == 0 ? 0 : max( bytesCounter.get() - 1, 0 ) / bufferSize; + } + @Override protected void writeImpl( ByteBuffer src ) throws IOException { - bos.write( src.array(), src.arrayOffset() + src.position(), src.remaining() ); + int count = src.remaining(); + bos.write( src.array(), src.arrayOffset() + src.position(), count ); + bytesCounter.getAndAdd( count ); } @Override diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java index 42c0d08..ea86a82 100644 --- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java +++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java @@ -29,8 +29,10 @@ import java.nio.channels.WritableByteChannel; * and the channel is flushed after the buffer has overflew. * <br> * The method {@link #write(ByteBuffer)} flushes every written message. + * You can flush the channel by {@link #write(ByteBuffer) writing} the zero length of {@link ByteBuffer}. */ public interface WritableBufferedByteChannel extends WritableByteChannel { void writeBuffered( ByteBuffer src ) throws IOException; + long countBufferOverflows(); } diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java index 42ca2c7..025cfb5 100644 --- a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java +++ b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java @@ -28,11 +28,13 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousByteChannel; import java.nio.channels.ClosedChannelException; @@ -54,6 +56,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.powermock.reflect.Whitebox.invokeMethod; /** * The tests for {@link Channels#newChannel(InputStream)} and {@link Channels#newBufferedChannel(InputStream)}. @@ -69,6 +72,36 @@ public class ChannelsReaderTest .build(); @Test + public void shouldOverflowBuffer() throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + WritableBufferedByteChannel channel = invokeMethod( Channels.class, "newChannel", + new Class[] {OutputStream.class, int.class}, new Object[] {out, 8} ); + + assertThat( channel.countBufferOverflows() ) + .isEqualTo( 0 ); + + channel.write( ByteBuffer.wrap( new byte[] {1, 2, 3} ) ); + + assertThat( channel.countBufferOverflows() ) + .isEqualTo( 0 ); + + channel.write( ByteBuffer.wrap( new byte[] {4, 5, 6, 7, 8} ) ); + + assertThat( channel.countBufferOverflows() ) + .isEqualTo( 0 ); + + channel.write( ByteBuffer.wrap( new byte[] {9} ) ); + + assertThat( channel.countBufferOverflows() ) + .isEqualTo( 1 ); + + assertThat( out.toByteArray() ) + .isEqualTo( new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9} ); + } + + @Test public void exactBufferSize() throws Exception { ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} ); diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java index 4befc24..24a09f3 100644 --- a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java +++ b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java @@ -137,6 +137,48 @@ public class ChannelsWriterTest } @Test + public void shouldFlushWhenEmptyBuffer() throws Exception + { + final boolean[] flushed = {false}; + ByteArrayOutputStream out = new ByteArrayOutputStream() + { + @Override + public void flush() throws IOException + { + flushed[0] = true; + super.flush(); + } + }; + WritableByteChannel channel = Channels.newChannel( out ); + ByteBuffer bb = ByteBuffer.allocate( 0 ); + int countWritten = channel.write( bb ); + assertThat( countWritten ) + .isEqualTo( 0 ); + assertThat( flushed[0] ) + .isTrue(); + } + + @Test + public void shouldFlushWhenEmptyBufferOnBufferedWrites() throws Exception + { + final boolean[] flushed = {false}; + ByteArrayOutputStream out = new ByteArrayOutputStream() + { + @Override + public void flush() throws IOException + { + flushed[0] = true; + super.flush(); + } + }; + WritableBufferedByteChannel channel = Channels.newBufferedChannel( out ); + ByteBuffer bb = ByteBuffer.allocate( 0 ); + channel.writeBuffered( bb ); + assertThat( flushed[0] ) + .isFalse(); + } + + @Test public void bufferedChannel() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -151,11 +193,6 @@ public class ChannelsWriterTest assertThat( out.toByteArray() ) .isEmpty(); - channel.write( ByteBuffer.allocate( 0 ) ); - - assertThat( out.toByteArray() ) - .isEmpty(); - channel.write( ByteBuffer.wrap( new byte[] {4} ) ); assertThat( out.toByteArray() ) diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java new file mode 100644 index 0000000..63407f2 --- /dev/null +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java @@ -0,0 +1,106 @@ +package org.apache.maven.surefire.booter.spi; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel; +import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.AccessControlException; +import java.security.PrivilegedAction; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import static java.security.AccessController.doPrivileged; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory; + +/** + * Default implementation of {@link MasterProcessChannelProcessorFactory}. + */ +public abstract class AbstractMasterProcessChannelProcessorFactory + implements MasterProcessChannelProcessorFactory +{ + private static final String STREAM_FLUSHER = "surefire-forkedjvm-stream-flusher"; + private final ScheduledExecutorService flusher; + + public AbstractMasterProcessChannelProcessorFactory() + { + flusher = newScheduledThreadPool( 1, newDaemonThreadFactory( STREAM_FLUSHER ) ); + } + + protected void schedulePeriodicFlusher( int delayInMillis, final WritableBufferedByteChannel channel ) + { + final AtomicLong bufferOverflows = new AtomicLong(); + flusher.scheduleWithFixedDelay( new Runnable() + { + @Override + public void run() + { + long currentBufferOverflows = channel.countBufferOverflows(); + // optimization: flush the Channel only if the buffer has not overflew after last period of time + if ( bufferOverflows.get() == currentBufferOverflows ) + { + try + { + channel.write( ByteBuffer.allocate( 0 ) ); + } + catch ( Exception e ) + { + // cannot do anything about this I/O issue + } + } + else + { + bufferOverflows.set( currentBufferOverflows ); + } + } + }, 0L, delayInMillis, MILLISECONDS ); + } + + @Override + public void close() throws IOException + { + try + { + doPrivileged( new PrivilegedAction<Object>() + { + @Override + public Object run() + { + flusher.shutdown(); + // Do NOT call awaitTermination() due to this would unnecessarily prolong teardown + // time of the JVM. It is not a critical situation when the JXM exits this daemon + // thread because the interrupted flusher does not break any business function. + // All business data is already safely flushed by test events, then by sending BYE + // event at the exit time and finally by flushEventChannelOnExit() in ForkedBooter. + return null; + } + } + ); + } + catch ( AccessControlException e ) + { + // ignore + } + } +} diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java index 1344f3d..6e28764 100644 --- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java @@ -21,7 +21,7 @@ package org.apache.maven.surefire.booter.spi; import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder; import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder; -import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory; +import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel; import java.io.IOException; import java.net.MalformedURLException; @@ -36,8 +36,10 @@ import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedCh * @since 3.0.0-M5 */ public class LegacyMasterProcessChannelProcessorFactory - implements MasterProcessChannelProcessorFactory + extends AbstractMasterProcessChannelProcessorFactory { + private static final int FLUSH_PERIOD_MILLIS = 100; + @Override public boolean canUse( String channelConfig ) { @@ -62,11 +64,8 @@ public class LegacyMasterProcessChannelProcessorFactory @Override public MasterProcessChannelEncoder createEncoder() { - return new LegacyMasterProcessChannelEncoder( newBufferedChannel( System.out ) ); - } - - @Override - public void close() - { + WritableBufferedByteChannel channel = newBufferedChannel( System.out ); + schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel ); + return new LegacyMasterProcessChannelEncoder( channel ); } } diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java index 9efff25..0bebeb4 100644 --- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java +++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java @@ -21,7 +21,7 @@ package org.apache.maven.surefire.booter.spi; import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder; import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder; -import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory; +import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel; import java.io.IOException; import java.net.InetSocketAddress; @@ -53,8 +53,9 @@ import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.ne * @since 3.0.0-M5 */ public class SurefireMasterProcessChannelProcessorFactory - implements MasterProcessChannelProcessorFactory + extends AbstractMasterProcessChannelProcessorFactory { + private static final int FLUSH_PERIOD_MILLIS = 100; private volatile AsynchronousSocketChannel clientSocketChannel; @Override @@ -104,12 +105,15 @@ public class SurefireMasterProcessChannelProcessorFactory @Override public MasterProcessChannelEncoder createEncoder() { - return new LegacyMasterProcessChannelEncoder( newBufferedChannel( newOutputStream( clientSocketChannel ) ) ); + WritableBufferedByteChannel channel = newBufferedChannel( newOutputStream( clientSocketChannel ) ); + schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel ); + return new LegacyMasterProcessChannelEncoder( channel ); } @Override public void close() throws IOException { + super.close(); if ( clientSocketChannel != null && clientSocketChannel.isOpen() ) { clientSocketChannel.close(); diff --git a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java index 1982f7e..c1140c8 100644 --- a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java +++ b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java @@ -22,6 +22,8 @@ package org.apache.maven.surefire.booter; import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder; import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder; import org.apache.maven.surefire.api.report.StackTraceWriter; +import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel; +import org.apache.maven.surefire.booter.spi.AbstractMasterProcessChannelProcessorFactory; import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelDecoder; import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelEncoder; import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelProcessorFactory; @@ -42,6 +44,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.MalformedURLException; @@ -55,6 +58,7 @@ import java.util.concurrent.FutureTask; import static java.nio.charset.StandardCharsets.US_ASCII; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel; import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Fail.fail; import static org.mockito.ArgumentMatchers.any; @@ -296,6 +300,72 @@ public class ForkedBooterMockTest } @Test + @SuppressWarnings( "checkstyle:magicnumber" ) + public void shouldScheduleFlushes() throws Exception + { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + class Factory extends AbstractMasterProcessChannelProcessorFactory + { + @Override + public boolean canUse( String channelConfig ) + { + return false; + } + + @Override + public void connect( String channelConfig ) + { + } + + @Override + public MasterProcessChannelDecoder createDecoder() + { + return null; + } + + @Override + public MasterProcessChannelEncoder createEncoder() + { + return null; + } + + public void runScheduler() throws InterruptedException + { + final WritableBufferedByteChannel channel = newBufferedChannel( out ); + schedulePeriodicFlusher( 100, channel ); + Thread t = new Thread() + { + @Override + public void run() + { + for ( int i = 0; i < 10; i++ ) + { + try + { + channel.write( ByteBuffer.wrap( new byte[] {1} ) ); + Thread.sleep( 75 ); + } + catch ( Exception e ) + { + // + } + } + } + }; + t.setDaemon( true ); + t.start(); + t.join( 5000L ); + } + } + + Factory factory = new Factory(); + factory.runScheduler(); + factory.close(); + assertThat( out.size() ).isPositive(); + assertThat( out.size() ).isLessThanOrEqualTo( 10 ); + } + + @Test public void shouldLookupSurefireDecoderFactory() throws Exception { mockStatic( ForkedBooter.class ); diff --git a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java index 7f72ba2..0262f3e 100644 --- a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java +++ b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java @@ -1418,6 +1418,12 @@ public class LegacyMasterProcessChannelEncoderTest } @Override + public long countBufferOverflows() + { + return 0; + } + + @Override public int write( ByteBuffer src ) throws IOException { this.src = src;
