Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950463
--- Diff:
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
@@ -211,143 +251,203 @@ public void run() {
@Override
public ActiveMQBuffer createTransportBuffer(final int size) {
- return new
ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
+ try {
+ return new
ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
+ } catch (OutOfMemoryError oom) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ final long totalPendingWriteBytes =
batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+ ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> "
+ totalPendingWriteBytes + "[EVENT LOOP] -> " +
pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
+ }
+ throw oom;
+ }
}
@Override
- public Object getID() {
+ public final Object getID() {
// TODO: Think of it
return channel.hashCode();
}
// This is called periodically to flush the batch buffer
@Override
- public void checkFlushBatchBuffer() {
- if (!batchingEnabled) {
- return;
- }
-
- if (writeLock.tryAcquire()) {
- try {
- if (batchBuffer != null && batchBuffer.readable()) {
- channel.writeAndFlush(batchBuffer.byteBuf());
-
- batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
- }
- } finally {
- writeLock.release();
+ public final void checkFlushBatchBuffer() {
+ if (this.batchingEnabled) {
+ //perform the flush only if necessary
+ final int batchBufferSize = batchBufferSize(this.channel,
this.writeBufferHighWaterMark);
+ if (batchBufferSize > 0) {
+ this.channel.flush();
}
}
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public final void write(final ActiveMQBuffer buffer) {
write(buffer, false, false);
}
@Override
- public void write(ActiveMQBuffer buffer, final boolean flush, final
boolean batched) {
+ public final void write(ActiveMQBuffer buffer, final boolean flush,
final boolean batched) {
write(buffer, flush, batched, null);
}
@Override
- public void write(ActiveMQBuffer buffer,
- final boolean flush,
- final boolean batched,
- final ChannelFutureListener futureListener) {
-
- try {
- writeLock.acquire();
-
- try {
- if (batchBuffer == null && batchingEnabled && batched &&
!flush) {
- // Lazily create batch buffer
-
- batchBuffer =
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
-
- if (batchBuffer != null) {
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE ||
!batched || flush) {
- // If the batch buffer is full or it's flush param or
not batched then flush the buffer
-
- buffer = batchBuffer;
- } else {
- return;
- }
-
- if (!batched || flush) {
- batchBuffer = null;
- } else {
- // Create a new buffer
+ public final boolean blockUntilWritable(final int requiredCapacity,
final long timeout, final TimeUnit timeUnit) {
+ final boolean isAllowedToBlock = isAllowedToBlock();
+ if (!isAllowedToBlock) {
+ return canWrite(requiredCapacity);
+ } else {
+ final long timeoutNanos = timeUnit.toNanos(timeout);
+ final long deadline = System.nanoTime() + timeoutNanos;
+ //choose wait time unit size
+ final long parkNanos;
+ //if is requested to wait more than a millisecond than we could
use
+ if (timeoutNanos >= 1_000_000L) {
+ parkNanos = 100_000L;
+ } else {
+ //reduce it doesn't make sense, only a spin loop could be
enough precise with the most OS
+ parkNanos = 1000L;
+ }
+ boolean canWrite;
+ while (!(canWrite = canWrite(requiredCapacity)) &&
System.nanoTime() < deadline) {
+ LockSupport.parkNanos(parkNanos);
+ }
+ return canWrite;
+ }
+ }
- batchBuffer =
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
- }
+ private boolean isAllowedToBlock() {
+ final EventLoop eventLoop = channel.eventLoop();
+ final boolean inEventLoop = eventLoop.inEventLoop();
--- End diff --
My concern is the case where you are in the event loop of another
connection: the Acceptor receives data, replication sends it to the backup
and blocks... and then the main receiver blocks too!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---