This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 514b6af  Avoid prefetch too much data when offloading data to HDFS (#6717)
514b6af is described below

commit 514b6af7586633424739cfc3c6131b0d0afec9e4
Author: pheecian <[email protected]>
AuthorDate: Wed Apr 22 17:06:22 2020 +0800

    Avoid prefetch too much data when offloading data to HDFS (#6717)
    
    Fixes #6692
    
    ### Motivation
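    Avoid prefetching too much ledger data into memory when offloading, which may lead to OOM.
    Also fix the issue of objects not being closed (the LedgerEntries read for each batch are
    now closed after being written), which was also reported by congbobo184 in
    https://github.com/apache/pulsar/pull/6697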
    
    *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
---
 conf/broker.conf                                   |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../common/policies/data/OffloadPolicies.java      |  5 ++
 .../impl/FileSystemManagedLedgerOffloader.java     | 73 +++++++++++++++++-----
 4 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2ac91bd..8aebf10 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -889,6 +889,9 @@ managedLedgerOffloadDriver=
 # Maximum number of thread pool threads for ledger offloading
 managedLedgerOffloadMaxThreads=2
 
+# Maximum prefetch rounds for ledger reading for offloading
+managedLedgerOffloadPrefetchRounds=1
+
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 96ba8f2..030e838 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1536,6 +1536,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int managedLedgerOffloadMaxThreads = 2;
 
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "Maximum prefetch rounds for ledger reading for offloading"
+    )
+    private int managedLedgerOffloadPrefetchRounds = 1;
+
     /**** --- Transaction config variables --- ****/
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 5ccb75c..4936923 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -37,6 +37,7 @@ public class OffloadPolicies {
     public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MB
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+    public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
     public final static String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage", "filesystem"};
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
@@ -46,6 +47,7 @@ public class OffloadPolicies {
     private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
+    private int managedLedgerOffloadPrefetchRounds = 
DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
     private long managedLedgerOffloadThresholdInBytes = 
DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
     private Long managedLedgerOffloadDeletionLagInMillis = 
DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
 
@@ -161,6 +163,7 @@ public class OffloadPolicies {
         return Objects.hash(
                 managedLedgerOffloadDriver,
                 managedLedgerOffloadMaxThreads,
+                managedLedgerOffloadPrefetchRounds,
                 managedLedgerOffloadThresholdInBytes,
                 managedLedgerOffloadDeletionLagInMillis,
                 s3ManagedLedgerOffloadRegion,
@@ -190,6 +193,7 @@ public class OffloadPolicies {
         OffloadPolicies other = (OffloadPolicies) obj;
         return Objects.equals(managedLedgerOffloadDriver, 
other.getManagedLedgerOffloadDriver())
                 && Objects.equals(managedLedgerOffloadMaxThreads, 
other.getManagedLedgerOffloadMaxThreads())
+                && Objects.equals(managedLedgerOffloadPrefetchRounds, 
other.getManagedLedgerOffloadPrefetchRounds())
                 && Objects.equals(managedLedgerOffloadThresholdInBytes,
                     other.getManagedLedgerOffloadThresholdInBytes())
                 && Objects.equals(managedLedgerOffloadDeletionLagInMillis,
@@ -222,6 +226,7 @@ public class OffloadPolicies {
         return MoreObjects.toStringHelper(this)
                 .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
                 .add("managedLedgerOffloadMaxThreads", 
managedLedgerOffloadMaxThreads)
+                .add("managedLedgerOffloadPrefetchRounds", 
managedLedgerOffloadPrefetchRounds)
                 .add("managedLedgerOffloadThresholdInBytes", 
managedLedgerOffloadThresholdInBytes)
                 .add("managedLedgerOffloadDeletionLagInMillis", 
managedLedgerOffloadDeletionLagInMillis)
                 .add("s3ManagedLedgerOffloadRegion", 
s3ManagedLedgerOffloadRegion)
diff --git 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index bbee828..5438459 100644
--- 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import io.netty.util.Recycler;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -45,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
@@ -68,6 +70,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
     public static boolean driverSupported(String driver) {
         return DRIVER_NAMES.equals(driver);
     }
+
     @Override
     public String getOffloadDriverName() {
         return driverName;
@@ -82,7 +85,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
         this.configuration = new Configuration();
         if (conf.getFileSystemProfilePath() != null) {
             String[] paths = conf.getFileSystemProfilePath().split(",");
-            for (int i =0 ; i < paths.length; i++) {
+            for (int i = 0; i < paths.length; i++) {
                 configuration.addResource(new Path(paths[i]));
             }
         }
@@ -106,6 +109,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
     }
+
     @VisibleForTesting
     public FileSystemManagedLedgerOffloader(OffloadPolicies conf, 
OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws 
IOException {
         this.offloadPolicies = conf;
@@ -137,7 +141,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
     @Override
     public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, 
Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.chooseThread(readHandle.getId()).submit(new 
LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, 
configuration, assignmentScheduler));
+        scheduler.chooseThread(readHandle.getId()).submit(new 
LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, 
configuration, assignmentScheduler, 
offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
         return promise;
     }
 
@@ -151,9 +155,10 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
         private final Configuration configuration;
         volatile Exception fileSystemWriteException = null;
         private OrderedScheduler assignmentScheduler;
+        private int managedLedgerOffloadPrefetchRounds = 1;
 
         private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, 
String> extraMetadata, CompletableFuture<Void> promise,
-                             String storageBasePath, Configuration 
configuration, OrderedScheduler assignmentScheduler) {
+                             String storageBasePath, Configuration 
configuration, OrderedScheduler assignmentScheduler, int 
managedLedgerOffloadPrefetchRounds) {
             this.readHandle = readHandle;
             this.uuid = uuid;
             this.extraMetadata = extraMetadata;
@@ -161,6 +166,7 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
             this.storageBasePath = storageBasePath;
             this.configuration = configuration;
             this.assignmentScheduler = assignmentScheduler;
+            this.managedLedgerOffloadPrefetchRounds = 
managedLedgerOffloadPrefetchRounds;
         }
 
         @Override
@@ -188,13 +194,17 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
                 AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
                 long needToOffloadFirstEntryNumber = 0;
                 CountDownLatch countDownLatch;
+                //avoid prefetch too much data into memory
+                Semaphore semaphore = new 
Semaphore(managedLedgerOffloadPrefetchRounds);
                 do {
                     long end = Math.min(needToOffloadFirstEntryNumber + 
ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
                     log.debug("read ledger entries. start: {}, end: {}", 
needToOffloadFirstEntryNumber, end);
                     LedgerEntries ledgerEntriesOnce = 
readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    semaphore.acquire();
                     countDownLatch = new CountDownLatch(1);
-                    assignmentScheduler.chooseThread(ledgerId).submit(new 
FileSystemWriter(ledgerEntriesOnce, dataWriter,
-                            countDownLatch, haveOffloadEntryNumber, 
this)).addListener(() -> {}, Executors.newSingleThreadExecutor());
+                    
assignmentScheduler.chooseThread(ledgerId).submit(FileSystemWriter.create(ledgerEntriesOnce,
 dataWriter, semaphore,
+                            countDownLatch, haveOffloadEntryNumber, 
this)).addListener(() -> {
+                    }, Executors.newSingleThreadExecutor());
                     needToOffloadFirstEntryNumber = end + 1;
                 } while (needToOffloadFirstEntryNumber - 1 != 
readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
                 countDownLatch.await();
@@ -216,24 +226,50 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
 
     private static class FileSystemWriter implements Runnable {
 
-        private final LedgerEntries ledgerEntriesOnce;
+        private LedgerEntries ledgerEntriesOnce;
 
         private final LongWritable key = new LongWritable();
         private final BytesWritable value = new BytesWritable();
 
-        private final MapFile.Writer dataWriter;
-        private final CountDownLatch countDownLatch;
-        private final AtomicLong haveOffloadEntryNumber;
-        private final LedgerReader ledgerReader;
+        private MapFile.Writer dataWriter;
+        private CountDownLatch countDownLatch;
+        private AtomicLong haveOffloadEntryNumber;
+        private LedgerReader ledgerReader;
+        private Semaphore semaphore;
+        private Recycler.Handle<FileSystemWriter> recyclerHandle;
+
+        private FileSystemWriter(Recycler.Handle<FileSystemWriter> 
recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<FileSystemWriter> RECYCLER = new 
Recycler<FileSystemWriter>() {
+            @Override
+            protected FileSystemWriter 
newObject(Recycler.Handle<FileSystemWriter> handle) {
+                return new FileSystemWriter(handle);
+            }
+        };
+
+        private void recycle() {
+            this.dataWriter = null;
+            this.countDownLatch = null;
+            this.haveOffloadEntryNumber = null;
+            this.ledgerReader = null;
+            this.ledgerEntriesOnce = null;
+            this.semaphore = null;
+            recyclerHandle.recycle(this);
+        }
 
 
-        private FileSystemWriter(LedgerEntries ledgerEntriesOnce, 
MapFile.Writer dataWriter,
-                                 CountDownLatch countDownLatch, AtomicLong 
haveOffloadEntryNumber, LedgerReader ledgerReader) {
-            this.ledgerEntriesOnce = ledgerEntriesOnce;
-            this.dataWriter = dataWriter;
-            this.countDownLatch = countDownLatch;
-            this.haveOffloadEntryNumber = haveOffloadEntryNumber;
-            this.ledgerReader = ledgerReader;
+        public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, 
MapFile.Writer dataWriter, Semaphore semaphore,
+                                              CountDownLatch countDownLatch, 
AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
+            FileSystemWriter writer = RECYCLER.get();
+            writer.ledgerReader = ledgerReader;
+            writer.dataWriter = dataWriter;
+            writer.countDownLatch = countDownLatch;
+            writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
+            writer.ledgerEntriesOnce = ledgerEntriesOnce;
+            writer.semaphore = semaphore;
+            return writer;
         }
 
         @Override
@@ -255,6 +291,9 @@ public class FileSystemManagedLedgerOffloader implements 
LedgerOffloader {
                 }
             }
             countDownLatch.countDown();
+            ledgerEntriesOnce.close();
+            semaphore.release();
+            this.recycle();
         }
     }
 

Reply via email to