[ 
https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957193#comment-15957193
 ] 

ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------

Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109967119
  
    --- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new 
ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    +      try {
    +         return new 
ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = 
batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " 
+ totalPendingWriteBytes + "[EVENT LOOP] -> " + 
pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, 
this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final 
boolean batched) {
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, 
final boolean batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched && 
!flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = 
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || 
!batched || flush) {
    -                  // If the batch buffer is full or it's flush param or 
not batched then flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, 
final long timeout, final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could 
use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be 
enough precise with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && 
System.nanoTime() < deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = 
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    If you try to call it when you are in the event loop it refuses to block to 
avoid any possible issue about stopping Netty I/O threads!
    Being "inEventLoop" is only to mark a caller thread as one that could 
perform channels and I/O operations for Netty.
    In general Netty tends to reuse the same thread(s) to perform operations on 
group of Connections/Sockets (using EPOLL ad NIO at least).
    The call ``inEventLoop`` could return ``true`` only if the current Thread 
is created through the configured thread factory passed to the Netty's 
bootstrap and not in base of what it is actually doing.


> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
>                 Key: ARTEMIS-1025
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>
> If you send and receive a lot of messages in short time to Artemis via Netty 
> connector, the OutOfDirectMemoryError exception is thrown from the client.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to