This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 70c2200  ARTEMIS-2496 Revert catch up with zero-copy, as it's causing 
issues into some integration usage
     new d9e7025  This closes #2843
70c2200 is described below

commit 70c2200c54066eb6d9f1d0235aae5c3bbf5b7412
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 18 11:55:23 2019 -0400

    ARTEMIS-2496 Revert catch up with zero-copy, as it's causing issues into 
some integration usage
    
    Revert "ARTEMIS-2336 Use zero copy to replicate journal/page/large message 
file"
    
    This reverts commit 85b93f0883bc06a2dfe2de9d560805a59d626d38.
---
 .../artemis/core/protocol/core/Channel.java        |  20 ---
 .../core/protocol/core/impl/ChannelImpl.java       | 118 +++++---------
 .../core/protocol/core/impl/PacketImpl.java        |  11 +-
 .../core/remoting/impl/netty/NettyConnection.java  |  41 -----
 .../impl/netty/NonClosingDefaultFileRegion.java    |  38 -----
 .../artemis/spi/core/remoting/Connection.java      |   4 -
 .../core/protocol/core/impl/ChannelImplTest.java   |  11 --
 .../wireformat/ReplicationSyncFileMessage.java     | 170 +++++++++------------
 .../core/remoting/impl/invm/InVMConnection.java    |  24 ---
 .../core/replication/ReplicationManager.java       | 105 +++++--------
 .../wireformat/ReplicationSyncFileMessageTest.java |  85 -----------
 .../integration/cluster/util/BackupSyncDelay.java  |   7 -
 .../remoting/impl/netty/NettyConnectionTest.java   |  28 ----
 13 files changed, 149 insertions(+), 513 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index e541dad..56f8259 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.core;
 
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -69,20 +67,6 @@ public interface Channel {
    boolean send(Packet packet);
 
    /**
-    * Sends a packet and file on this channel.
-    *
-    * @param packet the packet to send
-    * @param raf the file to send
-    * @param fileChannel the file channel retrieved from raf
-    * @param offset the position of the raf
-    * @param dataSize the data size to send
-    * @param callback callback after send
-    * @return false if the packet was rejected by an outgoing interceptor; 
true if the send was
-    * successful
-    */
-   boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize, Callback callback);
-
-   /**
     * Sends a packet on this channel.
     *
     * @param packet the packet to send
@@ -263,8 +247,4 @@ public interface Channel {
     * @param transferring whether the channel is transferring
     */
    void setTransferring(boolean transferring);
-
-   interface Callback {
-      void done(boolean success);
-   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index d69b1e1..154ab8a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -27,7 +25,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -277,104 +274,67 @@ public final class ChannelImpl implements Channel {
       }
    }
 
