Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109967119
  
    --- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 ---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new 
ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    +      try {
    +         return new 
ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = 
batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " 
+ totalPendingWriteBytes + "[EVENT LOOP] -> " + 
pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, 
this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final 
boolean batched) {
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, 
final boolean batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched && 
!flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = 
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || 
!batched || flush) {
    -                  // If the batch buffer is full or it's flush param or 
not batched then flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, 
final long timeout, final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could 
use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be 
enough precise with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && 
System.nanoTime() < deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = 
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    If you call it from within the event loop, it refuses to block, in order to 
avoid any risk of stalling Netty's I/O threads!
    Being "inEventLoop" only marks the caller thread as one that can perform 
channel and I/O operations for Netty.
    In general, Netty tends to reuse the same thread(s) to perform operations on 
a group of connections/sockets (with EPOLL and NIO at least).
    The call ``inEventLoop`` can return ``true`` only if the current thread 
was created through the configured thread factory passed to Netty's 
bootstrap, and not based on what the thread is actually doing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to