This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 48e1131  In S3 offloader, don't use InputStream#available for stream 
length (#1807)
48e1131 is described below

commit 48e11316170138da7d8bd511356bfa468f4f0426
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue May 22 18:01:02 2018 +0100

    In S3 offloader, don't use InputStream#available for stream length (#1807)
    
    According to the javadoc, available() returns the number of bytes that
    can be read without blocking. It is _not_ the total length of the
    stream. While this currently works for the index as it's backed by a
    byte buffer, we shouldn't rely on implicit assumptions like that.
    
    Master issue: #1511
---
 .../pulsar/broker/s3offload/OffloadIndexBlock.java | 26 +++++++++++++++++++---
 .../broker/s3offload/S3ManagedLedgerOffloader.java |  4 ++--
 .../s3offload/impl/OffloadIndexBlockImpl.java      | 12 +++++-----
 3 files changed, 30 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index 944edca..c7e71c7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -19,8 +19,9 @@
 package org.apache.pulsar.broker.s3offload;
 
 import java.io.Closeable;
-import java.io.IOException;
+import java.io.FilterInputStream;
 import java.io.InputStream;
+import java.io.IOException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 
@@ -38,7 +39,7 @@ public interface OffloadIndexBlock extends Closeable {
      *   | index_magic_header | index_block_len | index_entry_count |
      *   | data_object_size | segment_metadata_length | segment metadata | 
index entries ... |
      */
-    InputStream toStream() throws IOException;
+    IndexInputStream toStream() throws IOException;
 
     /**
      * Get the related OffloadIndexEntry that contains the given 
messageEntryId.
@@ -59,9 +60,28 @@ public interface OffloadIndexBlock extends Closeable {
      */
     LedgerMetadata getLedgerMetadata();
 
-    /**
+    /*
      * Get the total size of the data object.
      */
     long getDataObjectLength();
+
+    /**
+     * An input stream which knows the size of the stream upfront.
+     */
+    public static class IndexInputStream extends FilterInputStream {
+        final long streamSize;
+
+        public IndexInputStream(InputStream in, long streamSize) {
+            super(in);
+            this.streamSize = streamSize;
+        }
+
+        /**
+         * @return the number of bytes in the stream.
+         */
+        public long getStreamSize() {
+            return streamSize;
+        }
+    }
 }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index ee82532..9cb1486 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -180,10 +180,10 @@ public class S3ManagedLedgerOffloader implements 
LedgerOffloader {
 
             // upload index block
             try (OffloadIndexBlock index = 
indexBuilder.withDataObjectLength(dataObjectLength).build();
-                 InputStream indexStream = index.toStream()) {
+                 OffloadIndexBlock.IndexInputStream indexStream = 
index.toStream()) {
                 // write the index block
                 ObjectMetadata metadata = new ObjectMetadata();
-                metadata.setContentLength(indexStream.available());
+                metadata.setContentLength(indexStream.getStreamSize());
                 s3client.putObject(new PutObjectRequest(
                     bucket,
                     indexBlockKey,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index 638edc4..3c5f337 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -28,6 +28,7 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.FilterInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -158,15 +159,12 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
      *   |segment_metadata_len | segment metadata | index entries |
      */
     @Override
-    public InputStream toStream() throws IOException {
-        int indexBlockLength;
-        int segmentMetadataLength;
+    public OffloadIndexBlock.IndexInputStream toStream() throws IOException {
         int indexEntryCount = this.indexEntries.size();
-
         byte[] ledgerMetadataByte = 
buildLedgerMetadataFormat(this.segmentMetadata);
-        segmentMetadataLength = ledgerMetadataByte.length;
+        int segmentMetadataLength = ledgerMetadataByte.length;
 
-        indexBlockLength = 4 /* magic header */
+        int indexBlockLength = 4 /* magic header */
             + 4 /* index block length */
             + 8 /* data object length */
             + 4 /* segment metadata length */
@@ -191,7 +189,7 @@ public class OffloadIndexBlockImpl implements 
OffloadIndexBlock {
                 .writeInt(entry.getValue().getPartId())
                 .writeLong(entry.getValue().getOffset()));
 
-        return new ByteBufInputStream(out, true);
+        return new OffloadIndexBlock.IndexInputStream(new 
ByteBufInputStream(out, true), indexBlockLength);
     }
 
     static private class InternalLedgerMetadata implements LedgerMetadata {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to