howzi commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1095265373


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java:
##########
@@ -239,6 +239,18 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final long
       FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1;
 
+  // HDFS Router-based federation State Store ZK DRIVER
+  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
+      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
+  public static final String FEDERATION_STORE_ZK_ASYNC_MAX_THREADS =
+      FEDERATION_STORE_ZK_DRIVER_PREFIX + "async.max.threads";
+  public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
+      -1;
+  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
+      "/hdfs-federation";

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testInsert(stateStoreDriver);
   }
 
   @Test
   public void testUpdate()
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
-    testPut(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testPut(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testPut(stateStoreDriver);
   }
 
   @Test
   public void testDelete()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testRemove(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testRemove(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testRemove(stateStoreDriver);
   }
 
   @Test
   public void testFetchErrors()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testFetchErrors(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testFetchErrors(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testInsert(stateStoreDriver);
   }
 
   @Test
   public void testUpdate()
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
-    testPut(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testPut(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +131,75 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    assertTrue((endSync - startSync) > (endAsync - startAsync));
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    // test async mode
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    // test async mode

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to