-   private ActiveMQBuffer beforeSend(final Packet packet, final int 
reconnectID) {
-      packet.setChannelID(id);
-
-      if (responseAsyncCache != null && packet.isRequiresResponse() && 
packet.isResponseAsync()) {
-         packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
-      }
-
-      if (logger.isTraceEnabled()) {
-         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : 
connection.getID()) + " Sending packet nonblocking " + packet + " on 
channelID=" + id);
+   // This must never called by more than one thread concurrently
+   private boolean send(final Packet packet, final int reconnectID, final 
boolean flush, final boolean batch) {
+      if (invokeInterceptors(packet, interceptors, connection) != null) {
+         return false;
       }
 
-      ActiveMQBuffer buffer = packet.encode(connection);
-
-      lock.lock();
+      synchronized (sendLock) {
+         packet.setChannelID(id);
 
-      try {
-         if (failingOver) {
-            waitForFailOver("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " timed-out waiting for fail-over condition on 
non-blocking send");
+         if (responseAsyncCache != null && packet.isRequiresResponse() && 
packet.isResponseAsync()) {
+            packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
          }
 
-         // Sanity check
-         if (transferring) {
-            throw 
ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
+         if (logger.isTraceEnabled()) {
+            logger.trace("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on 
channelID=" + id);
          }
 
-         if (resendCache != null && packet.isRequiresConfirmations()) {
-            addResendPacket(packet);
-         }
+         ActiveMQBuffer buffer = packet.encode(connection);
 
-      } finally {
-         lock.unlock();
-      }
+         lock.lock();
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : 
connection.getID()) + " Writing buffer for channelID=" + id);
-      }
+         try {
+            if (failingOver) {
+               waitForFailOver("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " timed-out waiting for fail-over condition on 
non-blocking send");
+            }
 
-      checkReconnectID(reconnectID);
+            // Sanity check
+            if (transferring) {
+               throw 
ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
+            }
 
-      //We do this outside the lock as ResponseCache is threadsafe and allows 
responses to come in,
-      //As the send could block if the response cache cannot add, preventing 
responses to be handled.
-      if (responseAsyncCache != null && packet.isRequiresResponse() && 
packet.isResponseAsync()) {
-         while (!responseAsyncCache.add(packet)) {
-            try {
-               Thread.sleep(1);
-            } catch (Exception e) {
-               // Ignore
+            if (resendCache != null && packet.isRequiresConfirmations()) {
+               addResendPacket(packet);
             }
-         }
-      }
 
-      return buffer;
-   }
+         } finally {
+            lock.unlock();
+         }
 
-   // This must never called by more than one thread concurrently
-   private boolean send(final Packet packet, final int reconnectID, final 
boolean flush, final boolean batch) {
-      if (invokeInterceptors(packet, interceptors, connection) != null) {
-         return false;
-      }
+         if (logger.isTraceEnabled()) {
+            logger.trace("RemotingConnectionID=" + (connection == null ? 
"NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
+         }
 
-      synchronized (sendLock) {
-         ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
+         checkReconnectID(reconnectID);
 
-         // The actual send must be outside the lock, or with OIO transport, 
the write can block if the tcp
-         // buffer is full, preventing any incoming buffers being handled and 
blocking failover
-         try {
-            connection.getTransportConnection().write(buffer, flush, batch);
-         } catch (Throwable t) {
-            //If runtime exception, we must remove from the cache to avoid 
filling up the cache causing it to be full.
-            //The client would get still know about this as the exception 
bubbles up the call stack instead.
-            if (responseAsyncCache != null && packet.isRequiresResponse() && 
packet.isResponseAsync()) {
-               responseAsyncCache.remove(packet.getCorrelationID());
+         //We do this outside the lock as ResponseCache is threadsafe and 
allows responses to come in,
+         //As the send could block if the response cache cannot add, 
preventing responses to be handled.
+         if (responseAsyncCache != null && packet.isRequiresResponse() && 
packet.isResponseAsync()) {
+            while (!responseAsyncCache.add(packet)) {
+               try {
+                  Thread.sleep(1);
+               } catch (Exception e) {
+                  // Ignore
+               }
             }
-            throw t;
          }
-         return true;
-      }
-   }
-
-   @Override
-   public boolean send(Packet packet,
-                       RandomAccessFile raf,
-                       FileChannel fileChannel,
-                       long offset,
-                       int dataSize,
-                       Callback callback) {
-      if (invokeInterceptors(packet, interceptors, connection) != null) {
-         return false;
-      }
-
-      synchronized (sendLock) {
-         ActiveMQBuffer buffer = beforeSend(packet, -1);
 
          // The actual send must be outside the lock, or with OIO transport, 
the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and 
blocking failover
          try {
-            connection.getTransportConnection().write(buffer);
-            connection.getTransportConnection().write(raf, fileChannel, 
offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> 
callback.done(future == null || future.isSuccess()));
+            connection.getTransportConnection().write(buffer, flush, batch);
          } catch (Throwable t) {
             //If runtime exception, we must remove from the cache to avoid 
filling up the cache causing it to be full.
             //The client would get still know about this as the exception 
bubbles up the call stack instead.
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index a7a3253..f8f85e8 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -336,11 +336,7 @@ public class PacketImpl implements Packet {
    }
 
    protected void encodeSize(ActiveMQBuffer buffer) {
-      encodeSize(buffer, buffer.writerIndex());
-   }
-
-   protected void encodeSize(ActiveMQBuffer buffer, int size) {
-      this.size = size;
+      size = buffer.writerIndex();
 
       // The length doesn't include the actual length byte
       int len = size - DataConstants.SIZE_INT;
@@ -349,10 +345,9 @@ public class PacketImpl implements Packet {
    }
 
    protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
-      return createPacket(connection, expectedEncodeSize());
-   }
 
-   protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, 
int size) {
+      int size = expectedEncodeSize();
+
       if (connection == null) {
          return new ChannelBufferWrapper(Unpooled.buffer(size));
       } else {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 497448e..51330c7 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.artemis.core.remoting.impl.netty;
 
-import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.SocketAddress;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -32,8 +29,6 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedFile;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -355,18 +350,6 @@ public class NettyConnection implements Connection {
       return canWrite;
    }
 
-   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, 
long offset, int dataSize) {
-      if (channel.pipeline().get(SslHandler.class) == null) {
-         return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
-      } else {
-         try {
-            return new ChunkedFile(raf, offset, dataSize, 8192);
-         } catch (IOException e) {
-            throw new RuntimeException(e);
-         }
-      }
-   }
-
    @Override
    public final void write(ActiveMQBuffer buffer,
                            final boolean flush,
@@ -407,30 +390,6 @@ public class NettyConnection implements Connection {
       }
    }
 
-   @Override
-   public void write(RandomAccessFile raf,
-                     FileChannel fileChannel,
-                     long offset,
-                     int dataSize,
-                     final ChannelFutureListener futureListener) {
-      final int readableBytes = dataSize;
-      if (logger.isDebugEnabled()) {
-         final int remainingBytes = this.writeBufferHighWaterMark - 
readableBytes;
-         if (remainingBytes < 0) {
-            logger.debug("a write request is exceeding by " + 
(-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + 
this.writeBufferHighWaterMark + " ] : consider to set it at least of " + 
readableBytes + " bytes");
-         }
-      }
-
-      //no need to lock because the Netty's channel is thread-safe
-      //and the order of write is ensured by the order of the write calls
-      final Channel channel = this.channel;
-      assert readableBytes >= 0;
-      ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, 
fileChannel, offset, dataSize));
-      if (futureListener != null) {
-         channelFuture.addListener(futureListener);
-      }
-   }
-
    private static void flushAndWait(final Channel channel, final 
ChannelPromise promise) {
       if (!channel.eventLoop().inEventLoop()) {
          waitFor(promise, DEFAULT_WAIT_MILLIS);
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
deleted file mode 100644
index 4fc367f..0000000
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.remoting.impl.netty;
-
-import java.io.File;
-import java.nio.channels.FileChannel;
-
-import io.netty.channel.DefaultFileRegion;
-
-public class NonClosingDefaultFileRegion extends DefaultFileRegion {
-
-   public NonClosingDefaultFileRegion(FileChannel file, long position, long 
count) {
-      super(file, position, count);
-   }
-
-   public NonClosingDefaultFileRegion(File f, long position, long count) {
-      super(f, position, count);
-   }
-
-   @Override
-   protected void deallocate() {
-      // Overridden to avoid closing the file
-   }
-}
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index fe5d395..ebde456 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.spi.core.remoting;
 
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.ChannelFutureListener;
@@ -103,8 +101,6 @@ public interface Connection {
     */
    void write(ActiveMQBuffer buffer);
 
-   void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int 
dataSize, ChannelFutureListener futureListener);
-
    /**
     * This should close the internal channel without calling any listeners.
     * This is to avoid a situation where the broker is busy writing on an 
internal thread.
diff --git 
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 4a4ca39..7d3fb23 100644
--- 
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -17,8 +17,6 @@
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import javax.security.auth.Subject;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -395,15 +393,6 @@ public class ChannelImplTest {
             }
 
             @Override
-            public void write(RandomAccessFile raf,
-                              FileChannel fileChannel,
-                              long offset,
-                              int dataSize,
-                              ChannelFutureListener channelFutureListener) {
-
-            }
-
-            @Override
             public void forceClose() {
 
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 5a30c64..b81782b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,30 +16,22 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.Objects;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
-import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
-import org.jboss.logging.Logger;
 
 /**
  * Message is used to sync {@link 
org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The 
{@link FileType} controls
  * which extra information is sent.
  */
 public final class ReplicationSyncFileMessage extends PacketImpl {
-   private static final Logger logger = 
Logger.getLogger(ReplicationSyncFileMessage.class);
 
    /**
     * The JournalType or {@code null} if sync'ing large-messages.
@@ -51,12 +43,10 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
     */
    private long fileId;
    private int dataSize;
+   private ByteBuf byteBuffer;
    private byte[] byteArray;
    private SimpleString pageStoreName;
    private FileType fileType;
-   private RandomAccessFile raf;
-   private FileChannel fileChannel;
-   private long offset;
 
    public enum FileType {
       JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
@@ -88,18 +78,14 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
    public 
ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
                                      SimpleString storeName,
                                      long id,
-                                     RandomAccessFile raf,
-                                     FileChannel fileChannel,
-                                     long offset,
-                                     int size) {
+                                     int size,
+                                     ByteBuf buffer) {
       this();
+      this.byteBuffer = buffer;
       this.pageStoreName = storeName;
       this.dataSize = size;
       this.fileId = id;
-      this.raf = raf;
-      this.fileChannel = fileChannel;
       this.journalType = content;
-      this.offset = offset;
       determineType();
    }
 
@@ -113,30 +99,10 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
       }
    }
 
-   public long getFileId() {
-      return fileId;
-   }
-
-   public int getDataSize() {
-      return dataSize;
-   }
-
-   public RandomAccessFile getRaf() {
-      return raf;
-   }
-
-   public FileChannel getFileChannel() {
-      return fileChannel;
-   }
-
-   public long getOffset() {
-      return offset;
-   }
-
    @Override
    public int expectedEncodeSize() {
       int size = PACKET_HEADERS_SIZE +
-         DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
+                 DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
 
       if (fileId == -1)
          return size;
@@ -159,7 +125,7 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
       size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
 
       if (dataSize > 0) {
-         size += dataSize;
+         size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, 
byteBuffer.writerIndex());
       }
 
       return size;
@@ -184,55 +150,30 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
          default:
             // no-op
       }
-      buffer.writeInt(dataSize);
-   }
 
-   @Override
-   public ActiveMQBuffer encode(CoreRemotingConnection connection) {
-      if (fileId != -1 && dataSize > 0) {
-         ActiveMQBuffer buffer;
-         int bufferSize = expectedEncodeSize();
-         int encodedSize = bufferSize;
-         boolean isNetty = false;
-         if (connection != null && connection.getTransportConnection() 
instanceof NettyConnection) {
-            bufferSize -= dataSize;
-            isNetty = true;
-         }
-         buffer = createPacket(connection, bufferSize);
-         encodeHeader(buffer);
-         encodeRest(buffer, connection);
-         if (!isNetty) {
-            ByteBuffer byteBuffer;
-            if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() 
== 1) {
-               byteBuffer = 
buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), 
buffer.writableBytes());
-            } else {
-               byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), 
buffer.writableBytes());
-            }
-            readFile(byteBuffer);
-            buffer.writerIndex(buffer.capacity());
-         }
-         encodeSize(buffer, encodedSize);
-         return buffer;
-      } else {
-         return super.encode(connection);
+      buffer.writeInt(dataSize);
+      /*
+       * sending -1 will close the file in case of a journal, but not in case 
of a largeMessage
+       * (which might receive appends)
+       */
+      if (dataSize > 0) {
+         buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
+
+      release();
    }
 
    @Override
    public void release() {
-      if (raf != null) {
-         try {
-            raf.close();
-         } catch (IOException e) {
-            logger.error("Close file " + this + " failed", e);
-         }
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
       }
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       fileId = buffer.readLong();
-      if (fileId == -1) return;
       switch (FileType.getFileType(buffer.readByte())) {
          case JOURNAL: {
             journalType = 
AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
@@ -256,14 +197,6 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
       }
    }
 
-   private void readFile(ByteBuffer buffer) {
-      try {
-         fileChannel.read(buffer, offset);
-      } catch (IOException e) {
-         throw new RuntimeException(e);
-      }
-   }
-
    public long getId() {
       return fileId;
    }
@@ -285,22 +218,61 @@ public final class ReplicationSyncFileMessage extends 
PacketImpl {
    }
 
    @Override
-   public boolean equals(Object o) {
-      if (this == o)
-         return true;
-      if (o == null || getClass() != o.getClass())
-         return false;
-      if (!super.equals(o))
-         return false;
-      ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o;
-      return fileId == that.fileId && dataSize == that.dataSize && offset == 
that.offset && journalType == that.journalType && Arrays.equals(byteArray, 
that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && 
fileType == that.fileType && Objects.equals(raf, that.raf) && 
Objects.equals(fileChannel, that.fileChannel);
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + Arrays.hashCode(byteArray);
+      result = prime * result + ((byteBuffer == null) ? 0 : 
byteBuffer.hashCode());
+      result = prime * result + dataSize;
+      result = prime * result + (int) (fileId ^ (fileId >>> 32));
+      result = prime * result + ((fileType == null) ? 0 : fileType.hashCode());
+      result = prime * result + ((journalType == null) ? 0 : 
journalType.hashCode());
+      result = prime * result + ((pageStoreName == null) ? 0 : 
pageStoreName.hashCode());
+      return result;
    }
 
    @Override
-   public int hashCode() {
-      int result = Objects.hash(super.hashCode(), journalType, fileId, 
dataSize, pageStoreName, fileType, raf, fileChannel, offset);
-      result = 31 * result + Arrays.hashCode(byteArray);
-      return result;
+   public boolean equals(Object obj) {
+      if (this == obj) {
+         return true;
+      }
+      if (!super.equals(obj)) {
+         return false;
+      }
+      if (!(obj instanceof ReplicationSyncFileMessage)) {
+         return false;
+      }
+      ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj;
+      if (!Arrays.equals(byteArray, other.byteArray)) {
+         return false;
+      }
+      if (byteBuffer == null) {
+         if (other.byteBuffer != null) {
+            return false;
+         }
+      } else if (!byteBuffer.equals(other.byteBuffer)) {
+         return false;
+      }
+      if (dataSize != other.dataSize) {
+         return false;
+      }
+      if (fileId != other.fileId) {
+         return false;
+      }
+      if (fileType != other.fileType) {
+         return false;
+      }
+      if (journalType != other.journalType) {
+         return false;
+      }
+      if (pageStoreName == null) {
+         if (other.pageStoreName != null) {
+            return false;
+         }
+      } else if (!pageStoreName.equals(other.pageStoreName)) {
+         return false;
+      }
+      return true;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 02f1c84..b2fc576 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.core.remoting.impl.invm;
 
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -245,28 +243,6 @@ public class InVMConnection implements Connection {
    }
 
    @Override
-   public void write(RandomAccessFile raf,
-                     FileChannel fileChannel,
-                     long offset,
-                     int dataSize,
-                     final ChannelFutureListener futureListener) {
-      if (futureListener == null) {
-         return;
-      }
-      try {
-         executor.execute(() -> {
-            try {
-               futureListener.operationComplete(null);
-            } catch (Exception e) {
-               throw new IllegalStateException(e);
-            }
-         });
-      } catch (RejectedExecutionException e) {
-
-      }
-   }
-
-   @Override
    public String getRemoteAddress() {
       return "invm:" + serverID;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d48a5a0..1d1217d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -16,7 +16,8 @@
  */
 package org.apache.activemq.artemis.core.replication;
 
-import java.io.RandomAccessFile;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
@@ -27,6 +28,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -389,39 +392,6 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       return repliToken;
    }
 
-   private OperationContext sendSyncFileMessage(final 
ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
-      if (!enabled) {
-         syncFileMessage.release();
-         return null;
-      }
-
-      final OperationContext repliToken = 
OperationContextImpl.getContext(ioExecutorFactory);
-      repliToken.replicationLineUp();
-
-      replicationStream.execute(() -> {
-         if (enabled) {
-            try {
-               pendingTokens.add(repliToken);
-               flowControl(syncFileMessage.expectedEncodeSize());
-               if (syncFileMessage.getFileId() != -1 && 
syncFileMessage.getDataSize() > 0) {
-                  replicatingChannel.send(syncFileMessage, 
syncFileMessage.getRaf(), syncFileMessage.getFileChannel(),
-                                          syncFileMessage.getOffset(), 
syncFileMessage.getDataSize(),
-                                          lastChunk ? (Channel.Callback) 
success -> syncFileMessage.release() : null);
-               } else {
-                  replicatingChannel.send(syncFileMessage);
-               }
-            } catch (Exception e) {
-               syncFileMessage.release();
-            }
-         } else {
-            syncFileMessage.release();
-            repliToken.replicationDone();
-         }
-      });
-
-      return repliToken;
-   }
-
    /**
     * This was written as a refactoring of sendReplicatePacket.
     * In case you refactor this in any way, this method must hold a lock on 
replication lock. .
@@ -590,52 +560,49 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       if (!file.isOpen()) {
          file.open();
       }
-      final int size = 1024 * 1024;
-      long fileSize = file.size();
+      int size = 32 * 1024;
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
-      long offset = 0;
-      RandomAccessFile raf = null;
-      FileChannel fileChannel = null;
       try {
-         raf = new RandomAccessFile(file.getJavaFile(), "r");
-         fileChannel = raf.getChannel();
-         while (true) {
-            long chunkSize = Math.min(size, fileSize - offset);
-            int toSend = (int) chunkSize;
-            if (chunkSize > 0) {
-               if (chunkSize >= maxBytesToSend) {
-                  toSend = (int) maxBytesToSend;
-                  maxBytesToSend = 0;
-               } else {
-                  maxBytesToSend = maxBytesToSend - chunkSize;
+         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); 
FileChannel channel = fis.getChannel()) {
+
+            // We can afford having a single buffer here for this entire loop
+            // because sendReplicatePacket will encode the packet as a 
NettyBuffer
+            // through ActiveMQBuffer class leaving this buffer free to be 
reused on the next copy
+            while (true) {
+               final ByteBuf buffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+               buffer.clear();
+               ByteBuffer byteBuffer = 
buffer.writerIndex(size).readerIndex(0).nioBuffer();
+               final int bytesRead = channel.read(byteBuffer);
+               int toSend = bytesRead;
+               if (bytesRead > 0) {
+                  if (bytesRead >= maxBytesToSend) {
+                     toSend = (int) maxBytesToSend;
+                     maxBytesToSend = 0;
+                  } else {
+                     maxBytesToSend = maxBytesToSend - bytesRead;
+                  }
                }
+               logger.debug("sending " + buffer.writerIndex() + " bytes on 
file " + file.getFileName());
+               // sending -1 or 0 bytes will close the file at the backup
+               // We cannot simply send everything of a file through the 
executor,
+               // otherwise we would run out of memory.
+               // so we don't use the executor here
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, 
pageStore, id, toSend, buffer), true);
+               packetsSent++;
+
+               if (packetsSent % flowControlSize == 0) {
+                  flushReplicationStream(action);
+               }
+               if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+                  break;
             }
-            logger.debug("sending " + toSend + " bytes on file " + 
file.getFileName());
-            // sending -1 or 0 bytes will close the file at the backup
-            // We cannot simply send everything of a file through the executor,
-            // otherwise we would run out of memory.
-            // so we don't use the executor here
-            sendSyncFileMessage(new ReplicationSyncFileMessage(content, 
pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
-            packetsSent++;
-            offset += toSend;
-
-            if (packetsSent % flowControlSize == 0) {
-               flushReplicationStream(action);
-            }
-            if (toSend == 0 || maxBytesToSend == 0)
-               break;
          }
          flushReplicationStream(action);
-
-      } catch (Exception e) {
-         if (raf != null)
-            raf.close();
-         throw e;
       } finally {
          if (file.isOpen())
             file.close();
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
deleted file mode 100644
index f01e5e6..0000000
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-
-import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
-import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES;
-
-public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
-   @Test
-   public void testNettyConnectionEncodeMessage() throws Exception {
-      int dataSize = 10;
-      NettyConnection conn = new NettyConnection(new HashMap<>(), new 
EmbeddedChannel(), null, false, false);
-
-      SequentialFileFactory factory = new 
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
-      SequentialFile file = factory.createSequentialFile("file1.bin");
-      file.open();
-      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
-      FileChannel fileChannel = raf.getChannel();
-      ReplicationSyncFileMessage replicationSyncFileMessage = new 
ReplicationSyncFileMessage(MESSAGES,
-                                                                               
              null, 10, raf, fileChannel, 0, dataSize);
-      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
-      ActiveMQBuffer buffer = 
replicationSyncFileMessage.encode(remotingConnection);
-      Assert.assertEquals(buffer.getInt(0), 
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
-      Assert.assertEquals(buffer.capacity(), 
replicationSyncFileMessage.expectedEncodeSize() - dataSize);
-      file.close();
-   }
-
-
-   @Test
-   public void testInVMConnectionEncodeMessage() throws Exception {
-      int fileId = 10;
-      InVMConnection conn = new InVMConnection(0, null, null, null);
-
-      SequentialFileFactory factory = new 
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
-      SequentialFile file = factory.createSequentialFile("file1.bin");
-      file.open();
-      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
-      FileChannel fileChannel = raf.getChannel();
-      ReplicationSyncFileMessage replicationSyncFileMessage = new 
ReplicationSyncFileMessage(MESSAGES,
-                                                                               
              null, fileId, raf, fileChannel, 0, 0);
-      RemotingConnectionImpl remotingConnection = new 
RemotingConnectionImpl(null, conn, 10, 10, null, null, null);
-      ActiveMQBuffer buffer = 
replicationSyncFileMessage.encode(remotingConnection);
-      Assert.assertEquals(buffer.readInt(), 
replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
-      Assert.assertEquals(buffer.capacity(), 
replicationSyncFileMessage.expectedEncodeSize());
-
-      Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE);
-
-      ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new 
ReplicationSyncFileMessage();
-      decodedReplicationSyncFileMessage.decode(buffer);
-      
Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), 
MESSAGES);
-      Assert.assertNull(decodedReplicationSyncFileMessage.getData());
-      file.close();
-   }
-}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c55764a..c7ed869 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.util;
 
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -214,11 +212,6 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
-      public boolean send(Packet packet, RandomAccessFile raf, FileChannel 
fileChannel, long offset, int dataSize, Callback callback) {
-         return true;
-      }
-
-      @Override
       public boolean sendBatched(Packet packet) {
          throw new UnsupportedOperationException();
 
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index 23ae5f9..c9c975c 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,9 +29,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -82,29 +77,6 @@ public class NettyConnectionTest extends ActiveMQTestBase {
 
    }
 
-   @Test
-   public void testWritePacketAndFile() throws Exception {
-      EmbeddedChannel channel = createChannel();
-      NettyConnection conn = new NettyConnection(emptyMap, channel, new 
MyListener(), false, false);
-
-      final int size = 1234;
-
-      ActiveMQBuffer buff = conn.createTransportBuffer(size);
-      buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
-      SequentialFileFactory factory = new 
NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
-      SequentialFile file = factory.createSequentialFile("file1.bin");
-      file.open();
-      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
-      FileChannel fileChannel = raf.getChannel();
-
-      conn.write(buff);
-      conn.write(raf, fileChannel, 0, size, future -> raf.close());
-      channel.runPendingTasks();
-      Assert.assertEquals(2, channel.outboundMessages().size());
-      Assert.assertFalse(fileChannel.isOpen());
-      file.close();
-   }
-
    @Test(expected = IllegalStateException.class)
    public void throwsExceptionOnBlockUntilWritableIfClosed() {
       EmbeddedChannel channel = createChannel();

Reply via email to