This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new d7e5b3a3fd HDDS-10770. [Hsync] Allow overwrite hsynced file (#6603)
d7e5b3a3fd is described below
commit d7e5b3a3fde496bae5666c8adaacfbacf57a67d5
Author: Sammi Chen <[email protected]>
AuthorDate: Thu May 9 08:15:18 2024 +0800
HDDS-10770. [Hsync] Allow overwrite hsynced file (#6603)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 285 ++++++++++++++++++++-
.../java/org/apache/hadoop/ozone/TestDataUtil.java | 42 +++
.../ozone/client/rpc/TestSecureOzoneRpcClient.java | 24 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 5 +-
.../om/request/key/OMAllocateBlockRequest.java | 5 +-
.../request/key/OMAllocateBlockRequestWithFSO.java | 5 +
.../ozone/om/request/key/OMKeyCommitRequest.java | 57 +++--
.../om/request/key/OMKeyCommitRequestWithFSO.java | 59 +++--
.../ozone/om/response/key/OMKeyCommitResponse.java | 15 +-
.../response/key/OMKeyCommitResponseWithFSO.java | 5 +-
.../om/response/key/TestOMKeyCommitResponse.java | 2 +-
.../key/TestOMKeyCommitResponseWithFSO.java | 2 +-
13 files changed, 436 insertions(+), 71 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 3ddbb41008..19c0412363 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -388,6 +388,7 @@ public final class OzoneConsts {
public static final String HSYNC_CLIENT_ID = "hsyncClientId";
public static final String LEASE_RECOVERY = "leaseRecovery";
public static final String DELETED_HSYNC_KEY = "deletedHsyncKey";
+ public static final String OVERWRITTEN_HSYNC_KEY = "overwrittenHsyncKey";
public static final String FORCE_LEASE_RECOVERY_ENV =
"OZONE.CLIENT.RECOVER.LEASE.FORCE";
//GDPR
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index cdc9487a05..13cae7bff9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -28,6 +28,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,7 @@ import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -113,6 +115,8 @@ import static
org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.apache.hadoop.ozone.TestDataUtil.cleanupDeletedTable;
+import static org.apache.hadoop.ozone.TestDataUtil.cleanupOpenKeyTable;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
@@ -437,14 +441,14 @@ public class TestHSync {
os.write(1);
os.hsync();
// There should be 1 key in openFileTable
- assertThat(1 ==
getOpenKeyInfo(BucketLayout.FILE_SYSTEM_OPTIMIZED).size());
+ assertThat(1 == getOpenKeyInfo(BUCKET_LAYOUT).size());
// Delete directory recursively
fs.delete(new Path(OZONE_ROOT + bucket.getVolumeName() +
OZONE_URI_DELIMITER +
bucket.getName() + OZONE_URI_DELIMITER + "dir1/"), true);
// Verify if DELETED_HSYNC_KEY metadata is added to openKey
GenericTestUtils.waitFor(() -> {
- List<OmKeyInfo> omKeyInfo =
getOpenKeyInfo(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ List<OmKeyInfo> omKeyInfo = getOpenKeyInfo(BUCKET_LAYOUT);
return omKeyInfo.size() > 0 &&
omKeyInfo.get(0).getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY);
}, 1000, 12000);
@@ -453,7 +457,7 @@ public class TestHSync {
// Verify entry from openKey gets deleted eventually
GenericTestUtils.waitFor(() ->
- 0 == getOpenKeyInfo(BucketLayout.FILE_SYSTEM_OPTIMIZED).size(),
1000, 12000);
+ 0 == getOpenKeyInfo(BUCKET_LAYOUT).size(), 1000, 12000);
} catch (OMException ex) {
assertEquals(OMException.ResultCodes.DIRECTORY_NOT_FOUND,
ex.getResult());
} finally {
@@ -491,7 +495,7 @@ public class TestHSync {
ThreadLocalRandom.current().nextBytes(data);
try (FileSystem fs = FileSystem.get(CONF)) {
- final Path file = new Path(dir, "file");
+ final Path file = new Path(dir, "file-hsync-uncommitted-blocks");
try (FSDataOutputStream outputStream = fs.create(file, true)) {
outputStream.hsync();
outputStream.write(data);
@@ -545,7 +549,7 @@ public class TestHSync {
final byte[] data = new byte[128];
ThreadLocalRandom.current().nextBytes(data);
- final Path file = new Path(dir, "file-hsync-then-close");
+ final Path file = new Path(dir, "file-hsync");
try (FileSystem fs = FileSystem.get(CONF)) {
long fileSize = 0;
try (FSDataOutputStream outputStream = fs.create(file, true)) {
@@ -962,4 +966,275 @@ public class TestHSync {
}
bucket.deleteKey(keyName);
}
+
+ @Test
+ public void testNormalKeyOverwriteHSyncKey() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+
+ // Expect empty OpenKeyTable before key creation
+ OzoneManager ozoneManager = cluster.getOzoneManager();
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ Table<String, OmKeyInfo> openKeyTable =
metadataManager.getOpenKeyTable(BUCKET_LAYOUT);
+ Table<String, RepeatedOmKeyInfo> deletedTable =
metadataManager.getDeletedTable();
+ assertTrue(openKeyTable.isEmpty());
+ assertTrue(deletedTable.isEmpty());
+ ozoneManager.getKeyManager().getDeletingService().suspend();
+ OMMetrics metrics = ozoneManager.getMetrics();
+ metrics.incDataCommittedBytes(-metrics.getDataCommittedBytes());
+ assertEquals(0, metrics.getDataCommittedBytes());
+ OzoneVolume volume =
client.getObjectStore().getVolume(bucket.getVolumeName());
+ OzoneBucket ozoneBucket = volume.getBucket(bucket.getName());
+ long usedBytes = ozoneBucket.getUsedBytes();
+
+ String data1 = "data for normal file";
+ String data2 = "data for hsynced file";
+ final Path file = new Path(dir, "file-normal-overwrite-hsync");
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ // create hsync key
+ FSDataOutputStream outputStream1 = fs.create(file, true);
+ outputStream1.write(data2.getBytes(UTF_8), 0, data2.length());
+ outputStream1.hsync();
+ // write more data
+ String s = RandomStringUtils.randomAlphabetic(BLOCK_SIZE);
+ byte[] newData = s.getBytes(StandardCharsets.UTF_8);
+ outputStream1.write(newData);
+
+ // create normal key and commit
+ FSDataOutputStream outputStream2 = fs.create(file, true);
+ outputStream2.write(data1.getBytes(UTF_8), 0, data1.length());
+ outputStream2.close();
+ assertEquals(data1.length(), metrics.getDataCommittedBytes());
+
+ // hsync call for overwritten hsync key, should fail
+ OMException omException = assertThrows(OMException.class, () ->
outputStream1.hsync());
+ assertTrue(omException.getResult() ==
OMException.ResultCodes.KEY_NOT_FOUND);
+ assertTrue(omException.getMessage().contains("already
deleted/overwritten"));
+
+ // allocate new block for overwritten hsync key, should fail
+ IOException ioException = assertThrows(IOException.class, () ->
outputStream1.write(newData));
+ assertTrue(ioException.getCause() instanceof OMException);
+ assertTrue(((OMException)ioException.getCause()).getResult() ==
OMException.ResultCodes.KEY_NOT_FOUND);
+ assertTrue(ioException.getMessage().contains("already
deleted/overwritten"));
+
+ // recover key will success since key is already committed by
outputStream2
+ ((RootedOzoneFileSystem)fs).recoverLease(file);
+
+ Map<String, OmKeyInfo> openKeys = getAllOpenKeys(openKeyTable);
+ Map<String, RepeatedOmKeyInfo> deletedKeys =
getAllDeletedKeys(deletedTable);
+ // outputStream1's has one openKey left in openKeyTable. It will be
cleaned up by OpenKeyCleanupService later.
+ assertEquals(1, openKeys.size());
+ // outputStream1's has one delete key record in deletedTable
+ assertEquals(1, deletedKeys.size());
+
+ // final file will have data1 content
+ OzoneKeyDetails keyInfo = bucket.getKey(file.getName());
+ try (OzoneInputStream is = bucket.readKey(file.getName())) {
+ ByteBuffer readBuffer = ByteBuffer.allocate((int)
keyInfo.getDataSize());
+ int readLen = is.read(readBuffer);
+ assertEquals(keyInfo.getDataSize(), readLen);
+ assertArrayEquals(data1.getBytes(UTF_8), readBuffer.array());
+ }
+
+ // verify bucket info
+ ozoneBucket = volume.getBucket(bucket.getName());
+ assertEquals(keyInfo.getDataSize() *
keyInfo.getReplicationConfig().getRequiredNodes() + usedBytes,
+ ozoneBucket.getUsedBytes());
+
+ // Resume openKeyCleanupService
+ openKeyCleanupService.resume();
+ // Verify entry from openKey gets deleted eventually
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return getAllOpenKeys(openKeyTable).size() == 0 &&
getAllDeletedKeys(deletedTable).size() == 2;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 100, 5000);
+ } finally {
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ ozoneManager.getKeyManager().getDeletingService().resume();
+ openKeyCleanupService.suspend();
+ }
+ }
+
+ @Test
+ public void testHSyncKeyOverwriteNormalKey() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+
+ // Expect empty OpenKeyTable before key creation
+ OzoneManager ozoneManager = cluster.getOzoneManager();
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ Table<String, OmKeyInfo> openKeyTable =
metadataManager.getOpenKeyTable(BUCKET_LAYOUT);
+ Table<String, RepeatedOmKeyInfo> deletedTable =
metadataManager.getDeletedTable();
+ assertTrue(openKeyTable.isEmpty());
+ assertTrue(deletedTable.isEmpty());
+ ozoneManager.getKeyManager().getDeletingService().suspend();
+ OMMetrics metrics = ozoneManager.getMetrics();
+ metrics.incDataCommittedBytes(-metrics.getDataCommittedBytes());
+ assertEquals(0, metrics.getDataCommittedBytes());
+ OzoneVolume volume =
client.getObjectStore().getVolume(bucket.getVolumeName());
+ OzoneBucket ozoneBucket = volume.getBucket(bucket.getName());
+ long usedBytes = ozoneBucket.getUsedBytes();
+
+ String data1 = "data for normal file";
+ String data2 = "data for hsynced file";
+ final Path file = new Path(dir, "file-hsync-overwrite-normal");
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ // create and commit normal key
+ FSDataOutputStream outputStream1 = fs.create(file, true);
+ outputStream1.write(data1.getBytes(UTF_8), 0, data1.length());
+ outputStream1.close();
+ assertEquals(data1.length(), metrics.getDataCommittedBytes());
+
+ // create hsync key and commit
+ FSDataOutputStream outputStream2 = fs.create(file, true);
+ outputStream2.write(data2.getBytes(UTF_8), 0, data2.length());
+ outputStream2.hsync();
+ outputStream2.close();
+ assertEquals(data1.length() + data2.length(),
metrics.getDataCommittedBytes());
+
+ Map<String, OmKeyInfo> openKeys = getAllOpenKeys(openKeyTable);
+ Map<String, RepeatedOmKeyInfo> deletedKeys =
getAllDeletedKeys(deletedTable);
+ // There should be no key in openKeyTable
+ assertEquals(0, openKeys.size());
+ // There should be one key in delete table
+ assertEquals(1, deletedKeys.size());
+
+ // final file will have data2 content
+ OzoneKeyDetails keyInfo = bucket.getKey(file.getName());
+ try (OzoneInputStream is = bucket.readKey(file.getName())) {
+ ByteBuffer readBuffer = ByteBuffer.allocate((int)
keyInfo.getDataSize());
+ int readLen = is.read(readBuffer);
+ assertEquals(keyInfo.getDataSize(), readLen);
+ assertArrayEquals(data2.getBytes(UTF_8), readBuffer.array());
+ }
+
+ // verify bucket info
+ ozoneBucket = volume.getBucket(bucket.getName());
+ assertEquals(keyInfo.getDataSize() *
keyInfo.getReplicationConfig().getRequiredNodes() + usedBytes,
+ ozoneBucket.getUsedBytes());
+ } finally {
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ ozoneManager.getKeyManager().getDeletingService().resume();
+ }
+ }
+
+ @Test
+ public void testHSyncKeyOverwriteHSyncKey() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+
+ // Expect empty OpenKeyTable before key creation
+ OzoneManager ozoneManager = cluster.getOzoneManager();
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+ Table<String, OmKeyInfo> openKeyTable =
metadataManager.getOpenKeyTable(BUCKET_LAYOUT);
+ Table<String, RepeatedOmKeyInfo> deletedTable =
metadataManager.getDeletedTable();
+ assertTrue(openKeyTable.isEmpty());
+ assertTrue(deletedTable.isEmpty());
+ ozoneManager.getKeyManager().getDeletingService().suspend();
+ OMMetrics metrics = ozoneManager.getMetrics();
+ metrics.incDataCommittedBytes(-metrics.getDataCommittedBytes());
+ assertEquals(0, metrics.getDataCommittedBytes());
+ OzoneVolume volume =
client.getObjectStore().getVolume(bucket.getVolumeName());
+ OzoneBucket ozoneBucket = volume.getBucket(bucket.getName());
+ long usedBytes = ozoneBucket.getUsedBytes();
+
+ String data1 = "data for first hsynced file";
+ String data2 = "data for second hsynced file";
+ final Path file = new Path(dir, "file-hsync-overwrite-hsync");
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ // create first hsync key and call hsync
+ FSDataOutputStream outputStream1 = fs.create(file, true);
+ outputStream1.write(data1.getBytes(UTF_8), 0, data1.length());
+ outputStream1.hsync();
+
+ // create second hync key and call hsync
+ FSDataOutputStream outputStream2 = fs.create(file, true);
+ outputStream2.write(data2.getBytes(UTF_8), 0, data2.length());
+ outputStream2.hsync();
+
+ // close first hsync key should fail
+ OMException omException = assertThrows(OMException.class, () ->
outputStream1.close());
+ assertTrue(omException.getResult() ==
OMException.ResultCodes.KEY_NOT_FOUND);
+ assertTrue(omException.getMessage().contains("already
deleted/overwritten"));
+
+ // hsync/close second hsync key should success
+ outputStream2.hsync();
+ outputStream2.close();
+
+ Map<String, OmKeyInfo> openKeys = getAllOpenKeys(openKeyTable);
+ Map<String, RepeatedOmKeyInfo> deletedKeys =
getAllDeletedKeys(deletedTable);
+ // outputStream1's has one openKey left in openKeyTable. It will be
cleaned up by OpenKeyCleanupService later.
+ assertEquals(1, openKeys.size());
+ // outputStream1's has one delete key record in deletedTable
+ assertEquals(1, deletedKeys.size());
+
+ // final file will have data2 content
+ OzoneKeyDetails keyInfo = bucket.getKey(file.getName());
+ try (OzoneInputStream is = bucket.readKey(file.getName())) {
+ ByteBuffer readBuffer = ByteBuffer.allocate((int)
keyInfo.getDataSize());
+ int readLen = is.read(readBuffer);
+ assertEquals(keyInfo.getDataSize(), readLen);
+ assertArrayEquals(data2.getBytes(UTF_8), readBuffer.array());
+ }
+
+ // verify bucket info
+ ozoneBucket = volume.getBucket(bucket.getName());
+ assertEquals(keyInfo.getDataSize() *
keyInfo.getReplicationConfig().getRequiredNodes() + usedBytes,
+ ozoneBucket.getUsedBytes());
+ } finally {
+ cleanupDeletedTable(ozoneManager);
+ cleanupOpenKeyTable(ozoneManager, BUCKET_LAYOUT);
+ ozoneManager.getKeyManager().getDeletingService().resume();
+ }
+ }
+
+ private Map<String, OmKeyInfo> getAllOpenKeys(Table<String, OmKeyInfo>
table) throws IOException {
+ Map<String, OmKeyInfo> keys = new HashMap<String, OmKeyInfo>();
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
tableIter = table.iterator()) {
+ while (tableIter.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> kv = tableIter.next();
+ String key = kv.getKey();
+ keys.put(key, kv.getValue());
+ }
+ }
+ return keys;
+ }
+
+ private Map<String, RepeatedOmKeyInfo> getAllDeletedKeys(Table<String,
RepeatedOmKeyInfo> table) throws IOException {
+ Map<String, RepeatedOmKeyInfo> keys = new HashMap<String,
RepeatedOmKeyInfo>();
+ try (TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>> tableIter = table.iterator()) {
+ while (tableIter.hasNext()) {
+ Table.KeyValue<String, RepeatedOmKeyInfo> kv = tableIter.next();
+ String key = kv.getKey();
+ keys.put(key, kv.getValue());
+ }
+ }
+ return keys;
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 11be7be716..4488e467c2 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Scanner;
@@ -30,6 +32,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
@@ -38,10 +42,12 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
@@ -200,6 +206,42 @@ public final class TestDataUtil {
return keyLocationMap;
}
+ public static void cleanupDeletedTable(OzoneManager ozoneManager) throws
IOException {
+ Table<String, RepeatedOmKeyInfo> deletedTable =
ozoneManager.getMetadataManager().getDeletedTable();
+ List<String> nameList = new ArrayList<>();
+ try (TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>> keyIter = deletedTable.iterator()) {
+ while (keyIter.hasNext()) {
+ Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
+ nameList.add(kv.getKey());
+ }
+ }
+ nameList.forEach(k -> {
+ try {
+ deletedTable.delete(k);
+ } catch (IOException e) {
+ // do nothing
+ }
+ });
+ }
+
+ public static void cleanupOpenKeyTable(OzoneManager ozoneManager,
BucketLayout bucketLayout) throws IOException {
+ Table<String, OmKeyInfo> openKeyTable =
ozoneManager.getMetadataManager().getOpenKeyTable(bucketLayout);
+ List<String> nameList = new ArrayList<>();
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = openKeyTable.iterator()) {
+ while (keyIter.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+ nameList.add(kv.getKey());
+ }
+ }
+ nameList.forEach(k -> {
+ try {
+ openKeyTable.delete(k);
+ } catch (IOException e) {
+ // do nothing
+ }
+ });
+ }
+
private static OmKeyInfo lookupOmKeyInfo(MiniOzoneCluster cluster,
OzoneBucket bucket, String key) throws IOException {
OmKeyArgs arg = new OmKeyArgs.Builder()
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index 15af5a2d8e..323859be43 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -81,9 +81,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -93,6 +91,7 @@ import static
org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.TestDataUtil.cleanupDeletedTable;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
import static org.apache.ozone.test.GenericTestUtils.getTestStartTime;
@@ -299,7 +298,7 @@ public class TestSecureOzoneRpcClient extends
TestOzoneRpcClient {
@ParameterizedTest
@ValueSource(ints = {1 << 24, (1 << 24) + 1, (1 << 24) - 1})
public void testPreallocateFileRecovery(long dataSize) throws Exception {
- cleanupDeletedTable();
+ cleanupDeletedTable(ozoneManager);
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
@@ -370,25 +369,6 @@ public class TestSecureOzoneRpcClient extends
TestOzoneRpcClient {
}
}
- private void cleanupDeletedTable() throws IOException {
- Table<String, RepeatedOmKeyInfo> deletedTable =
ozoneManager.getMetadataManager().getDeletedTable();
- List<String> nameList = new ArrayList<>();
- try (TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>>
- keyIter = deletedTable.iterator()) {
- while (keyIter.hasNext()) {
- Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
- nameList.add(kv.getKey());
- }
- }
- nameList.forEach(k -> {
- try {
- deletedTable.delete(k);
- } catch (IOException e) {
- // do nothing
- }
- });
- }
-
private void assertTokenIsNull(OmKeyInfo value) {
value.getKeyLocationVersions()
.forEach(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index b5ea6d429a..6b48cea095 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1839,9 +1839,10 @@ public class OmMetadataManagerImpl implements
OMMetadataManager,
.isPresent();
if ((!isHsync && openKeyInfo.getCreationTime() <=
expiredCreationTimestamp) ||
-
(openKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY))) {
+
(openKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY)) ||
+
(openKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY))) {
// add non-hsync'ed keys
- // also add hsync keys which are already deleted from keyTable
+ // also add hsync keys which are already deleted/overwritten from
keyTable
expiredKeys.addOpenKey(openKeyInfo, dbOpenKeyName);
num++;
} else if (isHsync && openKeyInfo.getModificationTime() <=
expiredLeaseTimestamp &&
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 2e3b63d6e4..9edb07f1a1 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -208,8 +208,9 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
throw new OMException("Open Key " + openKeyName + " is under lease
recovery",
KEY_UNDER_LEASE_RECOVERY);
}
- if
(openKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY)) {
- throw new OMException("Open Key " + openKeyName + " is already
deleted",
+ if (openKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY)
||
+
openKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) {
+ throw new OMException("Open Key " + openKeyName + " is already
deleted/overwritten",
KEY_NOT_FOUND);
}
List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
index 838f3ff5a1..59748696ac 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
@@ -130,6 +130,11 @@ public class OMAllocateBlockRequestWithFSO extends
OMAllocateBlockRequest {
throw new OMException("Open Key " + openKeyName + " is under lease
recovery",
KEY_UNDER_LEASE_RECOVERY);
}
+ if (openKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY)
||
+
openKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) {
+ throw new OMException("Open Key " + openKeyName + " is already
deleted/overwritten",
+ KEY_NOT_FOUND);
+ }
List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
OmKeyLocationInfo.getFromProtobuf(blockLocation));
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 2b238342ee..94cd63f9ba 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -221,29 +221,35 @@ public class OMKeyCommitRequest extends OMKeyRequest {
// creation and key commit, old versions will be just overwritten and
// not kept. Bucket versioning will be effective from the first key
// creation after the knob turned on.
- boolean isPreviousCommitHsync = false;
- Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
long writerClientId = commitKeyRequest.getClientID();
- if (isRecovery && keyToDelete != null) {
- String clientId =
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
- if (clientId == null) {
- throw new OMException("Failed to recovery key, as " +
- dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED);
- }
- writerClientId = Long.parseLong(clientId);
- }
-
+ boolean isSameHsyncKey = false;
+ boolean isOverwrittenHsyncKey = false;
final String clientIdString = String.valueOf(writerClientId);
if (null != keyToDelete) {
- isPreviousCommitHsync = java.util.Optional.of(keyToDelete)
+ isSameHsyncKey = java.util.Optional.of(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
.filter(id -> id.equals(clientIdString))
.isPresent();
+ if (!isSameHsyncKey) {
+ isOverwrittenHsyncKey = java.util.Optional.of(keyToDelete)
+ .map(WithMetadata::getMetadata)
+ .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
+ .filter(id -> !id.equals(clientIdString))
+ .isPresent() && !isRecovery;
+ }
}
+ if (isRecovery && keyToDelete != null) {
+ String clientId =
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+ if (clientId == null) {
+ throw new OMException("Failed to recovery key, as " +
+ dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED);
+ }
+ writerClientId = Long.parseLong(clientId);
+ }
String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, writerClientId);
omKeyInfo =
@@ -252,11 +258,12 @@ public class OMKeyCommitRequest extends OMKeyRequest {
String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
" entry is not found in the OpenKey table", KEY_NOT_FOUND);
- }
- if (omKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY)) {
- throw new OMException("Open Key " + keyName + " is already deleted",
+ } else if
(omKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY) ||
+
omKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) {
+ throw new OMException("Open Key " + keyName + " is already
deleted/overwritten",
KEY_NOT_FOUND);
}
+
if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
if (!isRecovery) {
@@ -265,8 +272,21 @@ public class OMKeyCommitRequest extends OMKeyRequest {
}
}
- omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
+ OmKeyInfo openKeyToDelete = null;
+ String dbOpenKeyToDeleteKey = null;
+ if (isOverwrittenHsyncKey) {
+ // find the overwritten openKey and add OVERWRITTEN_HSYNC_KEY to it.
+ dbOpenKeyToDeleteKey = omMetadataManager.getOpenKey(volumeName,
bucketName,
+ keyName,
Long.parseLong(keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID)));
+ openKeyToDelete =
omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKeyToDeleteKey);
+ openKeyToDelete.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY,
"true");
+ openKeyToDelete.setModificationTime(Time.now());
+ openKeyToDelete.setUpdateID(trxnLogIndex,
ozoneManager.isRatisEnabled());
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
+ dbOpenKeyToDeleteKey, openKeyToDelete, trxnLogIndex);
+ }
+ omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
// non-null indicates it is necessary to update the open key
OmKeyInfo newOpenKeyInfo = null;
@@ -290,10 +310,11 @@ public class OMKeyCommitRequest extends OMKeyRequest {
// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
+ Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
long correctedSpace = omKeyInfo.getReplicatedSize();
// if keyToDelete isn't null, usedNamespace needn't check and
// increase.
- if (keyToDelete != null && (isHSync || isPreviousCommitHsync)) {
+ if (keyToDelete != null && (isSameHsyncKey)) {
correctedSpace -= keyToDelete.getReplicatedSize();
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
@@ -369,7 +390,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
omClientResponse = new OMKeyCommitResponse(omResponse.build(),
omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject(),
- oldKeyVersionsToDeleteMap, isHSync, newOpenKeyInfo);
+ oldKeyVersionsToDeleteMap, isHSync, newOpenKeyInfo,
dbOpenKeyToDeleteKey, openKeyToDelete);
result = Result.SUCCESS;
} catch (IOException | InvalidPathException ex) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index dd90e7ad89..ce0fdbc742 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -22,7 +22,9 @@ import java.nio.file.InvalidPathException;
import java.util.HashMap;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.om.helpers.WithMetadata;
import org.apache.hadoop.ozone.om.request.util.OmKeyHSyncUtil;
+import org.apache.hadoop.util.Time;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -38,7 +40,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmFSOFile;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.WithMetadata;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -147,6 +148,24 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
long writerClientId = commitKeyRequest.getClientID();
+ boolean isSameHsyncKey = false;
+ boolean isOverwrittenHsyncKey = false;
+ final String clientIdString = String.valueOf(writerClientId);
+ if (null != keyToDelete) {
+ isSameHsyncKey = java.util.Optional.of(keyToDelete)
+ .map(WithMetadata::getMetadata)
+ .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
+ .filter(id -> id.equals(clientIdString))
+ .isPresent();
+ if (!isSameHsyncKey) {
+ isOverwrittenHsyncKey = java.util.Optional.of(keyToDelete)
+ .map(WithMetadata::getMetadata)
+ .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
+ .filter(id -> !id.equals(clientIdString))
+ .isPresent() && !isRecovery;
+ }
+ }
+
if (isRecovery && keyToDelete != null) {
String clientId =
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
if (clientId == null) {
@@ -162,6 +181,10 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " +
dbOpenFileKey + " entry is not found in the OpenKey table",
KEY_NOT_FOUND);
+ } else if
(omKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY) ||
+
omKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) {
+ throw new OMException("Open Key " + keyName + " is already
deleted/overwritten",
+ KEY_NOT_FOUND);
}
if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
@@ -172,9 +195,22 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
}
}
- omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
+ OmKeyInfo openKeyToDelete = null;
+ String dbOpenKeyToDeleteKey = null;
+ if (isOverwrittenHsyncKey) {
+ // find the overwritten openKey and add OVERWRITTEN_HSYNC_KEY to it.
+ dbOpenKeyToDeleteKey = fsoFile.getOpenFileName(
+
Long.parseLong(keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID)));
+ openKeyToDelete = OMFileRequest.getOmKeyInfoFromFileTable(true,
+ omMetadataManager, dbOpenKeyToDeleteKey, keyName);
+ openKeyToDelete.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY,
"true");
+ openKeyToDelete.setModificationTime(Time.now());
+ openKeyToDelete.setUpdateID(trxnLogIndex,
ozoneManager.isRatisEnabled());
+ OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
+ dbOpenKeyToDeleteKey, openKeyToDelete, keyName, fileName,
trxnLogIndex);
+ }
- final String clientIdString = String.valueOf(writerClientId);
+ omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
// non-null indicates it is necessary to update the open key
OmKeyInfo newOpenKeyInfo = null;
@@ -200,21 +236,10 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
// creation and key commit, old versions will be just overwritten and
// not kept. Bucket versioning will be effective from the first key
// creation after the knob turned on.
- boolean isPreviousCommitHsync = false;
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
- if (null != keyToDelete) {
- isPreviousCommitHsync = java.util.Optional.of(keyToDelete)
- .map(WithMetadata::getMetadata)
- .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
- .filter(id -> id.equals(clientIdString))
- .isPresent();
- }
-
long correctedSpace = omKeyInfo.getReplicatedSize();
-
- // if keyToDelete isn't null, usedNamespace shouldn't check and
- // increase.
- if (keyToDelete != null && (isHSync || isPreviousCommitHsync)) {
+ // if keyToDelete isn't null, usedNamespace shouldn't check and increase.
+ if (keyToDelete != null && isSameHsyncKey) {
correctedSpace -= keyToDelete.getReplicatedSize();
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
@@ -294,7 +319,7 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
omClientResponse = new OMKeyCommitResponseWithFSO(omResponse.build(),
omKeyInfo, dbFileKey, dbOpenFileKey, omBucketInfo.copyObject(),
- oldKeyVersionsToDeleteMap, volumeId, isHSync, newOpenKeyInfo);
+ oldKeyVersionsToDeleteMap, volumeId, isHSync, newOpenKeyInfo,
dbOpenKeyToDeleteKey, openKeyToDelete);
result = Result.SUCCESS;
} catch (IOException | InvalidPathException ex) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index 685b296980..664daccc89 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -51,6 +51,8 @@ public class OMKeyCommitResponse extends OmKeyResponse {
private Map<String, RepeatedOmKeyInfo> keyToDeleteMap;
private boolean isHSync;
private OmKeyInfo newOpenKeyInfo;
+ private OmKeyInfo openKeyToUpdate;
+ private String openKeyNameToUpdate;
@SuppressWarnings("checkstyle:ParameterNumber")
public OMKeyCommitResponse(
@@ -59,7 +61,7 @@ public class OMKeyCommitResponse extends OmKeyResponse {
@Nonnull OmBucketInfo omBucketInfo,
Map<String, RepeatedOmKeyInfo> keyToDeleteMap,
boolean isHSync,
- OmKeyInfo newOpenKeyInfo) {
+ OmKeyInfo newOpenKeyInfo, String openKeyNameToUpdate, OmKeyInfo
openKeyToUpdate) {
super(omResponse, omBucketInfo.getBucketLayout());
this.omKeyInfo = omKeyInfo;
this.ozoneKeyName = ozoneKeyName;
@@ -68,6 +70,8 @@ public class OMKeyCommitResponse extends OmKeyResponse {
this.keyToDeleteMap = keyToDeleteMap;
this.isHSync = isHSync;
this.newOpenKeyInfo = newOpenKeyInfo;
+ this.openKeyNameToUpdate = openKeyNameToUpdate;
+ this.openKeyToUpdate = openKeyToUpdate;
}
/**
@@ -97,6 +101,7 @@ public class OMKeyCommitResponse extends OmKeyResponse {
.putWithBatch(batchOperation, ozoneKeyName, omKeyInfo);
updateDeletedTable(omMetadataManager, batchOperation);
+ handleOpenKeyToUpdate(omMetadataManager, batchOperation);
// update bucket usedBytes.
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
@@ -136,6 +141,14 @@ public class OMKeyCommitResponse extends OmKeyResponse {
}
}
+ protected void handleOpenKeyToUpdate(OMMetadataManager omMetadataManager,
+ BatchOperation batchOperation) throws IOException {
+ if (this.openKeyToUpdate != null) {
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).putWithBatch(
+ batchOperation, openKeyNameToUpdate, openKeyToUpdate);
+ }
+ }
+
protected boolean isHSync() {
return isHSync;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
index 13034f77df..29a10e1d8c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
@@ -55,9 +55,9 @@ public class OMKeyCommitResponseWithFSO extends
OMKeyCommitResponse {
@Nonnull OmBucketInfo omBucketInfo,
Map<String, RepeatedOmKeyInfo> deleteKeyMap, long volumeId,
boolean isHSync,
- OmKeyInfo newOpenKeyInfo) {
+ OmKeyInfo newOpenKeyInfo, String openKeyNameToUpdate, OmKeyInfo
openKeyToUpdate) {
super(omResponse, omKeyInfo, ozoneKeyName, openKeyName,
- omBucketInfo, deleteKeyMap, isHSync, newOpenKeyInfo);
+ omBucketInfo, deleteKeyMap, isHSync, newOpenKeyInfo,
openKeyNameToUpdate, openKeyToUpdate);
this.volumeId = volumeId;
}
@@ -88,6 +88,7 @@ public class OMKeyCommitResponseWithFSO extends
OMKeyCommitResponse {
getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID());
updateDeletedTable(omMetadataManager, batchOperation);
+ handleOpenKeyToUpdate(omMetadataManager, batchOperation);
// update bucket usedBytes.
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index c4384c2dc9..762401bc57 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -158,6 +158,6 @@ public class TestOMKeyCommitResponse extends
TestOMKeyResponse {
new RepeatedOmKeyInfo(e)));
}
return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey,
- omBucketInfo, deleteKeyMap, isHSync, newOpenKeyInfo);
+ omBucketInfo, deleteKeyMap, isHSync, newOpenKeyInfo, null, null);
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
index 62998d87b7..e3d8840039 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponseWithFSO.java
@@ -57,7 +57,7 @@ public class TestOMKeyCommitResponseWithFSO extends
TestOMKeyCommitResponse {
new RepeatedOmKeyInfo(e)));
}
return new OMKeyCommitResponseWithFSO(omResponse, omKeyInfo, ozoneKey,
- openKey, omBucketInfo, deleteKeyMap, volumeId, isHSync,
newOpenKeyInfo);
+ openKey, omBucketInfo, deleteKeyMap, volumeId, isHSync,
newOpenKeyInfo, null, null);
}
@Nonnull
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]