[
https://issues.apache.org/jira/browse/HDFS-16848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637527#comment-17637527
]
ASF GitHub Bot commented on HDFS-16848:
---------------------------------------
ZanderXu commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1029955572
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -84,6 +102,20 @@ public boolean initDriver() {
baseZNode = conf.get(
FEDERATION_STORE_ZK_PARENT_PATH,
FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+ this.enableConcurrent = conf.getBoolean(
+ FEDERATION_STORE_ZK_CLIENT_CONCURRENT,
+ FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT
+ );
+ if(enableConcurrent) {
Review Comment:
`if (enableConcurrent)`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
String znode = getZNodeForClass(clazz);
try {
List<String> children = zkManager.getChildren(znode);
- for (String child : children) {
- try {
- String path = getNodePath(znode, child);
- Stat stat = new Stat();
- String data = zkManager.getStringData(path, stat);
- boolean corrupted = false;
- if (data == null || data.equals("")) {
- // All records should have data, otherwise this is corrupted
- corrupted = true;
- } else {
- try {
- T record = createRecord(data, stat, clazz);
- ret.add(record);
- } catch (IOException e) {
- LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
- clazz.getSimpleName(), data, e.getMessage());
- corrupted = true;
- }
- }
-
- if (corrupted) {
- LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
- child, path);
- zkManager.delete(path);
+ List<Callable<T>> callables = new ArrayList<>();
+ if(enableConcurrent) {
+ children.forEach(child ->
+ callables.add(
+ () -> getRecord(clazz, znode, child)
+ )
+ );
+ List<Future<T>> futures = executorService.invokeAll(callables);
+ for (Future<T> future : futures) {
+ if (future.get() != null) {
+ ret.add(future.get());
}
- } catch (Exception e) {
- LOG.error("Cannot get data for {}: {}", child, e.getMessage());
}
+ } else {
+ children.forEach(child -> {
+ T record = getRecord(clazz, znode, child);
+ if(record != null) {
Review Comment:
`if (record != null) {`
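The concurrent branch of `get()` above fans out one `Callable` per child znode, then calls `future.get()` twice per future. That is harmless, because a completed `Future` caches its result, but it is redundant. Below is a minimal sketch of the same fan-out/fan-in pattern that calls `get()` once and skips `null` results. `ParallelFetch`, `fetchAll`, and the `fetch` function (standing in for `getRecord(clazz, znode, child)`) are hypothetical names. The sketch also builds the callables with the single-line `forEach` form suggested later in the review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper illustrating fan-out/fan-in over child znodes. */
final class ParallelFetch {
  private ParallelFetch() {}

  /**
   * Runs fetch(child) for every child on the pool and collects the non-null
   * results in the original child order.
   */
  static <T> List<T> fetchAll(ExecutorService pool, List<String> children,
      Function<String, T> fetch) throws InterruptedException, ExecutionException {
    List<Callable<T>> callables = new ArrayList<>(children.size());
    children.forEach(child -> callables.add(() -> fetch.apply(child)));
    List<T> results = new ArrayList<>();
    // invokeAll() blocks until every task has completed, normally or not.
    for (Future<T> future : pool.invokeAll(callables)) {
      T record = future.get(); // called once; rethrows a task failure
      if (record != null) {
        results.add(record);
      }
    }
    return results;
  }
}
```

`invokeAll()` returns futures in the same order as the input tasks, so the result order matches `children` even though the fetches run concurrently.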
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -63,8 +72,17 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
public static final String FEDERATION_STORE_ZK_PARENT_PATH =
FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+ public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE =
+ FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size";
+ public static final int FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT = 10;
+ public static final String FEDERATION_STORE_ZK_CLIENT_CONCURRENT =
+ FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.concurrent";
+ public static final boolean FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT = false;
Review Comment:
Can you use a single configuration to control whether this async mode is used,
such as the number of threads? The default value would be -1 or 0 (disabled),
and users could enable the feature by setting it to a positive integer.
We should also add a detailed explanation of this configuration to
hdfs-rbf-default.xml.
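The single-knob idea can be sketched as follows. A `Map<String, String>` stands in for Hadoop's `Configuration` (whose `getInt(key, default)` the patch already uses). The key value, the class name `SingleKnobConfig`, and its methods are hypothetical. A non-positive thread count selects the sequential path, and a positive one creates the pool.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: one thread-count setting decides sync vs async mode. */
final class SingleKnobConfig {
  // Hypothetical key; in the driver this would be built from
  // FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size".
  static final String ZK_CLIENT_THREADS_KEY = "driver.zk.client.size";
  // Non-positive means "async mode disabled".
  static final int ZK_CLIENT_THREADS_DEFAULT = -1;

  private SingleKnobConfig() {}

  /** Reads the thread count, falling back to the default when unset. */
  static int threads(Map<String, String> conf) {
    String raw = conf.get(ZK_CLIENT_THREADS_KEY);
    return raw == null ? ZK_CLIENT_THREADS_DEFAULT : Integer.parseInt(raw.trim());
  }

  /** Async mode is enabled only by a positive thread count. */
  static boolean isAsyncEnabled(Map<String, String> conf) {
    return threads(conf) > 0;
  }

  /** Returns a fixed-size pool when enabled, or null for the sequential path. */
  static ExecutorService createExecutorOrNull(Map<String, String> conf) {
    int n = threads(conf);
    return n > 0 ? Executors.newFixedThreadPool(n) : null;
  }
}
```

With this shape, the separate boolean `client.concurrent` key becomes unnecessary: checking `executorService != null` (or `threads > 0`) replaces `enableConcurrent`.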
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -84,6 +102,20 @@ public boolean initDriver() {
baseZNode = conf.get(
FEDERATION_STORE_ZK_PARENT_PATH,
FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
+ this.enableConcurrent = conf.getBoolean(
+ FEDERATION_STORE_ZK_CLIENT_CONCURRENT,
+ FEDERATION_STORE_ZK_CLIENT_CONCURRENT_DEFAULT
+ );
+ if(enableConcurrent) {
+ int numThreads = conf.getInt(
+ FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE,
+ FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
Review Comment:
Check `numThreads` for invalid values.
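In the patch's current shape, where a separate boolean enables the pool, `numThreads` must be positive whenever the pool is built. Here is a sketch of that check plus a JDK-only named daemon `ThreadFactory` (the patch uses Guava's `ThreadFactoryBuilder`). `ZkClientPools`, its methods, and the `"StateStoreZK"` name prefix are hypothetical. Failing fast with `IllegalArgumentException` is one option; falling back to the default with a warning is another.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helpers for validating and building the ZK client pool. */
final class ZkClientPools {
  private ZkClientPools() {}

  /** Rejects non-positive sizes so a bad value fails at driver startup. */
  static int validateThreads(int numThreads) {
    if (numThreads <= 0) {
      throw new IllegalArgumentException(
          "ZK client thread count must be positive, got " + numThreads);
    }
    return numThreads;
  }

  /** Daemon threads named prefix-0, prefix-1, ... so they show up clearly in jstack. */
  static ThreadFactory namedDaemonFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  /** Builds a fixed-size pool after validating the requested size. */
  static ExecutorService newPool(int numThreads) {
    return Executors.newFixedThreadPool(
        validateThreads(numThreads), namedDaemonFactory("StateStoreZK"));
  }
}
```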
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
String znode = getZNodeForClass(clazz);
try {
List<String> children = zkManager.getChildren(znode);
- for (String child : children) {
- try {
- String path = getNodePath(znode, child);
- Stat stat = new Stat();
- String data = zkManager.getStringData(path, stat);
- boolean corrupted = false;
- if (data == null || data.equals("")) {
- // All records should have data, otherwise this is corrupted
- corrupted = true;
- } else {
- try {
- T record = createRecord(data, stat, clazz);
- ret.add(record);
- } catch (IOException e) {
- LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
- clazz.getSimpleName(), data, e.getMessage());
- corrupted = true;
- }
- }
-
- if (corrupted) {
- LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
- child, path);
- zkManager.delete(path);
+ List<Callable<T>> callables = new ArrayList<>();
+ if(enableConcurrent) {
+ children.forEach(child ->
+ callables.add(
+ () -> getRecord(clazz, znode, child)
+ )
+ );
Review Comment:
Put this on a single line:
`children.forEach(child -> callables.add(() -> getRecord(clazz, znode, child)));`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -178,6 +210,45 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
return new QueryResult<T>(ret, getTime());
}
+ /**
+ * Get one data record in the StateStore or delete it if it's corrupted.
+ *
+ * @param clazz Record class to evaluate.
+ * @param znode The ZNode for the class.
+ * @param child The child for znode to get.
+ * @return The record to get.
+ */
+ private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) {
+ T record = null;
+ try {
+ String path = getNodePath(znode, child);
+ Stat stat = new Stat();
+ String data = zkManager.getStringData(path, stat);
+ boolean corrupted = false;
+ if (data == null || data.equals("")) {
+ // All records should have data, otherwise this is corrupted
+ corrupted = true;
+ } else {
+ try {
+ record = createRecord(data, stat, clazz);
+ } catch (IOException e) {
+ LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
+ clazz.getSimpleName(), data, e.getMessage());
+ corrupted = true;
+ }
+ }
+
+ if (corrupted) {
+ LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
+ child, path);
Review Comment:
Put this on a single line.
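The per-child logic that the patch moves into `getRecord()` follows a read, validate, or purge flow. Below is a self-contained sketch of that control flow. It uses an in-memory `Map` as a hypothetical stand-in for ZooKeeper and a hypothetical `Parser` in place of `createRecord()`, with the log call on a single line. `ReadOrPurge` is an illustrative name, not part of the driver.

```java
import java.io.IOException;
import java.util.Map;

/** Hypothetical sketch of getRecord()'s "read, or delete if corrupted" flow. */
final class ReadOrPurge {
  /** Parser that may reject malformed payloads; stands in for createRecord(). */
  interface Parser<T> {
    T parse(String data) throws IOException;
  }

  private ReadOrPurge() {}

  /**
   * Returns the parsed record at path, or null. Missing, empty, or unparsable
   * data is treated as corrupted and the entry is removed from the store.
   */
  static <T> T readOrPurge(Map<String, String> store, String path, Parser<T> parser) {
    String data = store.get(path);
    boolean corrupted = false;
    T record = null;
    if (data == null || data.isEmpty()) {
      // Every record should carry data; otherwise it counts as corrupted.
      corrupted = true;
    } else {
      try {
        record = parser.parse(data);
      } catch (IOException e) {
        corrupted = true;
      }
    }
    if (corrupted) {
      System.err.println("Cannot get data at " + path + ", cleaning corrupted data");
      store.remove(path);
    }
    return record;
  }
}
```

Because each call touches only its own path, the flow is safe to run from several pool threads at once, provided the backing store is thread-safe (a `ConcurrentHashMap` here, the ZooKeeper client in the driver).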
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
String znode = getZNodeForClass(recordClass);
long start = monotonicNow();
- boolean status = true;
- for (T record : records) {
- String primaryKey = getPrimaryKey(record);
- String recordZNode = getNodePath(znode, primaryKey);
- byte[] data = serialize(record);
- if (!writeNode(recordZNode, data, update, error)){
- status = false;
+ final AtomicBoolean status = new AtomicBoolean(true);
+ if(enableConcurrent){
+ List<Callable<Void>> callables = new ArrayList<>();
+ records.forEach(record ->
+ callables.add(
+ () -> {
+ String primaryKey = getPrimaryKey(record);
+ String recordZNode = getNodePath(znode, primaryKey);
+ byte[] data = serialize(record);
+ if(!writeNode(recordZNode, data, update, error)) {
Review Comment:
`if (!writeNode(recordZNode, data, update, error))`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
String znode = getZNodeForClass(recordClass);
long start = monotonicNow();
- boolean status = true;
- for (T record : records) {
- String primaryKey = getPrimaryKey(record);
- String recordZNode = getNodePath(znode, primaryKey);
- byte[] data = serialize(record);
- if (!writeNode(recordZNode, data, update, error)){
- status = false;
+ final AtomicBoolean status = new AtomicBoolean(true);
+ if(enableConcurrent){
+ List<Callable<Void>> callables = new ArrayList<>();
+ records.forEach(record ->
+ callables.add(
+ () -> {
+ String primaryKey = getPrimaryKey(record);
+ String recordZNode = getNodePath(znode, primaryKey);
+ byte[] data = serialize(record);
+ if(!writeNode(recordZNode, data, update, error)) {
+ status.set(false);
+ }
+ return null;
+ }
+ )
+ );
+ try {
+ executorService.invokeAll(callables);
+ } catch (Exception e) {
+ LOG.error("Write record failed : {}", e.getMessage(), e);
+ throw new IOException(e);
}
+ } else {
+ records.forEach(record -> {
+ String primaryKey = getPrimaryKey(record);
+ String recordZNode = getNodePath(znode, primaryKey);
+ byte[] data = serialize(record);
+ if(!writeNode(recordZNode, data, update, error)) {
Review Comment:
`if (!writeNode(recordZNode, data, update, error)) {`
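In the concurrent `putAll()` above, the list of futures returned by `invokeAll()` is discarded. If a task throws (for example from `serialize()`), the exception is stored in its `Future` and never surfaces, and `status` stays `true`. Here is a hedged sketch that keeps the patch's `AtomicBoolean` but also inspects each future. `ParallelWrite`, `writeAll`, and the `write` predicate (standing in for the `writeNode(...)` call) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical sketch of a concurrent putAll() that reports task failures. */
final class ParallelWrite {
  private ParallelWrite() {}

  /**
   * Applies write to every record on the pool. Returns true only if every
   * write returned true and no task threw.
   */
  static <R> boolean writeAll(ExecutorService pool, List<R> records,
      Predicate<R> write) throws InterruptedException {
    // Shared across pool threads, hence atomic.
    AtomicBoolean status = new AtomicBoolean(true);
    List<Callable<Void>> callables = new ArrayList<>(records.size());
    records.forEach(record -> callables.add(() -> {
      if (!write.test(record)) {
        status.set(false);
      }
      return null;
    }));
    // All tasks are done once invokeAll() returns, so get() does not block.
    for (Future<Void> future : pool.invokeAll(callables)) {
      try {
        future.get();
      } catch (ExecutionException e) {
        status.set(false); // the task threw instead of returning
      }
    }
    return status.get();
  }
}
```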
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +263,45 @@ public <T extends BaseRecord> boolean putAll(
String znode = getZNodeForClass(recordClass);
long start = monotonicNow();
- boolean status = true;
- for (T record : records) {
- String primaryKey = getPrimaryKey(record);
- String recordZNode = getNodePath(znode, primaryKey);
- byte[] data = serialize(record);
- if (!writeNode(recordZNode, data, update, error)){
- status = false;
+ final AtomicBoolean status = new AtomicBoolean(true);
+ if(enableConcurrent){
Review Comment:
`if (enableConcurrent) {`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +177,26 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
String znode = getZNodeForClass(clazz);
try {
List<String> children = zkManager.getChildren(znode);
- for (String child : children) {
- try {
- String path = getNodePath(znode, child);
- Stat stat = new Stat();
- String data = zkManager.getStringData(path, stat);
- boolean corrupted = false;
- if (data == null || data.equals("")) {
- // All records should have data, otherwise this is corrupted
- corrupted = true;
- } else {
- try {
- T record = createRecord(data, stat, clazz);
- ret.add(record);
- } catch (IOException e) {
- LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
- clazz.getSimpleName(), data, e.getMessage());
- corrupted = true;
- }
- }
-
- if (corrupted) {
- LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
- child, path);
- zkManager.delete(path);
+ List<Callable<T>> callables = new ArrayList<>();
+ if(enableConcurrent) {
Review Comment:
`if (enableConcurrent) {`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +133,71 @@ private <T extends BaseRecord> void testGetNullRecord(
assertNull(curatorFramework.checkExists().forPath(znode));
}
+ @Test
+ public void testAsyncPerformance() throws Exception {
+ StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver();
+ List<MountTable> insertList = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ MountTable newRecord = generateFakeRecord(MountTable.class);
+ insertList.add(newRecord);
+ }
+ // Insert Multiple on sync mode
+ long startSync = Time.now();
+ stateStoreDriver.putAll(insertList, true, false);
+ long endSync = Time.now();
+ stateStoreDriver.removeAll(MembershipState.class);
+
+ stateStoreDriver.setEnableConcurrent(true);
+ // Insert Multiple on async mode
+ long startAsync = Time.now();
+ stateStoreDriver.putAll(insertList, true, false);
+ long endAsync = Time.now();
+ System.out.printf("Sync mode total running time is %d ms, and async mode total running time is %d ms",
Review Comment:
This UT is invalid (it only prints timings); can you change it to use `assertXXX`?
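One way to make the test assert something is to compare outcomes rather than wall-clock times, which vary by machine. Write the same records in sync and async mode, then require identical store contents. Note also that the quoted test inserts `MountTable` records but clears `MembershipState` between runs, which looks unintended. The sketch below is plain Java so it runs standalone; in the real JUnit test the final check would be an `assertEquals`. The in-memory store, the key/value scheme, and `SyncAsyncEquivalence` are hypothetical stand-ins for the ZooKeeper-backed driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: assert sync and async writes yield the same state. */
final class SyncAsyncEquivalence {
  private SyncAsyncEquivalence() {}

  /** Writes key -> "value-" + key, sequentially if pool is null, else on the pool. */
  static Map<String, String> write(List<String> keys, ExecutorService pool)
      throws InterruptedException {
    Map<String, String> store = new ConcurrentHashMap<>();
    if (pool == null) {
      // Sync mode: one write after another.
      for (String key : keys) {
        store.put(key, "value-" + key);
      }
    } else {
      // Async mode: one task per record, as in the patch's putAll().
      List<Callable<Void>> tasks = new ArrayList<>();
      for (String key : keys) {
        tasks.add(() -> {
          store.put(key, "value-" + key);
          return null;
        });
      }
      pool.invokeAll(tasks);
    }
    return store;
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      keys.add("record-" + i);
    }
    ExecutorService pool = Executors.newFixedThreadPool(10);
    try {
      Map<String, String> sync = write(keys, null);
      Map<String, String> async = write(keys, pool);
      // Assert on outcomes, not on timing.
      if (sync.size() != keys.size() || !sync.equals(async)) {
        throw new AssertionError("sync and async mode produced different state");
      }
    } finally {
      pool.shutdown();
    }
  }
}
```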
> RBF: Improve StateStoreZookeeperImpl
> -------------------------------------
>
> Key: HDFS-16848
> URL: https://issues.apache.org/jira/browse/HDFS-16848
> Project: Hadoop HDFS
> Issue Type: Improvement
> Components: rbf
> Reporter: Sun Hao
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
>
> Currently, the router gets and updates state from ZK sequentially. This
> slows down loading and updating the router's state cache, especially for a
> large cluster or a multi-region cluster.
> We propose adding a thread pool to handle ZK state synchronization.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)