goiri commented on code in PR #5147: URL: https://github.com/apache/hadoop/pull/5147#discussion_r1061664359
########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java: ########## @@ -192,22 +255,45 @@ public <T extends BaseRecord> boolean putAll( String znode = getZNodeForClass(recordClass); long start = monotonicNow(); - boolean status = true; - for (T record : records) { - String primaryKey = getPrimaryKey(record); - String recordZNode = getNodePath(znode, primaryKey); - byte[] data = serialize(record); - if (!writeNode(recordZNode, data, update, error)){ - status = false; + final AtomicBoolean status = new AtomicBoolean(true); + if (enableConcurrent) { + List<Callable<Void>> callables = new ArrayList<>(); + records.forEach(record -> + callables.add( + () -> { + String primaryKey = getPrimaryKey(record); + String recordZNode = getNodePath(znode, primaryKey); + byte[] data = serialize(record); + if (!writeNode(recordZNode, data, update, error)) { + status.set(false); + } + return null; + } + ) + ); + try { + executorService.invokeAll(callables); + } catch (Exception e) { + LOG.error("Write record failed : {}", e.getMessage(), e); + throw new IOException(e); } + } else { + records.forEach(record -> { Review Comment: Could we just invoke the callables as in the concurrent case but serially? In that way we would have a single piece of code to create the callables and then we do the if to invoke it concurrent or serial. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java: ########## @@ -126,33 +133,73 @@ private <T extends BaseRecord> void testGetNullRecord( assertNull(curatorFramework.checkExists().forPath(znode)); } + @Test + public void testAsyncPerformance() throws Exception { + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + List<MountTable> insertList = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MountTable newRecord = generateFakeRecord(MountTable.class); + insertList.add(newRecord); + } + // Insert Multiple on sync mode + long startSync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endSync = Time.now(); + stateStoreDriver.removeAll(MembershipState.class); + + stateStoreDriver.setEnableConcurrent(true); + // Insert Multiple on async mode + long startAsync = Time.now(); + stateStoreDriver.putAll(insertList, true, false); + long endAsync = Time.now(); + System.out.printf("Sync mode total running time is %d ms, " + + "and async mode total running time is %d ms", + endSync - startSync, endAsync - startAsync); + assertTrue((endSync - startSync) > (endAsync - startAsync) * 2); + } + @Test public void testGetNullRecord() throws Exception { - testGetNullRecord(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testGetNullRecord(stateStoreDriver); + stateStoreDriver.setEnableConcurrent(true); + testGetNullRecord(stateStoreDriver); } @Test public void testInsert() throws IllegalArgumentException, IllegalAccessException, IOException { - testInsert(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testInsert(stateStoreDriver); + stateStoreDriver.setEnableConcurrent(true); + testInsert(stateStoreDriver); } @Test public void testUpdate() throws IllegalArgumentException, ReflectiveOperationException, IOException, SecurityException { - testPut(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testPut(stateStoreDriver); + stateStoreDriver.setEnableConcurrent(true); + testPut(stateStoreDriver); } @Test public void testDelete() throws IllegalArgumentException, IllegalAccessException, IOException { - testRemove(getStateStoreDriver()); + StateStoreZooKeeperImpl stateStoreDriver = (StateStoreZooKeeperImpl) getStateStoreDriver(); + testRemove(stateStoreDriver); + stateStoreDriver.setEnableConcurrent(true); Review Comment: Add a break line to split the concurrent from the other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org