315157973 commented on code in PR #19561:
URL: https://github.com/apache/pulsar/pull/19561#discussion_r1113018463


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -415,29 +400,43 @@ private static MetaStoreException getException(Throwable t) {
     }
 
     public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
-        if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
+        CompressionType compressionType = ledgerInfoCompressionConfig.getCompressionType();
+        if (compressionType.equals(CompressionType.NONE)) {
             return managedLedgerInfo.toByteArray();
         }
-        MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
-                .newBuilder()
-                .setCompressionType(ledgerInfoCompressionType)
-                .setUncompressedSize(managedLedgerInfo.getSerializedSize())
-                .build();
-        return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
-                mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
+
+        int uncompressedSize = managedLedgerInfo.getSerializedSize();
+        if (uncompressedSize > ledgerInfoCompressionConfig.getCompressThreshold()) {
+            MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
+                    .newBuilder()
+                    .setCompressionType(compressionType)
+                    .setUncompressedSize(uncompressedSize)
+                    .build();
+            return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
+                    mlInfoMetadata.getSerializedSize(), compressionType);
+        }
+
+        return managedLedgerInfo.toByteArray();
     }
 
     public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
-        if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
+        CompressionType compressionType = cursorInfoCompressionConfig.getCompressionType();
+        if (compressionType.equals(CompressionType.NONE)) {
             return managedCursorInfo.toByteArray();
         }
-        MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
-                .newBuilder()
-                .setCompressionType(cursorInfoCompressionType)
-                .setUncompressedSize(managedCursorInfo.getSerializedSize())
-                .build();
-        return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
-                metadata.getSerializedSize(), cursorInfoCompressionType);
+
+        int uncompressedSize = managedCursorInfo.getSerializedSize();
+        if (uncompressedSize > cursorInfoCompressionConfig.getCompressThreshold()) {
+            MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
+                    .newBuilder()
+                    .setCompressionType(compressionType)
+                    .setUncompressedSize(uncompressedSize)
+                    .build();
+            return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
+                    metadata.getSerializedSize(), compressionType);
+        }
+
+        return managedCursorInfo.toByteArray();

Review Comment:
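   What happens on the read path when the stored data was written uncompressed (i.e. its size was at or below the threshold)? Does decompression handle that case correctly?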



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to