Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.0.x d9fbb13ee -> cf4987a3e


QPID-7844: Recover metadata into direct memory [6.0.x]


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cf4987a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cf4987a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cf4987a3

Branch: refs/heads/6.0.x
Commit: cf4987a3ea641c3d5a76e04c92fd3c6b57c65c6b
Parents: d9fbb13
Author: Keith Wall <[email protected]>
Authored: Tue Jun 20 15:02:34 2017 +0100
Committer: Keith Wall <[email protected]>
Committed: Fri Jun 30 14:32:11 2017 +0100

----------------------------------------------------------------------
 .../tuple/MessageMetaDataBinding.java           | 24 +++++++-----
 .../server/store/AbstractJDBCMessageStore.java  | 39 +++++++++++++-------
 2 files changed, 40 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf4987a3/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
----------------------------------------------------------------------
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
index e4e36b0..f9e8fc1 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
@@ -22,11 +22,6 @@ package org.apache.qpid.server.store.berkeleydb.tuple;
 
 import com.sleepycat.bind.EntryBinding;
 import com.sleepycat.je.DatabaseEntry;
-import java.nio.ByteBuffer;
-
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,13 +50,24 @@ public class MessageMetaDataBinding implements 
EntryBinding<StorableMessageMetaD
     @Override
     public StorableMessageMetaData entryToObject(DatabaseEntry entry)
     {
-        QpidByteBuffer buf = QpidByteBuffer.wrap(entry.getData(), 
entry.getOffset(), entry.getSize());
+        QpidByteBuffer buf;
+        if (entry.getSize() > QpidByteBuffer.getPooledBufferSize())
+        {
+            buf = QpidByteBuffer.wrap(entry.getData(), entry.getOffset(), 
entry.getSize());
+        }
+        else
+        {
+            buf = QpidByteBuffer.allocateDirect(entry.getSize());
+            buf.put(entry.getData(), entry.getOffset(), entry.getSize());
+            buf.flip();
+        }
         final int bodySize = buf.getInt() ^ 0x80000000;
         final int metaDataType = buf.get() & 0xff;
-        buf = buf.slice();
-        buf.limit(bodySize-1);
+        QpidByteBuffer slice = buf.slice();
+        slice.limit(bodySize-1);
         MessageMetaDataType type = 
MessageMetaDataTypeRegistry.fromOrdinal(metaDataType);
-        final StorableMessageMetaData metaData = type.createMetaData(buf);
+        final StorableMessageMetaData metaData = type.createMetaData(slice);
+        slice.dispose();
         buf.dispose();
         return metaData;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf4987a3/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 
b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e38be2b..644053c 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -996,13 +996,7 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
                     if(rs.next())
                     {
                         byte[] dataAsBytes = getBlobAsBytes(rs, 1);
-                        QpidByteBuffer buf = QpidByteBuffer.wrap(dataAsBytes);
-                        buf.position(1);
-                        buf = buf.slice();
-                        MessageMetaDataType type = 
MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                        StorableMessageMetaData metaData = 
type.createMetaData(buf);
-                        buf.dispose();
-                        return metaData;
+                        return getStorableMessageMetaData(dataAsBytes);
                     }
                     else
                     {
@@ -1726,14 +1720,8 @@ public abstract class AbstractJDBCMessageStore 
implements MessageStore
                         if (rs.next())
                         {
                             byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-                            QpidByteBuffer buf = 
QpidByteBuffer.wrap(dataAsBytes);
-                            buf.position(1);
-                            buf = buf.slice();
-                            MessageMetaDataType<?> type = 
MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
-                            StorableMessageMetaData metaData = 
type.createMetaData(buf);
-                            buf.dispose();
+                            StorableMessageMetaData metaData = 
getStorableMessageMetaData(dataAsBytes);
                             message = createStoredJDBCMessage(messageId, 
metaData, true);
-
                         }
                         else
                         {
@@ -2006,6 +1994,29 @@ public abstract class AbstractJDBCMessageStore 
implements MessageStore
         }
     }
 
+    private StorableMessageMetaData getStorableMessageMetaData(final byte[] 
dataAsBytes)
+    {
+        final QpidByteBuffer buf;
+        if (dataAsBytes.length > QpidByteBuffer.getPooledBufferSize())
+        {
+            buf =  QpidByteBuffer.wrap(dataAsBytes);
+        }
+        else
+        {
+            buf = QpidByteBuffer.allocateDirect(dataAsBytes.length);
+            buf.put(dataAsBytes);
+            buf.flip();
+        }
+        buf.position(1);
+        QpidByteBuffer slice = buf.slice();
+        int typeOrdinal = dataAsBytes[0] & 0xff;
+        MessageMetaDataType type = 
MessageMetaDataTypeRegistry.fromOrdinal(typeOrdinal);
+        StorableMessageMetaData metaData = type.createMetaData(slice);
+        slice.dispose();
+        buf.dispose();
+        return metaData;
+    }
+
     protected abstract void storedSizeChange(int storeSizeIncrease);
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to