[
https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957046#comment-15957046
]
ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------
Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950618
--- Diff:
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
@@ -211,143 +251,203 @@ public void run() {
@Override
public ActiveMQBuffer createTransportBuffer(final int size) {
- return new
ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
+ try {
+ return new
ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
+ } catch (OutOfMemoryError oom) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ final long totalPendingWriteBytes =
batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+ ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> "
+ totalPendingWriteBytes + "[EVENT LOOP] -> " +
pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
+ }
+ throw oom;
+ }
}
@Override
- public Object getID() {
+ public final Object getID() {
// TODO: Think of it
return channel.hashCode();
}
// This is called periodically to flush the batch buffer
@Override
- public void checkFlushBatchBuffer() {
- if (!batchingEnabled) {
- return;
- }
-
- if (writeLock.tryAcquire()) {
- try {
- if (batchBuffer != null && batchBuffer.readable()) {
- channel.writeAndFlush(batchBuffer.byteBuf());
-
- batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
- }
- } finally {
- writeLock.release();
+ public final void checkFlushBatchBuffer() {
+ if (this.batchingEnabled) {
+ //perform the flush only if necessary
+ final int batchBufferSize = batchBufferSize(this.channel,
this.writeBufferHighWaterMark);
+ if (batchBufferSize > 0) {
+ this.channel.flush();
}
}
}
@Override
- public void write(final ActiveMQBuffer buffer) {
+ public final void write(final ActiveMQBuffer buffer) {
write(buffer, false, false);
}
@Override
- public void write(ActiveMQBuffer buffer, final boolean flush, final
boolean batched) {
+ public final void write(ActiveMQBuffer buffer, final boolean flush,
final boolean batched) {
write(buffer, flush, batched, null);
}
@Override
- public void write(ActiveMQBuffer buffer,
- final boolean flush,
- final boolean batched,
- final ChannelFutureListener futureListener) {
-
- try {
- writeLock.acquire();
-
- try {
- if (batchBuffer == null && batchingEnabled && batched &&
!flush) {
- // Lazily create batch buffer
-
- batchBuffer =
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
-
- if (batchBuffer != null) {
- batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
-
- if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE ||
!batched || flush) {
- // If the batch buffer is full or it's flush param or
not batched then flush the buffer
-
- buffer = batchBuffer;
- } else {
- return;
- }
-
- if (!batched || flush) {
- batchBuffer = null;
- } else {
- // Create a new buffer
+ public final boolean blockUntilWritable(final int requiredCapacity,
final long timeout, final TimeUnit timeUnit) {
+ final boolean isAllowedToBlock = isAllowedToBlock();
+ if (!isAllowedToBlock) {
+ return canWrite(requiredCapacity);
+ } else {
+ final long timeoutNanos = timeUnit.toNanos(timeout);
+ final long deadline = System.nanoTime() + timeoutNanos;
+ //choose wait time unit size
+ final long parkNanos;
+ //if is requested to wait more than a millisecond than we could
use
+ if (timeoutNanos >= 1_000_000L) {
+ parkNanos = 100_000L;
+ } else {
+ //reduce it doesn't make sense, only a spin loop could be
enough precise with the most OS
+ parkNanos = 1000L;
+ }
+ boolean canWrite;
+ while (!(canWrite = canWrite(requiredCapacity)) &&
System.nanoTime() < deadline) {
+ LockSupport.parkNanos(parkNanos);
+ }
+ return canWrite;
+ }
+ }
- batchBuffer =
ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
- }
- }
+ private boolean isAllowedToBlock() {
+ final EventLoop eventLoop = channel.eventLoop();
+ final boolean inEventLoop = eventLoop.inEventLoop();
--- End diff --
It's not a blocker for this merge; it was just a question: would
inEventLoop() return true if it's on the event loop for another connection?
> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
> Key: ARTEMIS-1025
> URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: Broker
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
>
> If you send and receive a lot of messages in a short time to Artemis via the
> Netty connector, the OutOfDirectMemoryError exception is thrown from the client.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)