This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 5d0f4dc729 HBASE-27091 Speed up the loading of table descriptor from filesystem (#4493)
5d0f4dc729 is described below
commit 5d0f4dc729a471c1dd0b926f0d6fabdbca01d7f7
Author: LiangJun He <[email protected]>
AuthorDate: Sat Jun 11 08:48:23 2022 +0800
HBASE-27091 Speed up the loading of table descriptor from filesystem (#4493)
Signed-off-by: Yu Li <[email protected]>
---
.../org/apache/hadoop/hbase/HBaseServerBase.java | 15 +++-
.../org/apache/hadoop/hbase/TableDescriptors.java | 8 ++-
.../org/apache/hadoop/hbase/master/HMaster.java | 1 +
.../hadoop/hbase/regionserver/HRegionServer.java | 1 +
.../hadoop/hbase/util/FSTableDescriptors.java | 80 +++++++++++++++++++---
.../hadoop/hbase/util/TestFSTableDescriptors.java | 31 +++++++++
6 files changed, 124 insertions(+), 12 deletions(-)
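The change adds an opt-in parallel path for loading table descriptors from the filesystem, sized by the new configuration property hbase.tabledescriptor.parallel.load.threads (default 0, which keeps the original sequential behavior). A minimal sketch of enabling it, assuming a standard client Configuration; the class name and the value 8 are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableParallelLoad {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // 0 (the default) keeps the sequential load path; any positive
        // value becomes the size of the FSTableDescriptors loader pool.
        conf.setInt("hbase.tabledescriptor.parallel.load.threads", 8);
        System.out.println(conf.getInt("hbase.tabledescriptor.parallel.load.threads", 0));
      }
    }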
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
index f39e260914..e148c7da84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java
@@ -231,8 +231,10 @@ public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends
// init the filesystem
this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
+ int tableDescriptorParallelLoadThreads =
+ conf.getInt("hbase.tabledescriptor.parallel.load.threads", 0);
this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
- !canUpdateTableDescriptor(), cacheTableDescriptor());
+ !canUpdateTableDescriptor(), cacheTableDescriptor(), tableDescriptorParallelLoadThreads);
}
public HBaseServerBase(Configuration conf, String name) throws IOException {
@@ -466,6 +468,17 @@ public abstract class HBaseServerBase<R extends HBaseRpcServicesBase<?>> extends
}
}
+ protected final void closeTableDescriptors() {
+ if (this.tableDescriptors != null) {
+ LOG.info("Close table descriptors");
+ try {
+ this.tableDescriptors.close();
+ } catch (IOException e) {
+ LOG.debug("Failed to close table descriptors gracefully", e);
+ }
+ }
+ }
+
/**
* In order to register ShutdownHook, this method is called when HMaster and HRegionServer are
* started. For details, please refer to HBASE-26951
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java
index 1dc17eff0d..9ecdf39679 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/TableDescriptors.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -26,7 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Get, remove and modify table descriptors.
*/
@InterfaceAudience.Private
-public interface TableDescriptors {
+public interface TableDescriptors extends Closeable {
/**
* Test whether a given table exists, i.e., has a table descriptor.
@@ -35,6 +36,11 @@ public interface TableDescriptors {
return get(tableName) != null;
}
+ @Override
+ default void close() throws IOException {
+ // do nothing by default
+ }
+
/**
* @return TableDescriptor for tablename
*/
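The hunk above makes TableDescriptors extend Closeable with a no-op default close(), so callers can close any implementation unconditionally while only resource-holding implementations override it (FSTableDescriptors does so further down). A self-contained sketch of that pattern, with illustrative names:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    interface Descriptors extends Closeable {
      @Override
      default void close() throws IOException {
        // no-op: most implementations hold nothing to release
      }
    }

    class PooledDescriptors implements Descriptors {
      private final ExecutorService pool = Executors.newFixedThreadPool(2);

      @Override
      public void close() {
        // only implementations that own resources need to override
        pool.shutdown();
      }
    }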
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index c9556a80ed..2b818d9cc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -601,6 +601,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
this.rpcServices.stop();
}
closeZooKeeper();
+ closeTableDescriptors();
span.setStatus(StatusCode.OK);
} finally {
span.end();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0ee4c5d746..1865929dc5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -977,6 +977,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
ZNodeClearer.deleteMyEphemeralNodeOnDisk();
closeZooKeeper();
+ closeTableDescriptors();
LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper
connection closed.");
span.setStatus(StatusCode.OK);
} finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index 8f9dd4426d..27d66d14ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -21,6 +21,7 @@ import com.google.errorprone.annotations.RestrictedApi;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -28,6 +29,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -55,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Implementation of {@link TableDescriptors} that reads descriptors from the passed filesystem. It
@@ -79,6 +87,8 @@ public class FSTableDescriptors implements TableDescriptors {
private final boolean fsreadonly;
private final boolean usecache;
private volatile boolean fsvisited;
+ private boolean tableDescriptorParallelLoadEnable = false;
+ private ThreadPoolExecutor executor;
long cachehits = 0;
long invocations = 0;
@@ -108,10 +118,23 @@ public class FSTableDescriptors implements TableDescriptors {
public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
final boolean usecache) {
+ this(fs, rootdir, fsreadonly, usecache, 0);
+ }
+
+ public FSTableDescriptors(final FileSystem fs, final Path rootdir, final boolean fsreadonly,
+ final boolean usecache, final int tableDescriptorParallelLoadThreads) {
this.fs = fs;
this.rootdir = rootdir;
this.fsreadonly = fsreadonly;
this.usecache = usecache;
+ if (tableDescriptorParallelLoadThreads > 0) {
+ tableDescriptorParallelLoadEnable = true;
+ executor = new ThreadPoolExecutor(tableDescriptorParallelLoadThreads,
+ tableDescriptorParallelLoadThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("FSTableDescriptorLoad-pool-%d").setDaemon(true)
+ .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
+ executor.allowCoreThreadTimeOut(true);
+ }
}
public static void tryUpdateMetaTableDescriptor(Configuration conf) throws IOException {
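When the thread count is positive, the constructor above builds a fixed-size pool (core == max) over an unbounded queue and enables core-thread timeout, so all threads exit after one second of idleness and the pool costs nothing between getAll() calls. A standalone sketch of that pool shape, using only plain JDK classes; the class name is illustrative:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TransientFixedPool {
      public static ThreadPoolExecutor create(int threads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
          1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        // With core == max, the keep-alive only takes effect once core
        // threads may time out; this lets the pool shrink to zero.
        pool.allowCoreThreadTimeOut(true);
        return pool;
      }
    }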
@@ -235,27 +258,56 @@ public class FSTableDescriptors implements TableDescriptors {
*/
@Override
public Map<String, TableDescriptor> getAll() throws IOException {
- Map<String, TableDescriptor> tds = new TreeMap<>();
+ Map<String, TableDescriptor> tds = new ConcurrentSkipListMap<>();
if (fsvisited) {
for (Map.Entry<TableName, TableDescriptor> entry :
this.cache.entrySet()) {
tds.put(entry.getKey().getNameWithNamespaceInclAsString(), entry.getValue());
}
} else {
- LOG.trace("Fetching table descriptors from the filesystem.");
- boolean allvisited = usecache;
- for (Path d : FSUtils.getTableDirs(fs, rootdir)) {
- TableDescriptor htd = get(CommonFSUtils.getTableName(d));
- if (htd == null) {
- allvisited = false;
- } else {
- tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
+ LOG.info("Fetching table descriptors from the filesystem.");
+ final long startTime = EnvironmentEdgeManager.currentTime();
+ AtomicBoolean allvisited = new AtomicBoolean(usecache);
+ List<Path> tableDirs = FSUtils.getTableDirs(fs, rootdir);
+ if (!tableDescriptorParallelLoadEnable) {
+ for (Path dir : tableDirs) {
+ internalGet(dir, tds, allvisited);
+ }
+ } else {
+ CountDownLatch latch = new CountDownLatch(tableDirs.size());
+ for (Path dir : tableDirs) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ internalGet(dir, tds, allvisited);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
}
}
- fsvisited = allvisited;
+ fsvisited = allvisited.get();
+ LOG.info("Fetched table descriptors(size=" + tds.size() + ") cost "
+ + (EnvironmentEdgeManager.currentTime() - startTime) + "ms.");
}
return tds;
}
+ private void internalGet(Path dir, Map<String, TableDescriptor> tds, AtomicBoolean allvisited) {
+ TableDescriptor htd = get(CommonFSUtils.getTableName(dir));
+ if (htd == null) {
+ allvisited.set(false);
+ } else {
+ tds.put(htd.getTableName().getNameWithNamespaceInclAsString(), htd);
+ }
+ }
+
/**
* Find descriptors by namespace.
* @see #get(org.apache.hadoop.hbase.TableName)
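In the parallel branch above, getAll() fans one task per table directory out to the pool and joins on a CountDownLatch; countDown() sits in a finally block so a failing load cannot strand the waiter, and results go into a ConcurrentSkipListMap, which accepts concurrent puts while preserving the sorted iteration order the old TreeMap gave callers. A self-contained sketch of the same fan-out/fan-in shape, with illustrative names and a stand-in loader:

    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;

    public class FanOutJoin {
      public static ConcurrentSkipListMap<String, String> loadAll(
          ExecutorService pool, List<String> keys) throws InterruptedException {
        ConcurrentSkipListMap<String, String> out = new ConcurrentSkipListMap<>();
        CountDownLatch latch = new CountDownLatch(keys.size());
        for (String key : keys) {
          pool.submit(() -> {
            try {
              out.put(key, key.toUpperCase()); // stand-in for the real load
            } finally {
              latch.countDown(); // always count down, even if the load throws
            }
          });
        }
        latch.await(); // every task has finished (or failed) past this point
        return out;
      }
    }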
@@ -379,6 +431,14 @@ public class FSTableDescriptors implements TableDescriptors {
return Bytes.toString(b);
}
+ @Override
+ public void close() throws IOException {
+ // Close the executor when parallel loading is enabled.
+ if (tableDescriptorParallelLoadEnable) {
+ this.executor.shutdown();
+ }
+ }
+
static final class SequenceIdAndFileLength {
final int sequenceId;
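The close() above only calls shutdown(), which rejects new submissions and lets queued loads drain; it does not wait for termination. Should a caller want to block until the loader threads actually exit, the usual JDK pattern looks like the sketch below; the ten-second grace period is an arbitrary choice, not part of the patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PoolShutdown {
      static void shutdownAndAwait(ExecutorService pool) throws InterruptedException {
        pool.shutdown(); // reject new tasks, let queued ones finish
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow(); // interrupt stragglers after the grace period
        }
      }
    }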
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java
index 24ca058dc2..5e2b4b5295 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSTableDescriptors.java
@@ -285,6 +285,32 @@ public class TestFSTableDescriptors {
+ htds.getAll().size(), count + 1, htds.getAll().size());
}
+ @Test
+ public void testParallelGetAll() throws IOException, InterruptedException {
+ final String name = "testParallelGetAll";
+ FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+ // Enable parallel loading of table descriptors.
+ FSTableDescriptors htds = new FSTableDescriptorsTest(fs, testDir, true, 20);
+ final int count = 100;
+ // Write out table infos.
+ for (int i = 0; i < count; i++) {
+ htds.createTableDescriptor(
+ TableDescriptorBuilder.newBuilder(TableName.valueOf(name + i)).build());
+ }
+ // add hbase:meta
+ htds
+ .createTableDescriptor(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME).build());
+
+ int getTableDescriptorSize = htds.getAll().size();
+ assertEquals("getAll() didn't return all TableDescriptors, expected: " +
(count + 1) + " got: "
+ + getTableDescriptorSize, count + 1, getTableDescriptorSize);
+
+ // get again to check whether the cache works well
+ getTableDescriptorSize = htds.getAll().size();
+ assertEquals("getAll() didn't return all TableDescriptors with cache,
expected: " + (count + 1)
+ + " got: " + getTableDescriptorSize, count + 1, getTableDescriptorSize);
+ }
+
@Test
public void testGetAllOrdering() throws Exception {
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
@@ -467,6 +493,11 @@ public class TestFSTableDescriptors {
super(fs, rootdir, false, usecache);
}
+ public FSTableDescriptorsTest(FileSystem fs, Path rootdir, boolean usecache,
+ int tableDescriptorParallelLoadThreads) {
+ super(fs, rootdir, false, usecache, tableDescriptorParallelLoadThreads);
+ }
+
@Override
public TableDescriptor get(TableName tablename) {
LOG.info((super.isUsecache() ? "Cached" : "Non-Cached") + " TableDescriptor.get() on "