This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8e173851410  HDFS-17009. RBF: state store putAll should also return failed records (#5664)
8e173851410 is described below

commit 8e173851410a529ed7d85600d17fc1f6580029df
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed May 17 09:33:34 2023 -0700

    HDFS-17009. RBF: state store putAll should also return failed records (#5664)
---
 .../store/driver/StateStoreOperationResult.java    | 79 ++++++++++++++++++++++
 .../store/driver/StateStoreRecordOperations.java   |  5 +-
 .../store/driver/impl/StateStoreBaseImpl.java      |  2 +-
 .../store/driver/impl/StateStoreFileBaseImpl.java  | 27 +++++---
 .../store/driver/impl/StateStoreMySQLImpl.java     | 12 ++--
 .../store/driver/impl/StateStoreZooKeeperImpl.java | 10 ++-
 .../federation/store/impl/MountTableStoreImpl.java |  2 +-
 .../store/FederationStateStoreTestUtils.java       |  4 +-
 .../store/driver/TestStateStoreDriverBase.java     | 23 ++++++-
 .../store/records/MockStateStoreDriver.java        | 11 +--
 10 files changed, 143 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
new file mode 100644
index 00000000000..02e54bfced2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * State store operation result with list of failed records.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class StateStoreOperationResult {
+
+  private final List<String> failedRecordsKeys;
+  private final boolean isOperationSuccessful;
+
+  private static final StateStoreOperationResult DEFAULT_OPERATION_SUCCESS_RESULT =
+      new StateStoreOperationResult(Collections.emptyList(), true);
+
+  /**
+   * State store operation result constructor with list of failed records keys and boolean
+   * to inform whether the overall operation is successful.
+   *
+   * @param failedRecordsKeys The list of failed records keys.
+   * @param isOperationSuccessful True if the operation was successful, False otherwise.
+   */
+  public StateStoreOperationResult(List<String> failedRecordsKeys,
+      boolean isOperationSuccessful) {
+    this.failedRecordsKeys = failedRecordsKeys;
+    this.isOperationSuccessful = isOperationSuccessful;
+  }
+
+  /**
+   * State store operation result constructor with a single failed record key.
+   *
+   * @param failedRecordKey The failed record key.
+   */
+  public StateStoreOperationResult(String failedRecordKey) {
+    if (failedRecordKey != null && failedRecordKey.length() > 0) {
+      this.isOperationSuccessful = false;
+      this.failedRecordsKeys = Collections.singletonList(failedRecordKey);
+    } else {
+      this.isOperationSuccessful = true;
+      this.failedRecordsKeys = Collections.emptyList();
+    }
+  }
+
+  public List<String> getFailedRecordsKeys() {
+    return failedRecordsKeys;
+  }
+
+  public boolean isOperationSuccessful() {
+    return isOperationSuccessful;
+  }
+
+  public static StateStoreOperationResult getDefaultSuccessResult() {
+    return DEFAULT_OPERATION_SUCCESS_RESULT;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 04929d5fcc1..716f41daf4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -107,12 +107,11 @@ public interface StateStoreRecordOperations {
    * @param allowUpdate True if update of exiting record is allowed.
    * @param errorIfExists True if an error should be returned when inserting
    *          an existing record. Only used if allowUpdate = false.
-   * @return true if all operations were successful.
-   *
+   * @return The result of the putAll operation.
    * @throws IOException Throws exception if unable to query the data store.
    */
   @AtMostOnce
-  <T extends BaseRecord> boolean putAll(
+  <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists) throws IOException;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index f7a6174226e..df3ce21dee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -75,7 +75,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver {
       T record, boolean allowUpdate, boolean errorIfExists) throws IOException {
     List<T> singletonList = new ArrayList<>();
     singletonList.add(record);
-    return putAll(singletonList, allowUpdate, errorIfExists);
+    return putAll(singletonList, allowUpdate, errorIfExists).isOperationSuccessful();
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index 4f8feee6093..a0f6fba9bac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -372,12 +373,12 @@ public abstract class StateStoreFileBaseImpl
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists)
       throws StateStoreUnavailableException {
     verifyDriverReady();
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     long start = monotonicNow();
@@ -402,7 +403,7 @@ public abstract class StateStoreFileBaseImpl
           if (metrics != null) {
             metrics.addFailure(monotonicNow() - start);
           }
-          return false;
+          return new StateStoreOperationResult(primaryKey);
         } else {
           LOG.debug("Not updating {}", record);
         }
@@ -414,7 +415,9 @@ public abstract class StateStoreFileBaseImpl
     // Write the records
     final AtomicBoolean success = new AtomicBoolean(true);
     final List<Callable<Void>> callables = new ArrayList<>();
-    toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
+    toWrite.entrySet().forEach(
+        entry -> callables.add(() -> writeRecordToFile(success, entry, failedRecordsKeys)));
     if (this.concurrentStoreAccessPool != null) {
       // Write records concurrently
       List<Future<Void>> futures = null;
@@ -454,36 +457,40 @@ public abstract class StateStoreFileBaseImpl
         metrics.addFailure(end - start);
       }
     }
-    return success.get();
+    return new StateStoreOperationResult(failedRecordsKeys, success.get());
   }
 
   /**
    * Writes the state store record to the file. At first, the record is written to a temp location
    * and then later renamed to the final location that is passed with the entry key.
    *
+   * @param <T> Record class of the records.
    * @param success The atomic boolean that gets updated to false if the file write operation fails.
    * @param entry The entry of the record path and the state store record to be written to the file
    *          by first writing to a temp location and then renaming it to the record path.
-   * @param <T> Record class of the records.
+   * @param failedRecordsList The list of paths of the failed records.
    * @return Void.
    */
   private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
-      Entry<String, T> entry) {
-    String recordPath = entry.getKey();
-    String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+      Entry<String, T> entry, List<String> failedRecordsList) {
+    final String recordPath = entry.getKey();
+    final T record = entry.getValue();
+    final String primaryKey = getPrimaryKey(record);
+    final String recordPathTemp = recordPath + "." + now() + TMP_MARK;
     boolean recordWrittenSuccessfully = true;
     try (BufferedWriter writer = getWriter(recordPathTemp)) {
-      T record = entry.getValue();
       String line = serializeString(record);
       writer.write(line);
     } catch (IOException e) {
       LOG.error("Cannot write {}", recordPathTemp, e);
       recordWrittenSuccessfully = false;
+      failedRecordsList.add(primaryKey);
       success.set(false);
     }
     // Commit
     if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
       LOG.error("Failed committing record into {}", recordPath);
+      failedRecordsList.add(primaryKey);
       success.set(false);
     }
     return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
index 9b32c883f54..bbeee8e40f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@@ -161,10 +162,10 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean allowUpdate, boolean errorIfExists) throws IOException {
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     verifyDriverReady();
@@ -173,6 +174,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
 
     long start = Time.monotonicNow();
     boolean success = true;
+    final List<String> failedRecordsKeys = new ArrayList<>();
     for (T record : records) {
       String tableName = getAndValidateTableNameForClass(record.getClass());
       String primaryKey = getPrimaryKey(record);
@@ -185,6 +187,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
           record.setDateModified(this.getTime());
           if (!updateRecord(tableName, primaryKey, data)) {
             LOG.error("Cannot write {} into table {}", primaryKey, tableName);
+            failedRecordsKeys.add(primaryKey);
             success = false;
           }
         } else {
@@ -194,7 +197,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
             if (metrics != null) {
               metrics.addFailure(Time.monotonicNow() - start);
             }
-            return false;
+            return new StateStoreOperationResult(primaryKey);
           } else {
             LOG.debug("Not updating {} as updates are not allowed", record);
           }
@@ -202,6 +205,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
       } else {
         if (!insertRecord(tableName, primaryKey, data)) {
           LOG.error("Cannot write {} in table {}", primaryKey, tableName);
+          failedRecordsKeys.add(primaryKey);
           success = false;
         }
       }
@@ -215,7 +219,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
         metrics.addFailure(end - start);
       }
     }
-    return success;
+    return new StateStoreOperationResult(failedRecordsKeys, success);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 7882c8f8273..18d3e1a11d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -40,6 +41,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.Query;
 import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -230,11 +232,11 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(
+  public <T extends BaseRecord> StateStoreOperationResult putAll(
       List<T> records, boolean update, boolean error) throws IOException {
     verifyDriverReady();
     if (records.isEmpty()) {
-      return true;
+      return StateStoreOperationResult.getDefaultSuccessResult();
     }
 
     // All records should be the same
@@ -245,6 +247,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
 
     long start = monotonicNow();
     final AtomicBoolean status = new AtomicBoolean(true);
     List<Callable<Void>> callables = new ArrayList<>();
+    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
     records.forEach(record ->
         callables.add(
             () -> {
@@ -252,6 +255,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
               String recordZNode = getNodePath(znode, primaryKey);
               byte[] data = serialize(record);
               if (!writeNode(recordZNode, data, update, error)) {
+                failedRecordsKeys.add(primaryKey);
                 status.set(false);
               }
               return null;
@@ -276,7 +280,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
     } else {
       getMetrics().addFailure(end - start);
     }
-    return status.get();
+    return new StateStoreOperationResult(failedRecordsKeys, status.get());
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index b6428f7923b..b2a608ce933 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -145,7 +145,7 @@ public class MountTableStoreImpl extends MountTableStore {
       final String src = mountTable.getSourcePath();
       checkMountTablePermission(src);
     }
-    boolean status = getDriver().putAll(mountTables, false, true);
+    boolean status = getDriver().putAll(mountTables, false, true).isOperationSuccessful();
     AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
     response.setStatus(status);
     if (status) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
index 50840460a39..789458100ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -235,9 +235,7 @@ public final class FederationStateStoreTestUtils {
     StateStoreDriver driver = stateStore.getDriver();
     driver.verifyDriverReady();
     if (driver.removeAll(clazz)) {
-      if (driver.putAll(records, true, false)) {
-        return true;
-      }
+      return driver.putAll(records, true, false).isOperationSuccessful();
     }
     return false;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 73d0774ace3..8b734305a20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -307,7 +307,15 @@ public class TestStateStoreDriverBase {
     }
 
     // Verify
-    assertTrue(driver.putAll(insertList, false, true));
+    StateStoreOperationResult result1 = driver.putAll(insertList, false, true);
+    assertTrue(result1.isOperationSuccessful());
+    assertEquals(0, result1.getFailedRecordsKeys().size());
+
+    StateStoreOperationResult result2 = driver.putAll(insertList.subList(0, 1), false, true);
+    assertFalse(result2.isOperationSuccessful());
+    assertEquals(1, result2.getFailedRecordsKeys().size());
+    assertEquals(getPrimaryKey(insertList.get(0)), result2.getFailedRecordsKeys().get(0));
+
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
 
@@ -384,7 +392,10 @@ public class TestStateStoreDriverBase {
     }
 
     // Verify
-    assertTrue(driver.putAll(insertList, false, true));
+    StateStoreOperationResult result = driver.putAll(insertList, false, true);
+    assertTrue(result.isOperationSuccessful());
+    assertEquals(0, result.getFailedRecordsKeys().size());
+
     records = driver.get(clazz);
     assertEquals(records.getRecords().size(), 10);
 
@@ -689,4 +700,12 @@ public class TestStateStoreDriverBase {
     }
     return null;
   }
+
+  private static String getPrimaryKey(BaseRecord record) {
+    String primaryKey = record.getPrimaryKey();
+    primaryKey = primaryKey.replaceAll("/", "0SLASH0");
+    primaryKey = primaryKey.replaceAll(":", "_");
+    return primaryKey;
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
index 9f600cb6f3f..d0821a1711b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
@@ -18,11 +18,13 @@
 package org.apache.hadoop.hdfs.server.federation.store.records;
 
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -89,10 +91,9 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
   }
 
   @Override
-  public <T extends BaseRecord> boolean putAll(List<T> records,
-                                               boolean allowUpdate,
-                                               boolean errorIfExists)
-      throws IOException {
+  public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> records,
+      boolean allowUpdate,
+      boolean errorIfExists) throws IOException {
     checkErrors();
     for (T record : records) {
       Map<String, BaseRecord> map =
@@ -107,7 +108,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
                    + ": " + key);
       }
     }
-    return true;
+    return new StateStoreOperationResult(Collections.emptyList(), true);
   }
 
   /**
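As a quick orientation for readers of the patch above, the sketch below shows how calling code can consume the StateStoreOperationResult that putAll now returns. It uses only the API introduced in this commit and assumes the hadoop-hdfs-rbf module is on the classpath; the example class name, the placeholder record keys, and the console output are illustrative assumptions, not part of the change.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;

public class PutAllResultExample {
  public static void main(String[] args) {
    // A fully successful putAll returns the shared default result:
    // the success flag is true and the failed-keys list is empty.
    StateStoreOperationResult ok = StateStoreOperationResult.getDefaultSuccessResult();
    System.out.println("success=" + ok.isOperationSuccessful()
        + ", failed=" + ok.getFailedRecordsKeys());

    // A partial failure carries the primary keys that could not be written,
    // which is what the file, MySQL, and ZooKeeper drivers now accumulate
    // while iterating over the records.
    List<String> failedKeys = Arrays.asList("record-1", "record-2"); // placeholder keys
    StateStoreOperationResult partial = new StateStoreOperationResult(failedKeys, false);
    if (!partial.isOperationSuccessful()) {
      // A caller could log these keys or retry only the failed records.
      System.err.println("putAll failed for: " + partial.getFailedRecordsKeys());
    }
  }
}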