This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit fea1f6b16892e61fd48a60ff4eca67e235884dda Author: Murtadha Hubail <[email protected]> AuthorDate: Tue Oct 8 17:41:43 2019 +0300 [NO ISSUE][NET] Log ChannelReadInterface Stats on Failure - user model changes: no - storage format changes: no - interface changes: yes Details: - When the read buffers of a FullFrameChannelReadInterface are exceeded, log its stats to help in debugging the issue. - Warn when a partial frame is being written over a full frame write channel. - Rename ThreadSafetyGuaranteedBy -> GuardedBy. - Annotate networking calls with their synchronization guards. Change-Id: I89eed0c06dbf4b0e86747538bff286dc37853957 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3663 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Al Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../MessagingChannelInterfaceFactory.java | 5 +++++ .../apache/hyracks/api/comm/IBufferFactory.java | 15 +++++++++++-- .../hyracks/comm/channels/ReadBufferFactory.java | 7 ++++++ .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 4 ++-- .../muxdemux/AbstractChannelReadInterface.java | 4 ++++ .../muxdemux/AbstractChannelWriteInterface.java | 6 ++++++ .../protocols/muxdemux/ChannelControlBlock.java | 3 +++ .../muxdemux/FullFrameChannelReadInterface.java | 25 +++++++++++++++++----- .../muxdemux/FullFrameChannelWriteInterface.java | 5 +++++ .../protocols/muxdemux/MultiplexedConnection.java | 5 +++-- .../tests/FullFrameChannelReadInterfaceTest.java | 6 ++++++ ...hreadSafetyGuaranteedBy.java => GuardedBy.java} | 4 ++-- 12 files changed, 76 insertions(+), 13 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java index abd401a..17e2744 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java @@ -93,6 +93,11 @@ public class MessagingChannelInterfaceFactory implements IChannelInterfaceFactor } return buffer; } + + @Override + public int getCreatedBuffersCount() { + return 0; + } } /** diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java index 9e8d931..d1d5b9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java @@ -22,9 +22,20 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.exceptions.HyracksDataException; -@FunctionalInterface public interface IBufferFactory { - public ByteBuffer createBuffer() throws HyracksDataException; + /** + * Creates a buffer + * + * @return the created buffer + * @throws HyracksDataException + */ + ByteBuffer createBuffer() throws HyracksDataException; + /** + * Gets the number of created buffers + * + * @return the number of created buffers + */ + int getCreatedBuffersCount(); } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java index 876b3de..ed639f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java @@ -22,7 +22,9 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.context.IHyracksCommonContext; +import org.apache.hyracks.util.annotations.NotThreadSafe; +@NotThreadSafe public class ReadBufferFactory implements IBufferFactory { private final int limit; @@ -44,4 +46,9 @@ public class ReadBufferFactory implements IBufferFactory { return frame; } } + + @Override + public int getCreatedBuffersCount() { + return counter; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index 2a6bdae..260c6b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -35,8 +35,8 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; +import org.apache.hyracks.util.annotations.GuardedBy; import org.apache.hyracks.util.annotations.NotThreadSafe; -import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,7 +45,7 @@ import org.apache.logging.log4j.Logger; * An implementation of IJobQueue that gives more priority to jobs that are submitted earlier. */ @NotThreadSafe -@ThreadSafetyGuaranteedBy("JobManager") +@GuardedBy("JobManager") public class FIFOJobQueue implements IJobQueue { private static final Logger LOGGER = LogManager.getLogger(); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java index 31cb69f..d81517d 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.IBufferAcceptor; import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelReadInterface; import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.util.annotations.GuardedBy; public abstract class AbstractChannelReadInterface implements IChannelReadInterface { @@ -31,6 +32,7 @@ public abstract class AbstractChannelReadInterface implements IChannelReadInterf protected IBufferAcceptor emptyBufferAcceptor; protected ByteBuffer currentReadBuffer; protected IBufferFactory bufferFactory; + @GuardedBy("MultiplexConnection") protected volatile int credits; @Override @@ -58,11 +60,13 @@ public abstract class AbstractChannelReadInterface implements IChannelReadInterf } @Override + @GuardedBy("MultiplexConnection") public int getCredits() { return credits; } @Override + @GuardedBy("MultiplexConnection") public void setReadCredits(int credits) { this.credits = credits; } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java index 5c927f9..741ca8c 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java @@ -27,6 +27,7 @@ import org.apache.hyracks.api.comm.IBufferAcceptor; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.comm.IChannelWriteInterface; import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.util.annotations.GuardedBy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,6 +43,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte protected boolean channelWritabilityState; protected final int channelId; protected IBufferAcceptor eba; + @GuardedBy("ChannelControlBlock") protected int credits; protected boolean eos; protected boolean eosSent; @@ -61,6 +63,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte } @Override + @GuardedBy("ChannelControlBlock") public void writeComplete() { if (currentWriteBuffer.remaining() <= 0) { currentWriteBuffer.clear(); @@ -70,6 +73,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte } } + @GuardedBy("ChannelControlBlock") private boolean computeWritability() { boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty(); if (writableDataPresent) { @@ -82,6 +86,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte } @Override + @GuardedBy("ChannelControlBlock") public void adjustChannelWritability() { boolean writable = computeWritability(); if (writable) { @@ -97,6 +102,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte } @Override + @GuardedBy("ChannelControlBlock") public void addCredits(int credit) { credits += credit; } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java index 3f88630..4ca7f1a 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java @@ -30,6 +30,7 @@ import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState; import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.annotations.GuardedBy; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -95,10 +96,12 @@ public class ChannelControlBlock implements IChannelControlBlock { return ri.read(sc, size); } + @GuardedBy("MultiplexConnection") int getReadCredits() { return ri.getCredits(); } + @GuardedBy("MultiplexConnection") void setReadCredits(int credits) { ri.setReadCredits(credits); } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java index f32e6bf..9d7f848 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java @@ -27,6 +27,7 @@ import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.util.annotations.GuardedBy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +37,9 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface private final Deque<ByteBuffer> riEmptyStack; private final IChannelControlBlock ccb; private final Object bufferRecycleLock = new Object(); + private int frameSize; + private long recycledBuffers = 0; + private long flushedBuffers = 0; public FullFrameChannelReadInterface(IChannelControlBlock ccb) { this.ccb = ccb; @@ -43,17 +47,22 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface credits = 0; emptyBufferAcceptor = buffer -> { final int delta = buffer.remaining(); + if (delta != frameSize) { + LOGGER.warn("partial frame being recycled; expected size {}, actual size {}", frameSize, delta); + } synchronized (bufferRecycleLock) { if (ccb.isRemotelyClosed()) { return; } riEmptyStack.push(buffer); + recycledBuffers++; ccb.addPendingCredits(delta); } }; } @Override + @GuardedBy("ChannelControlBlock") public int read(ISocketChannel sc, int size) throws IOException, NetException { synchronized (bufferRecycleLock) { while (true) { @@ -62,16 +71,12 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface } if (currentReadBuffer == null) { currentReadBuffer = riEmptyStack.poll(); - //if current buffer == null and limit not reached - // factory.createBuffer factory if (currentReadBuffer == null) { currentReadBuffer = bufferFactory.createBuffer(); } } if (currentReadBuffer == null) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size()); - } + logStats(); throw new IllegalStateException(ccb + " read buffers exceeded"); } int rSize = Math.min(size, currentReadBuffer.remaining()); @@ -95,6 +100,7 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface } if (currentReadBuffer.remaining() <= 0) { flush(); + flushedBuffers++; } } } @@ -102,7 +108,16 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface @Override public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) { + this.frameSize = frameSize; super.setBufferFactory(bufferFactory, limit, frameSize); ccb.addPendingCredits(limit * frameSize); } + + private void logStats() { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn( + "{} read buffers exceeded; current empty buffers: {}, created buffers: {}, recycled buffers: {}, flushed buffers: {}", + ccb, riEmptyStack.size(), bufferFactory.getCreatedBuffersCount(), recycledBuffers, flushedBuffers); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java index 3f4618b..a7be3a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.comm.IConnectionWriterState; import org.apache.hyracks.api.comm.MuxDemuxCommand; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.util.annotations.GuardedBy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,6 +36,7 @@ public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterfac } @Override + @GuardedBy("ChannelControlBlock") public void write(IConnectionWriterState writerState) throws NetException { if (currentWriteBuffer == null) { currentWriteBuffer = wiFullQueue.poll(); @@ -43,6 +45,9 @@ public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterfac int size = Math.min(currentWriteBuffer.remaining(), credits); if (size > 0) { credits -= size; + if (credits % currentWriteBuffer.capacity() != 0) { + LOGGER.warn("partial frame being written on {}", ccb); + } writerState.getCommand().setChannelId(channelId); writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.DATA); writerState.getCommand().setData(size); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java index 64c1a53..061c6eee 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java @@ -34,7 +34,7 @@ import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener; import org.apache.hyracks.net.protocols.tcp.TCPConnection; import org.apache.hyracks.util.JSONUtil; -import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy; +import org.apache.hyracks.util.annotations.GuardedBy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -406,6 +406,7 @@ public class MultiplexedConnection implements ITCPConnectionEventListener { int channelId = readerState.command.getChannelId(); ccb = cSet.registerChannel(channelId); muxDemux.getChannelOpenListener().channelOpened(ccb); + break; } } if (LOGGER.isTraceEnabled()) { @@ -429,7 +430,7 @@ public class MultiplexedConnection implements ITCPConnectionEventListener { return muxDemux.getChannelInterfaceFactory(); } - @ThreadSafetyGuaranteedBy("MultiplexedConnection.this") + @GuardedBy("MultiplexedConnection.this") private class EventCounter implements IEventCounter { private int counter; diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java index ef6048d..2180d1d 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java @@ -176,5 +176,11 @@ public class FullFrameChannelReadInterfaceTest { counter++; return ByteBuffer.allocate(frameSize); } + + @Override + public int getCreatedBuffersCount() { + return counter; + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java similarity index 92% rename from hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java rename to hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java index 7ca1a94..0ce2727 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafetyGuaranteedBy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/GuardedBy.java @@ -29,8 +29,8 @@ import java.lang.annotation.Target; * to be thread safe by {@link #value()} */ @Documented -@Target({ ElementType.TYPE, ElementType.METHOD }) +@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD }) @Retention(RetentionPolicy.SOURCE) -public @interface ThreadSafetyGuaranteedBy { +public @interface GuardedBy { String value(); }
