ZanderXu commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1062141662


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -109,8 +138,16 @@ public <T extends BaseRecord> boolean initRecordStorage(
     }
   }
 
+  @VisibleForTesting
+  public void setEnableConcurrent(boolean enableConcurrent) {
+    this.enableConcurrent = enableConcurrent;
+  }
+
   @Override
   public void close() throws Exception {
+    if(executorService != null) {

Review Comment:
   Add a space after `if`: `if (executorService != null) {`



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -63,8 +72,14 @@ public class StateStoreZooKeeperImpl extends 
StateStoreSerializableImpl {
       RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
   public static final String FEDERATION_STORE_ZK_PARENT_PATH =
       FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size";

Review Comment:
   How about changing the name to `FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads"`?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -63,8 +72,14 @@ public class StateStoreZooKeeperImpl extends 
StateStoreSerializableImpl {
       RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
   public static final String FEDERATION_STORE_ZK_PARENT_PATH =
       FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "client.size";
+  public static final int FEDERATION_STORE_ZK_CLIENT_THREADS_SIZE_DEFAULT = -1;

Review Comment:
   This configuration should be moved to `org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys` if you want to document it in hdfs-rbf-default.xml.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -137,34 +174,22 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> 
clazz)
     String znode = getZNodeForClass(clazz);
     try {
       List<String> children = zkManager.getChildren(znode);
-      for (String child : children) {
-        try {
-          String path = getNodePath(znode, child);
-          Stat stat = new Stat();
-          String data = zkManager.getStringData(path, stat);
-          boolean corrupted = false;
-          if (data == null || data.equals("")) {
-            // All records should have data, otherwise this is corrupted
-            corrupted = true;
-          } else {
-            try {
-              T record = createRecord(data, stat, clazz);
-              ret.add(record);
-            } catch (IOException e) {
-              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
-                  clazz.getSimpleName(), data, e.getMessage());
-              corrupted = true;
-            }
-          }
-
-          if (corrupted) {
-            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
-                child, path);
-            zkManager.delete(path);
+      List<Callable<T>> callables = new ArrayList<>();
+      if (enableConcurrent) {
+        children.forEach(child -> callables.add(() -> getRecord(clazz, znode, 
child)));
+        List<Future<T>> futures = executorService.invokeAll(callables);
+        for (Future<T> future : futures) {
+          if (future.get() != null) {
+            ret.add(future.get());
           }
-        } catch (Exception e) {
-          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
         }
+      } else {
+        children.forEach(child -> {
+          T record = getRecord(clazz, znode, child);
+          if (record != null) {
+            ret.add(record);
+          }
+        });

Review Comment:
   ```
         List<Callable<T>> callables = new ArrayList<>();
         zkManager.getChildren(znode).forEach(c -> callables.add(() -> 
getRecord(clazz, znode, c)));
         if (enableConcurrent) {
           List<Future<T>> futures = executorService.invokeAll(callables);
           for (Future<T> future : futures) {
             if (future.get() != null) {
               ret.add(future.get());
             }
           }
         } else {
           for (Callable<T> callable : callables) {
             T record = callable.call();
             if (record != null) {
               ret.add(record);
             }
           }
         }
   ```



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    System.out.printf("Sync mode total running time is %d ms, "

Review Comment:
   Change it to `LOG.info()` or delete it.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml:
##########
@@ -377,6 +377,18 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.store.driver.zk.client.size</name>

Review Comment:
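   Please give this configuration a more reasonable name, consistent with the suggested constant rename, e.g. `dfs.federation.router.store.driver.zk.async.max.threads`.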



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    System.out.printf("Sync mode total running time is %d ms, "
+            + "and async mode total running time is %d ms",
+        endSync - startSync, endAsync - startAsync);
+    assertTrue((endSync - startSync) > (endAsync - startAsync) * 2);

Review Comment:
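   Relax this to `assertTrue((endSync - startSync) > (endAsync - startAsync));`. Requiring the async mode to be at least twice as fast makes this timing-based test prone to flakiness.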



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +255,45 @@ public <T extends BaseRecord> boolean putAll(
     String znode = getZNodeForClass(recordClass);
 
     long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
+    final AtomicBoolean status = new AtomicBoolean(true);
+    if (enableConcurrent) {
+      List<Callable<Void>> callables = new ArrayList<>();
+      records.forEach(record ->
+          callables.add(
+              () -> {
+                String primaryKey = getPrimaryKey(record);
+                String recordZNode = getNodePath(znode, primaryKey);
+                byte[] data = serialize(record);
+                if (!writeNode(recordZNode, data, update, error)) {
+                  status.set(false);
+                }
+                return null;
+              }
+          )
+      );
+      try {
+        executorService.invokeAll(callables);
+      } catch (Exception e) {
+        LOG.error("Write record failed : {}", e.getMessage(), e);
+        throw new IOException(e);
       }
+    } else {
+      records.forEach(record -> {

Review Comment:
   Nice suggestion; it makes the code clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to