This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e57386870 [doc][tiered storage] read data from filesystem (#15239)
47e57386870 is described below

commit 47e5738687010c350b454ce384c3000ef0432bfb
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Apr 29 10:02:08 2022 +0800

    [doc][tiered storage] read data from filesystem (#15239)
---
 site2/docs/tiered-storage-filesystem.md | 63 ++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/site2/docs/tiered-storage-filesystem.md b/site2/docs/tiered-storage-filesystem.md
index 2e96a48928d..b9dca2d1dad 100644
--- a/site2/docs/tiered-storage-filesystem.md
+++ b/site2/docs/tiered-storage-filesystem.md
@@ -520,4 +520,65 @@ Execute the following commands in the repository where you download Pulsar tarba
 
     And the **Capacity Used** is changed from 4 KB to 116.46 KB.
 
-    ![](assets/FileSystem-8.png)
\ No newline at end of file
+    ![](assets/FileSystem-8.png)
+
+## Read offloaded data from filesystem
+
+* The offloaded data is stored as a `MapFile` at the following path in the filesystem:
+  ```java
+    path = storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
+  ```
+    * `storageBasePath` is the value of `hadoop.tmp.dir`, which is configured in `broker.conf` or `filesystem_offload_core_site.xml`.
+    * `managedLedgerName` is the name of the managed ledger that backs the persistent topic.
+  ```text
+     managedLedgerName of persistent://public/default/topics-name is public/default/persistent/topics-name.
+  ```
+  You can get `managedLedgerName` with the following call:
+  ```java
+     String managedLedgerName = TopicName.get("persistent://public/default/topics-name").getPersistenceNamingEncoding();
+  ```
+
+To read the offloaded data from the filesystem as ledger entries, complete the following steps.
+1. Create a reader for the `MapFile`, passing the new path and the `configuration` of the filesystem.
+  ```java
+     MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath), configuration);
+  ```
+2. Read the data as `LedgerEntry` from the filesystem.
+  ```java
+     LongWritable key = new LongWritable();
+     BytesWritable value = new BytesWritable();
+     key.set(nextExpectedId - 1);
+     reader.seek(key);
+     reader.next(key, value);
+     int length = value.getLength();
+     long entryId = key.get();
+     ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+     buf.writeBytes(value.copyBytes());
+     LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(ledgerId, entryId, length, buf);
+  ```
+3. Deserialize the `LedgerEntry` to `Message`.
+  ```java
+        ByteBuf metadataAndPayload = ledgerEntry.getDataBuffer();
+        long totalSize = metadataAndPayload.readableBytes();
+        BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
+        MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+        
+        Map<String, String> properties = new TreeMap<>();
+        properties.put("X-Pulsar-batch-size", String.valueOf(totalSize
+                - metadata.getSerializedSize()));
+        properties.put("TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
+        properties.put("CHUNK-ID", Integer.toString(metadata.getChunkId()));
+
+        // Decode if needed
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+        ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
+        // Copy into a heap buffer for output stream compatibility
+        ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
+                uncompressedPayload.readableBytes());
+        data.writeBytes(uncompressedPayload);
+        uncompressedPayload.release();
+  
+        MessageImpl message = new MessageImpl(topic, ((PositionImpl)ledgerEntry.getPosition()).toString(), properties,
+                data, Schema.BYTES, metadata);
+        message.setBrokerEntryMetadata(brokerEntryMetadata);
+  ```
\ No newline at end of file
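
The path layout described in the diff above can be sketched in plain Java, without Pulsar or Hadoop on the classpath. This is only an illustration under stated assumptions: `OffloadPathSketch`, `managedLedgerName(...)`, and `dataFilePath(...)` are hypothetical names, the base path `/pulsar`, the ledger ID `42`, and the UUID are made-up sample values, and the topic-name handling only reproduces the single mapping shown in the doc. The real `TopicName.getPersistenceNamingEncoding()` may additionally encode the local topic name, which this sketch does not do.

```java
import java.util.UUID;

// Hypothetical helper class; not part of Pulsar.
final class OffloadPathSketch {

    // Reproduces the mapping given in the doc:
    // persistent://public/default/topics-name -> public/default/persistent/topics-name
    // Assumption: plain string handling only, no encoding of special characters.
    static String managedLedgerName(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing domain in topic name: " + topic);
        }
        String domain = topic.substring(0, schemeEnd);
        // Split into tenant, namespace, and local name (the local name may contain '/').
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + topic);
        }
        return parts[0] + "/" + parts[1] + "/" + domain + "/" + parts[2];
    }

    // Same concatenation as the `path = ...` expression in the doc.
    static String dataFilePath(String storageBasePath, String managedLedgerName,
                               long ledgerId, UUID uuid) {
        return storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
    }

    public static void main(String[] args) {
        String name = managedLedgerName("persistent://public/default/topics-name");
        // Sample values only; a real UUID comes from the ledger's offload metadata.
        UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(dataFilePath("/pulsar", name, 42L, uuid));
        // prints /pulsar/public/default/persistent/topics-name/42-123e4567-e89b-12d3-a456-426614174000
    }
}
```

The resulting string is what would be passed as `dataFilePath` when constructing the `MapFile.Reader` in step 1.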
