This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 70c2200 ARTEMIS-2496 Revert catch up with zero-copy, as it's causing
issues into some integration usage
new d9e7025 This closes #2843
70c2200 is described below
commit 70c2200c54066eb6d9f1d0235aae5c3bbf5b7412
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 18 11:55:23 2019 -0400
ARTEMIS-2496 Revert catch up with zero-copy, as it's causing issues into
some integration usage
Revert "ARTEMIS-2336 Use zero copy to replicate journal/page/large message
file"
This reverts commit 85b93f0883bc06a2dfe2de9d560805a59d626d38.
---
.../artemis/core/protocol/core/Channel.java | 20 ---
.../core/protocol/core/impl/ChannelImpl.java | 118 +++++---------
.../core/protocol/core/impl/PacketImpl.java | 11 +-
.../core/remoting/impl/netty/NettyConnection.java | 41 -----
.../impl/netty/NonClosingDefaultFileRegion.java | 38 -----
.../artemis/spi/core/remoting/Connection.java | 4 -
.../core/protocol/core/impl/ChannelImplTest.java | 11 --
.../wireformat/ReplicationSyncFileMessage.java | 170 +++++++++------------
.../core/remoting/impl/invm/InVMConnection.java | 24 ---
.../core/replication/ReplicationManager.java | 105 +++++--------
.../wireformat/ReplicationSyncFileMessageTest.java | 85 -----------
.../integration/cluster/util/BackupSyncDelay.java | 7 -
.../remoting/impl/netty/NettyConnectionTest.java | 28 ----
13 files changed, 149 insertions(+), 513 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index e541dad..56f8259 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -69,20 +67,6 @@ public interface Channel {
boolean send(Packet packet);
/**
- * Sends a packet and file on this channel.
- *
- * @param packet the packet to send
- * @param raf the file to send
- * @param fileChannel the file channel retrieved from raf
- * @param offset the position of the raf
- * @param dataSize the data size to send
- * @param callback callback after send
- * @return false if the packet was rejected by an outgoing interceptor;
true if the send was
- * successful
- */
- boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel,
long offset, int dataSize, Callback callback);
-
- /**
* Sends a packet on this channel.
*
* @param packet the packet to send
@@ -263,8 +247,4 @@ public interface Channel {
* @param transferring whether the channel is transferring
*/
void setTransferring(boolean transferring);
-
- interface Callback {
- void done(boolean success);
- }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index d69b1e1..154ab8a 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -27,7 +25,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -277,104 +274,67 @@ public final class ChannelImpl implements Channel {
}
}
- private ActiveMQBuffer beforeSend(final Packet packet, final int
reconnectID) {
- packet.setChannelID(id);
-
- if (responseAsyncCache != null && packet.isRequiresResponse() &&
packet.isResponseAsync()) {
- packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
- }
-
- if (logger.isTraceEnabled()) {
- logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" :
connection.getID()) + " Sending packet nonblocking " + packet + " on
channelID=" + id);
+ // This must never called by more than one thread concurrently
+ private boolean send(final Packet packet, final int reconnectID, final
boolean flush, final boolean batch) {
+ if (invokeInterceptors(packet, interceptors, connection) != null) {
+ return false;
}
- ActiveMQBuffer buffer = packet.encode(connection);
-
- lock.lock();
+ synchronized (sendLock) {
+ packet.setChannelID(id);
- try {
- if (failingOver) {
- waitForFailOver("RemotingConnectionID=" + (connection == null ?
"NULL" : connection.getID()) + " timed-out waiting for fail-over condition on
non-blocking send");
+ if (responseAsyncCache != null && packet.isRequiresResponse() &&
packet.isResponseAsync()) {
+ packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
- // Sanity check
- if (transferring) {
- throw
ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ?
"NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on
channelID=" + id);
}
- if (resendCache != null && packet.isRequiresConfirmations()) {
- addResendPacket(packet);
- }
+ ActiveMQBuffer buffer = packet.encode(connection);
- } finally {
- lock.unlock();
- }
+ lock.lock();
- if (logger.isTraceEnabled()) {
- logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" :
connection.getID()) + " Writing buffer for channelID=" + id);
- }
+ try {
+ if (failingOver) {
+ waitForFailOver("RemotingConnectionID=" + (connection == null ?
"NULL" : connection.getID()) + " timed-out waiting for fail-over condition on
non-blocking send");
+ }
- checkReconnectID(reconnectID);
+ // Sanity check
+ if (transferring) {
+ throw
ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
+ }
- //We do this outside the lock as ResponseCache is threadsafe and allows
responses to come in,
- //As the send could block if the response cache cannot add, preventing
responses to be handled.
- if (responseAsyncCache != null && packet.isRequiresResponse() &&
packet.isResponseAsync()) {
- while (!responseAsyncCache.add(packet)) {
- try {
- Thread.sleep(1);
- } catch (Exception e) {
- // Ignore
+ if (resendCache != null && packet.isRequiresConfirmations()) {
+ addResendPacket(packet);
}
- }
- }
- return buffer;
- }
+ } finally {
+ lock.unlock();
+ }
- // This must never called by more than one thread concurrently
- private boolean send(final Packet packet, final int reconnectID, final
boolean flush, final boolean batch) {
- if (invokeInterceptors(packet, interceptors, connection) != null) {
- return false;
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ?
"NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
+ }
- synchronized (sendLock) {
- ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
+ checkReconnectID(reconnectID);
- // The actual send must be outside the lock, or with OIO transport,
the write can block if the tcp
- // buffer is full, preventing any incoming buffers being handled and
blocking failover
- try {
- connection.getTransportConnection().write(buffer, flush, batch);
- } catch (Throwable t) {
- //If runtime exception, we must remove from the cache to avoid
filling up the cache causing it to be full.
- //The client would get still know about this as the exception
bubbles up the call stack instead.
- if (responseAsyncCache != null && packet.isRequiresResponse() &&
packet.isResponseAsync()) {
- responseAsyncCache.remove(packet.getCorrelationID());
+ //We do this outside the lock as ResponseCache is threadsafe and
allows responses to come in,
+ //As the send could block if the response cache cannot add,
preventing responses to be handled.
+ if (responseAsyncCache != null && packet.isRequiresResponse() &&
packet.isResponseAsync()) {
+ while (!responseAsyncCache.add(packet)) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ // Ignore
+ }
}
- throw t;
}
- return true;
- }
- }
-
- @Override
- public boolean send(Packet packet,
- RandomAccessFile raf,
- FileChannel fileChannel,
- long offset,
- int dataSize,
- Callback callback) {
- if (invokeInterceptors(packet, interceptors, connection) != null) {
- return false;
- }
-
- synchronized (sendLock) {
- ActiveMQBuffer buffer = beforeSend(packet, -1);
// The actual send must be outside the lock, or with OIO transport,
the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and
blocking failover
try {
- connection.getTransportConnection().write(buffer);
- connection.getTransportConnection().write(raf, fileChannel,
offset, dataSize, callback == null ? null : (ChannelFutureListener) future ->
callback.done(future == null || future.isSuccess()));
+ connection.getTransportConnection().write(buffer, flush, batch);
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid
filling up the cache causing it to be full.
//The client would get still know about this as the exception
bubbles up the call stack instead.
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index a7a3253..f8f85e8 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -336,11 +336,7 @@ public class PacketImpl implements Packet {
}
protected void encodeSize(ActiveMQBuffer buffer) {
- encodeSize(buffer, buffer.writerIndex());
- }
-
- protected void encodeSize(ActiveMQBuffer buffer, int size) {
- this.size = size;
+ size = buffer.writerIndex();
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
@@ -349,10 +345,9 @@ public class PacketImpl implements Packet {
}
protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
- return createPacket(connection, expectedEncodeSize());
- }
- protected ActiveMQBuffer createPacket(CoreRemotingConnection connection,
int size) {
+ int size = expectedEncodeSize();
+
if (connection == null) {
return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 497448e..51330c7 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
-import java.io.IOException;
-import java.io.RandomAccessFile;
import java.net.SocketAddress;
-import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -32,8 +29,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedFile;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -355,18 +350,6 @@ public class NettyConnection implements Connection {
return canWrite;
}
- private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel,
long offset, int dataSize) {
- if (channel.pipeline().get(SslHandler.class) == null) {
- return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
- } else {
- try {
- return new ChunkedFile(raf, offset, dataSize, 8192);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
@@ -407,30 +390,6 @@ public class NettyConnection implements Connection {
}
}
- @Override
- public void write(RandomAccessFile raf,
- FileChannel fileChannel,
- long offset,
- int dataSize,
- final ChannelFutureListener futureListener) {
- final int readableBytes = dataSize;
- if (logger.isDebugEnabled()) {
- final int remainingBytes = this.writeBufferHighWaterMark -
readableBytes;
- if (remainingBytes < 0) {
- logger.debug("a write request is exceeding by " +
(-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " +
this.writeBufferHighWaterMark + " ] : consider to set it at least of " +
readableBytes + " bytes");
- }
- }
-
- //no need to lock because the Netty's channel is thread-safe
- //and the order of write is ensured by the order of the write calls
- final Channel channel = this.channel;
- assert readableBytes >= 0;
- ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf,
fileChannel, offset, dataSize));
- if (futureListener != null) {
- channelFuture.addListener(futureListener);
- }
- }
-
private static void flushAndWait(final Channel channel, final
ChannelPromise promise) {
if (!channel.eventLoop().inEventLoop()) {
waitFor(promise, DEFAULT_WAIT_MILLIS);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
deleted file mode 100644
index 4fc367f..0000000
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.remoting.impl.netty;
-
-import java.io.File;
-import java.nio.channels.FileChannel;
-
-import io.netty.channel.DefaultFileRegion;
-
-public class NonClosingDefaultFileRegion extends DefaultFileRegion {
-
- public NonClosingDefaultFileRegion(FileChannel file, long position, long
count) {
- super(file, position, count);
- }
-
- public NonClosingDefaultFileRegion(File f, long position, long count) {
- super(f, position, count);
- }
-
- @Override
- protected void deallocate() {
- // Overridden to avoid closing the file
- }
-}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index fe5d395..ebde456 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.spi.core.remoting;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelFutureListener;
@@ -103,8 +101,6 @@ public interface Connection {
*/
void write(ActiveMQBuffer buffer);
- void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int
dataSize, ChannelFutureListener futureListener);
-
/**
* This should close the internal channel without calling any listeners.
* This is to avoid a situation where the broker is busy writing on an
internal thread.
diff --git
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 4a4ca39..7d3fb23 100644
---
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.security.auth.Subject;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@@ -395,15 +393,6 @@ public class ChannelImplTest {
}
@Override
- public void write(RandomAccessFile raf,
- FileChannel fileChannel,
- long offset,
- int dataSize,
- ChannelFutureListener channelFutureListener) {
-
- }
-
- @Override
public void forceClose() {
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 5a30c64..b81782b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,30 +16,22 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.EnumSet;
-import java.util.Objects;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
-import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.utils.DataConstants;
-import org.jboss.logging.Logger;
/**
* Message is used to sync {@link
org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The
{@link FileType} controls
* which extra information is sent.
*/
public final class ReplicationSyncFileMessage extends PacketImpl {
- private static final Logger logger =
Logger.getLogger(ReplicationSyncFileMessage.class);
/**
* The JournalType or {@code null} if sync'ing large-messages.
@@ -51,12 +43,10 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
*/
private long fileId;
private int dataSize;
+ private ByteBuf byteBuffer;
private byte[] byteArray;
private SimpleString pageStoreName;
private FileType fileType;
- private RandomAccessFile raf;
- private FileChannel fileChannel;
- private long offset;
public enum FileType {
JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
@@ -88,18 +78,14 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
public
ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
SimpleString storeName,
long id,
- RandomAccessFile raf,
- FileChannel fileChannel,
- long offset,
- int size) {
+ int size,
+ ByteBuf buffer) {
this();
+ this.byteBuffer = buffer;
this.pageStoreName = storeName;
this.dataSize = size;
this.fileId = id;
- this.raf = raf;
- this.fileChannel = fileChannel;
this.journalType = content;
- this.offset = offset;
determineType();
}
@@ -113,30 +99,10 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
}
}
- public long getFileId() {
- return fileId;
- }
-
- public int getDataSize() {
- return dataSize;
- }
-
- public RandomAccessFile getRaf() {
- return raf;
- }
-
- public FileChannel getFileChannel() {
- return fileChannel;
- }
-
- public long getOffset() {
- return offset;
- }
-
@Override
public int expectedEncodeSize() {
int size = PACKET_HEADERS_SIZE +
- DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
+ DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
if (fileId == -1)
return size;
@@ -159,7 +125,7 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
if (dataSize > 0) {
- size += dataSize;
+ size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0,
byteBuffer.writerIndex());
}
return size;
@@ -184,55 +150,30 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
default:
// no-op
}
- buffer.writeInt(dataSize);
- }
- @Override
- public ActiveMQBuffer encode(CoreRemotingConnection connection) {
- if (fileId != -1 && dataSize > 0) {
- ActiveMQBuffer buffer;
- int bufferSize = expectedEncodeSize();
- int encodedSize = bufferSize;
- boolean isNetty = false;
- if (connection != null && connection.getTransportConnection()
instanceof NettyConnection) {
- bufferSize -= dataSize;
- isNetty = true;
- }
- buffer = createPacket(connection, bufferSize);
- encodeHeader(buffer);
- encodeRest(buffer, connection);
- if (!isNetty) {
- ByteBuffer byteBuffer;
- if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount()
== 1) {
- byteBuffer =
buffer.byteBuf().internalNioBuffer(buffer.writerIndex(),
buffer.writableBytes());
- } else {
- byteBuffer = buffer.toByteBuffer(buffer.writerIndex(),
buffer.writableBytes());
- }
- readFile(byteBuffer);
- buffer.writerIndex(buffer.capacity());
- }
- encodeSize(buffer, encodedSize);
- return buffer;
- } else {
- return super.encode(connection);
+ buffer.writeInt(dataSize);
+ /*
+ * sending -1 will close the file in case of a journal, but not in case
of a largeMessage
+ * (which might receive appends)
+ */
+ if (dataSize > 0) {
+ buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
+
+ release();
}
@Override
public void release() {
- if (raf != null) {
- try {
- raf.close();
- } catch (IOException e) {
- logger.error("Close file " + this + " failed", e);
- }
+ if (byteBuffer != null) {
+ byteBuffer.release();
+ byteBuffer = null;
}
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
fileId = buffer.readLong();
- if (fileId == -1) return;
switch (FileType.getFileType(buffer.readByte())) {
case JOURNAL: {
journalType =
AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
@@ -256,14 +197,6 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
}
}
- private void readFile(ByteBuffer buffer) {
- try {
- fileChannel.read(buffer, offset);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
public long getId() {
return fileId;
}
@@ -285,22 +218,61 @@ public final class ReplicationSyncFileMessage extends
PacketImpl {
}
@Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- if (!super.equals(o))
- return false;
- ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o;
- return fileId == that.fileId && dataSize == that.dataSize && offset ==
that.offset && journalType == that.journalType && Arrays.equals(byteArray,
that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) &&
fileType == that.fileType && Objects.equals(raf, that.raf) &&
Objects.equals(fileChannel, that.fileChannel);
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + Arrays.hashCode(byteArray);
+ result = prime * result + ((byteBuffer == null) ? 0 :
byteBuffer.hashCode());
+ result = prime * result + dataSize;
+ result = prime * result + (int) (fileId ^ (fileId >>> 32));
+ result = prime * result + ((fileType == null) ? 0 : fileType.hashCode());
+ result = prime * result + ((journalType == null) ? 0 :
journalType.hashCode());
+ result = prime * result + ((pageStoreName == null) ? 0 :
pageStoreName.hashCode());
+ return result;
}
@Override
- public int hashCode() {
- int result = Objects.hash(super.hashCode(), journalType, fileId,
dataSize, pageStoreName, fileType, raf, fileChannel, offset);
- result = 31 * result + Arrays.hashCode(byteArray);
- return result;
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!super.equals(obj)) {
+ return false;
+ }
+ if (!(obj instanceof ReplicationSyncFileMessage)) {
+ return false;
+ }
+ ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj;
+ if (!Arrays.equals(byteArray, other.byteArray)) {
+ return false;
+ }
+ if (byteBuffer == null) {
+ if (other.byteBuffer != null) {
+ return false;
+ }
+ } else if (!byteBuffer.equals(other.byteBuffer)) {
+ return false;
+ }
+ if (dataSize != other.dataSize) {
+ return false;
+ }
+ if (fileId != other.fileId) {
+ return false;
+ }
+ if (fileType != other.fileType) {
+ return false;
+ }
+ if (journalType != other.journalType) {
+ return false;
+ }
+ if (pageStoreName == null) {
+ if (other.pageStoreName != null) {
+ return false;
+ }
+ } else if (!pageStoreName.equals(other.pageStoreName)) {
+ return false;
+ }
+ return true;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 02f1c84..b2fc576 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.invm;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -245,28 +243,6 @@ public class InVMConnection implements Connection {
}
@Override
- public void write(RandomAccessFile raf,
- FileChannel fileChannel,
- long offset,
- int dataSize,
- final ChannelFutureListener futureListener) {
- if (futureListener == null) {
- return;
- }
- try {
- executor.execute(() -> {
- try {
- futureListener.operationComplete(null);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- });
- } catch (RejectedExecutionException e) {
-
- }
- }
-
- @Override
public String getRemoteAddress() {
return "invm:" + serverID;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d48a5a0..1d1217d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -16,7 +16,8 @@
*/
package org.apache.activemq.artemis.core.replication;
-import java.io.RandomAccessFile;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@@ -27,6 +28,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -389,39 +392,6 @@ public final class ReplicationManager implements
ActiveMQComponent {
return repliToken;
}
- private OperationContext sendSyncFileMessage(final
ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
- if (!enabled) {
- syncFileMessage.release();
- return null;
- }
-
- final OperationContext repliToken =
OperationContextImpl.getContext(ioExecutorFactory);
- repliToken.replicationLineUp();
-
- replicationStream.execute(() -> {
- if (enabled) {
- try {
- pendingTokens.add(repliToken);
- flowControl(syncFileMessage.expectedEncodeSize());
- if (syncFileMessage.getFileId() != -1 &&
syncFileMessage.getDataSize() > 0) {
- replicatingChannel.send(syncFileMessage,
syncFileMessage.getRaf(), syncFileMessage.getFileChannel(),
- syncFileMessage.getOffset(),
syncFileMessage.getDataSize(),
- lastChunk ? (Channel.Callback)
success -> syncFileMessage.release() : null);
- } else {
- replicatingChannel.send(syncFileMessage);
- }
- } catch (Exception e) {
- syncFileMessage.release();
- }
- } else {
- syncFileMessage.release();
- repliToken.replicationDone();
- }
- });
-
- return repliToken;
- }
-
/**
* This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on
replication lock. .
@@ -590,52 +560,49 @@ public final class ReplicationManager implements
ActiveMQComponent {
if (!file.isOpen()) {
file.open();
}
- final int size = 1024 * 1024;
- long fileSize = file.size();
+ int size = 32 * 1024;
int flowControlSize = 10;
int packetsSent = 0;
FlushAction action = new FlushAction();
- long offset = 0;
- RandomAccessFile raf = null;
- FileChannel fileChannel = null;
try {
- raf = new RandomAccessFile(file.getJavaFile(), "r");
- fileChannel = raf.getChannel();
- while (true) {
- long chunkSize = Math.min(size, fileSize - offset);
- int toSend = (int) chunkSize;
- if (chunkSize > 0) {
- if (chunkSize >= maxBytesToSend) {
- toSend = (int) maxBytesToSend;
- maxBytesToSend = 0;
- } else {
- maxBytesToSend = maxBytesToSend - chunkSize;
+ try (FileInputStream fis = new FileInputStream(file.getJavaFile());
FileChannel channel = fis.getChannel()) {
+
+ // We can afford having a single buffer here for this entire loop
+ // because sendReplicatePacket will encode the packet as a
NettyBuffer
+ // through ActiveMQBuffer class leaving this buffer free to be
reused on the next copy
+ while (true) {
+ final ByteBuf buffer =
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+ buffer.clear();
+ ByteBuffer byteBuffer =
buffer.writerIndex(size).readerIndex(0).nioBuffer();
+ final int bytesRead = channel.read(byteBuffer);
+ int toSend = bytesRead;
+ if (bytesRead > 0) {
+ if (bytesRead >= maxBytesToSend) {
+ toSend = (int) maxBytesToSend;
+ maxBytesToSend = 0;
+ } else {
+ maxBytesToSend = maxBytesToSend - bytesRead;
+ }
}
+ logger.debug("sending " + buffer.writerIndex() + " bytes on
file " + file.getFileName());
+ // sending -1 or 0 bytes will close the file at the backup
+ // We cannot simply send everything of a file through the
executor,
+ // otherwise we would run out of memory.
+ // so we don't use the executor here
+ sendReplicatePacket(new ReplicationSyncFileMessage(content,
pageStore, id, toSend, buffer), true);
+ packetsSent++;
+
+ if (packetsSent % flowControlSize == 0) {
+ flushReplicationStream(action);
+ }
+ if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+ break;
}
- logger.debug("sending " + toSend + " bytes on file " +
file.getFileName());
- // sending -1 or 0 bytes will close the file at the backup
- // We cannot simply send everything of a file through the executor,
- // otherwise we would run out of memory.
- // so we don't use the executor here
- sendSyncFileMessage(new ReplicationSyncFileMessage(content,
pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
- packetsSent++;
- offset += toSend;
-
- if (packetsSent % flowControlSize == 0) {
- flushReplicationStream(action);
- }
- if (toSend == 0 || maxBytesToSend == 0)
- break;
}
flushReplicationStream(action);
-
- } catch (Exception e) {
- if (raf != null)
- raf.close();
- throw e;
} finally {
if (file.isOpen())
file.close();
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
deleted file mode 100644
index f01e5e6..0000000
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-
-import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
-import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES;
-
-public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
- @Test
- public void testNettyConnectionEncodeMessage() throws Exception {
- int dataSize = 10;
- NettyConnection conn = new NettyConnection(new HashMap<>(), new
EmbeddedChannel(), null, false, false);
-
- SequentialFileFactory factory = new
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
- SequentialFile file = factory.createSequentialFile("file1.bin");
- file.open();
- RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
- FileChannel fileChannel = raf.getChannel();
- ReplicationSyncFileMessage replicationSyncFileMessage = new
ReplicationSyncFileMessage(MESSAGES,
-
null, 10, raf, fileChannel, 0, dataSize);
- RemotingConnectionImpl remotingConnection = new
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
- ActiveMQBuffer buffer =
replicationSyncFileMessage.encode(remotingConnection);
- Assert.assertEquals(buffer.getInt(0),
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
- Assert.assertEquals(buffer.capacity(),
replicationSyncFileMessage.expectedEncodeSize() - dataSize);
- file.close();
- }
-
-
- @Test
- public void testInVMConnectionEncodeMessage() throws Exception {
- int fileId = 10;
- InVMConnection conn = new InVMConnection(0, null, null, null);
-
- SequentialFileFactory factory = new
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
- SequentialFile file = factory.createSequentialFile("file1.bin");
- file.open();
- RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
- FileChannel fileChannel = raf.getChannel();
- ReplicationSyncFileMessage replicationSyncFileMessage = new
ReplicationSyncFileMessage(MESSAGES,
-
null, fileId, raf, fileChannel, 0, 0);
- RemotingConnectionImpl remotingConnection = new
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
- ActiveMQBuffer buffer =
replicationSyncFileMessage.encode(remotingConnection);
- Assert.assertEquals(buffer.readInt(),
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
- Assert.assertEquals(buffer.capacity(),
replicationSyncFileMessage.expectedEncodeSize());
-
- Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE);
-
- ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new
ReplicationSyncFileMessage();
- decodedReplicationSyncFileMessage.decode(buffer);
- Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES);
- Assert.assertNull(decodedReplicationSyncFileMessage.getData());
- file.close();
- }
-}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c55764a..c7ed869 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.util;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -214,11 +212,6 @@ public class BackupSyncDelay implements Interceptor {
}
@Override
- public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) {
- return true;
- }
-
- @Override
public boolean sendBatched(Packet packet) {
throw new UnsupportedOperationException();
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index 23ae5f9..c9c975c 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -16,9 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -31,9 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -82,29 +77,6 @@ public class NettyConnectionTest extends ActiveMQTestBase {
}
- @Test
- public void testWritePacketAndFile() throws Exception {
- EmbeddedChannel channel = createChannel();
- NettyConnection conn = new NettyConnection(emptyMap, channel, new
MyListener(), false, false);
-
- final int size = 1234;
-
- ActiveMQBuffer buff = conn.createTransportBuffer(size);
- buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
- SequentialFileFactory factory = new
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
- SequentialFile file = factory.createSequentialFile("file1.bin");
- file.open();
- RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
- FileChannel fileChannel = raf.getChannel();
-
- conn.write(buff);
- conn.write(raf, fileChannel, 0, size, future -> raf.close());
- channel.runPendingTasks();
- Assert.assertEquals(2, channel.outboundMessages().size());
- Assert.assertFalse(fileChannel.isOpen());
- file.close();
- }
-
@Test(expected = IllegalStateException.class)
public void throwsExceptionOnBlockUntilWritableIfClosed() {
EmbeddedChannel channel = createChannel();