This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 1b7861b063 Made tablet refresh thread pool size configurable (#4405)
1b7861b063 is described below
commit 1b7861b0633d93e3f4dc0a7c8a177dfce296ed9b
Author: Dave Marion <[email protected]>
AuthorDate: Fri Mar 22 08:34:49 2024 -0400
Made tablet refresh thread pool size configurable (#4405)
---
.../java/org/apache/accumulo/core/conf/Property.java | 12 ++++++++++++
.../java/org/apache/accumulo/manager/Manager.java | 12 ++++++++++++
.../manager/tableOps/bulkVer2/RefreshTablets.java | 9 ++-------
.../manager/tableOps/bulkVer2/TabletRefresher.java | 19 ++++++-------------
.../manager/tableOps/compact/CompactionDriver.java | 3 +--
.../manager/tableOps/compact/RefreshTablets.java | 3 +--
6 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a3057f9d36..d5a9504f28 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -367,6 +367,18 @@ public enum Property {
"Maximum number of threads the TabletGroupWatcher will use in its
BatchScanner to"
+ " look for tablets that need maintenance.",
"4.0.0"),
+ MANAGER_TABLET_REFRESH_MINTHREADS("manager.tablet.refresh.threads.mininum",
"10",
+ PropertyType.COUNT,
+ "The Manager will notify TabletServers that a Tablet needs to be
refreshed after certain operations"
+ + " are performed (e.g. Bulk Import). This property specifies the
number of core threads in a"
+ + " ThreadPool in the Manager that will be used to request these
refresh operations.",
+ "4.0.0"),
+ MANAGER_TABLET_REFRESH_MAXTHREADS("manager.tablet.refresh.threads.maximum",
"10",
+ PropertyType.COUNT,
+ "The Manager will notify TabletServers that a Tablet needs to be
refreshed after certain operations"
+ + " are performed (e.g. Bulk Import). This property specifies the
maximum number of threads in a"
+ + " ThreadPool in the Manager that will be used to request these
refresh operations.",
+ "4.0.0"),
MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
"The time to wait for a tablet server to process a bulk import
request.", "1.4.3"),
MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20",
PropertyType.COUNT,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 174edd3dcf..d17f5f570c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -236,6 +237,7 @@ public class Manager extends AbstractServer
private final long timeToCacheRecoveryWalExistence;
private ExecutorService tableInformationStatusPool = null;
+ private ThreadPoolExecutor tabletRefreshThreadPool;
private final TabletStateStore rootTabletStore;
private final TabletStateStore metadataTabletStore;
@@ -436,6 +438,10 @@ public class Manager extends AbstractServer
return getContext().getTableManager();
}
+ public ThreadPoolExecutor getTabletRefreshThreadPool() {
+ return tabletRefreshThreadPool;
+ }
+
public static void main(String[] args) throws Exception {
try (Manager manager = new Manager(new ConfigOpts(), args)) {
manager.runServer();
@@ -991,6 +997,11 @@ public class Manager extends AbstractServer
tableInformationStatusPool = ThreadPools.getServerThreadPools()
.createExecutorService(getConfiguration(),
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
+ tabletRefreshThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
+
.numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
+
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
+ .build();
+
Thread statusThread = Threads.createThread("Status Thread", new
StatusThread());
statusThread.start();
@@ -1155,6 +1166,7 @@ public class Manager extends AbstractServer
}
tableInformationStatusPool.shutdownNow();
+ tabletRefreshThreadPool.shutdownNow();
compactionCoordinator.shutdown();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
index 95816691a6..d166eda3c3 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
@@ -22,8 +22,6 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This Repo asks hosted tablets that were bulk loaded into to refresh their
metadata. It works by
@@ -34,8 +32,6 @@ import org.slf4j.LoggerFactory;
*/
public class RefreshTablets extends ManagerRepo {
- private static final Logger log =
LoggerFactory.getLogger(RefreshTablets.class);
-
private static final long serialVersionUID = 1L;
private final BulkInfo bulkInfo;
@@ -52,9 +48,8 @@ public class RefreshTablets extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- TabletRefresher.refresh(manager.getContext(),
manager::onlineTabletServers, fateId,
- bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit,
- tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId));
+ TabletRefresher.refresh(manager, fateId, bulkInfo.tableId,
bulkInfo.firstSplit,
+ bulkInfo.lastSplit, tabletMetadata ->
tabletMetadata.getLoaded().containsValue(fateId));
return new CleanUpBulkImport(bulkInfo);
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index a5cdbe847f..a3d341a12b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -46,6 +45,7 @@ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -57,15 +57,10 @@ public class TabletRefresher {
private static final Logger log =
LoggerFactory.getLogger(TabletRefresher.class);
- public static void refresh(ServerContext context,
- Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId,
TableId tableId,
- byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
+ public static void refresh(Manager manager, FateId fateId, TableId tableId,
byte[] startRow,
+ byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
- // ELASTICITY_TODO should this thread pool be configurable?
- ThreadPoolExecutor threadPool =
- context.threadPools().getPoolBuilder("Tablet refresh " +
fateId).numCoreThreads(10).build();
-
- try (var tablets = context.getAmple().readTablets().forTable(tableId)
+ try (var tablets =
manager.getContext().getAmple().readTablets().forTable(tableId)
.overlapping(startRow, endRow).checkConsistency()
.fetch(ColumnType.LOADED, ColumnType.LOCATION,
ColumnType.PREV_ROW).build()) {
@@ -84,12 +79,10 @@ public class TabletRefresher {
var refreshesNeeded =
batch.stream().collect(groupingBy(TabletMetadata::getLocation,
mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(),
toList())));
- refreshTablets(threadPool, fateId.canonical(), context,
onlineTserversSupplier,
- refreshesNeeded);
+ refreshTablets(manager.getTabletRefreshThreadPool(),
fateId.canonical(),
+ manager.getContext(), () -> manager.onlineTabletServers(),
refreshesNeeded);
});
- } finally {
- threadPool.shutdownNow();
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 6167ca05cf..0f224736a0 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -329,8 +329,7 @@ class CompactionDriver extends ManagerRepo {
// For any compactions that may have happened before this operation
failed, attempt to refresh
// tablets.
- TabletRefresher.refresh(env.getContext(), env::onlineTabletServers,
fateId, tableId, startRow,
- endRow, tabletMetadata -> true);
+ TabletRefresher.refresh(env, fateId, tableId, startRow, endRow,
tabletMetadata -> true);
}
/**
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
index fd4daf0c4c..f7dc869c9e 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
@@ -45,8 +45,7 @@ public class RefreshTablets extends ManagerRepo {
@Override
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
- TabletRefresher.refresh(manager.getContext(),
manager::onlineTabletServers, fateId, tableId,
- startRow, endRow, tabletMetadata -> true);
+ TabletRefresher.refresh(manager, fateId, tableId, startRow, endRow,
tabletMetadata -> true);
return new CleanUp(tableId, namespaceId, startRow, endRow);
}