goiri commented on code in PR #5147:
URL: https://github.com/apache/hadoop/pull/5147#discussion_r1061664359


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -192,22 +255,45 @@ public <T extends BaseRecord> boolean putAll(
     String znode = getZNodeForClass(recordClass);
 
     long start = monotonicNow();
-    boolean status = true;
-    for (T record : records) {
-      String primaryKey = getPrimaryKey(record);
-      String recordZNode = getNodePath(znode, primaryKey);
-      byte[] data = serialize(record);
-      if (!writeNode(recordZNode, data, update, error)){
-        status = false;
+    final AtomicBoolean status = new AtomicBoolean(true);
+    if (enableConcurrent) {
+      List<Callable<Void>> callables = new ArrayList<>();
+      records.forEach(record ->
+          callables.add(
+              () -> {
+                String primaryKey = getPrimaryKey(record);
+                String recordZNode = getNodePath(znode, primaryKey);
+                byte[] data = serialize(record);
+                if (!writeNode(recordZNode, data, update, error)) {
+                  status.set(false);
+                }
+                return null;
+              }
+          )
+      );
+      try {
+        executorService.invokeAll(callables);
+      } catch (Exception e) {
+        LOG.error("Write record failed : {}", e.getMessage(), e);
+        throw new IOException(e);
       }
+    } else {
+      records.forEach(record -> {

Review Comment:
   Could we just invoke the callables as in the concurrent case but serially?
   In that way we would have a single piece of code to create the callables and 
then we do the if to invoke it concurrent or serial.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java:
##########
@@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord(
     assertNull(curatorFramework.checkExists().forPath(znode));
   }
 
+  @Test
+  public void testAsyncPerformance() throws Exception {
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    List<MountTable> insertList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      MountTable newRecord = generateFakeRecord(MountTable.class);
+      insertList.add(newRecord);
+    }
+    // Insert Multiple on sync mode
+    long startSync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endSync = Time.now();
+    stateStoreDriver.removeAll(MembershipState.class);
+
+    stateStoreDriver.setEnableConcurrent(true);
+    // Insert Multiple on async mode
+    long startAsync = Time.now();
+    stateStoreDriver.putAll(insertList, true, false);
+    long endAsync = Time.now();
+    System.out.printf("Sync mode total running time is %d ms, "
+            + "and async mode total running time is %d ms",
+        endSync - startSync, endAsync - startAsync);
+    assertTrue((endSync - startSync) > (endAsync - startAsync) * 2);
+  }
+
   @Test
   public void testGetNullRecord() throws Exception {
-    testGetNullRecord(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testGetNullRecord(stateStoreDriver);
+    stateStoreDriver.setEnableConcurrent(true);
+    testGetNullRecord(stateStoreDriver);
   }
 
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testInsert(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testInsert(stateStoreDriver);
+    stateStoreDriver.setEnableConcurrent(true);
+    testInsert(stateStoreDriver);
   }
 
   @Test
   public void testUpdate()
       throws IllegalArgumentException, ReflectiveOperationException,
       IOException, SecurityException {
-    testPut(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testPut(stateStoreDriver);
+    stateStoreDriver.setEnableConcurrent(true);
+    testPut(stateStoreDriver);
   }
 
   @Test
   public void testDelete()
       throws IllegalArgumentException, IllegalAccessException, IOException {
-    testRemove(getStateStoreDriver());
+    StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) 
getStateStoreDriver();
+    testRemove(stateStoreDriver);
+    stateStoreDriver.setEnableConcurrent(true);

Review Comment:
   Add a break line to split the concurrent from the other.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to