This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1cf54f34af HDDS-9588. DN import of container is not safe while replication (#5543)
1cf54f34af is described below
commit 1cf54f34af36b65ff5a59a8a4749d138f9b8f283
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Nov 9 13:52:02 2023 +0530
HDDS-9588. DN import of container is not safe while replication (#5543)
---
.../container/common/helpers/ContainerUtils.java | 5 +-
.../container/replication/ContainerImporter.java | 61 +++++---
.../common/helpers/TestContainerUtils.java | 3 +-
.../replication/TestContainerImporter.java | 159 +++++++++++++++++++++
4 files changed, 207 insertions(+), 21 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 3c3371ebf3..b89ecff48c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -36,6 +36,7 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -310,13 +311,13 @@ public final class ContainerUtils {
}
public static String getContainerTarName(long containerId) {
- return "container-" + containerId + ".tar";
+ return "container-" + containerId + "-" + UUID.randomUUID() + ".tar";
}
public static long retrieveContainerIdFromTarName(String tarName)
throws IOException {
assert tarName != null;
- Pattern pattern = Pattern.compile("container-(\\d+).tar");
+ Pattern pattern = Pattern.compile("container-(\\d+)-.*\\.tar");
// Now create matcher object.
Matcher m = pattern.matcher(tarName);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 0327af8aac..a90c5624cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -17,9 +17,19 @@
*/
package org.apache.hadoop.ozone.container.replication;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -34,12 +44,6 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
/**
* Imports container from tarball.
*/
@@ -56,6 +60,9 @@ public class ContainerImporter {
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long containerSize;
+ private final Set<Long> importContainerProgress
+ = Collections.synchronizedSet(new HashSet());
+
public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
ContainerController controller,
MutableVolumeSet volumeSet) {
@@ -75,14 +82,29 @@ public class ContainerImporter {
public void importContainer(long containerID, Path tarFilePath,
HddsVolume hddsVolume, CopyContainerCompression compression)
throws IOException {
-
- HddsVolume targetVolume = hddsVolume;
- if (targetVolume == null) {
- targetVolume = chooseNextVolume();
+ if (!importContainerProgress.add(containerID)) {
+ deleteFileQuietely(tarFilePath);
+ LOG.warn("Container import in progress with container Id {}",
+ containerID);
+ throw new StorageContainerException("Container " +
+ "import in progress with container Id " + containerID,
+ ContainerProtos.Result.CONTAINER_EXISTS);
}
+
try {
- KeyValueContainerData containerData;
+ if (containerSet.getContainer(containerID) != null) {
+ LOG.warn("Container already exists with container Id {}", containerID);
+ throw new StorageContainerException("Container already exists " +
+ "with container Id " + containerID,
+ ContainerProtos.Result.CONTAINER_EXISTS);
+ }
+
+ HddsVolume targetVolume = hddsVolume;
+ if (targetVolume == null) {
+ targetVolume = chooseNextVolume();
+ }
+ KeyValueContainerData containerData;
TarContainerPacker packer = new TarContainerPacker(compression);
try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
@@ -99,12 +121,17 @@ public class ContainerImporter {
containerSet.addContainer(container);
}
} finally {
- try {
- Files.delete(tarFilePath);
- } catch (Exception ex) {
- LOG.error("Got exception while deleting temporary container file: "
- + tarFilePath.toAbsolutePath(), ex);
- }
+ importContainerProgress.remove(containerID);
+ deleteFileQuietely(tarFilePath);
+ }
+ }
+
+ private static void deleteFileQuietely(Path tarFilePath) {
+ try {
+ Files.delete(tarFilePath);
+ } catch (Exception ex) {
+ LOG.error("Got exception while deleting temporary container file: "
+ + tarFilePath.toAbsolutePath(), ex);
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
index e56946905a..4f33e833a3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
@@ -82,8 +82,7 @@ public class TestContainerUtils {
@Test
public void testTarName() throws IOException {
long containerId = 100;
- String tarName = "container-100.tar";
- assertEquals(tarName, ContainerUtils.getContainerTarName(containerId));
+ String tarName = ContainerUtils.getContainerTarName(containerId);
assertEquals(containerId,
ContainerUtils.retrieveContainerIdFromTarName(tarName));
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
new file mode 100644
index 0000000000..6b81acee77
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link ContainerImporter}.
+ */
+class TestContainerImporter {
+
+ private OzoneConfiguration conf;
+
+ @BeforeEach
+ void setup() {
+ conf = new OzoneConfiguration();
+ }
+
+ @Test
+ void importSameContainerWhenAlreadyImport() throws Exception {
+ long containerId = 1;
+ // create container
+ KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+ ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+ KeyValueContainer container = new KeyValueContainer(containerData, conf);
+ ContainerController controllerMock = mock(ContainerController.class);
+ // create containerImporter object
+ ContainerSet containerSet = new ContainerSet(0);
+ containerSet.addContainer(container);
+ MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ ContainerImporter containerImporter = new ContainerImporter(conf,
+ containerSet, controllerMock, volumeSet);
+ File tarFile = new File("dummy.tar");
+ // second import should fail immediately
+ StorageContainerException ex = Assertions.assertThrows(
+ StorageContainerException.class,
+ () -> containerImporter.importContainer(containerId, tarFile.toPath(),
+ null, NO_COMPRESSION));
+ Assertions.assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
+ ex.getResult());
+ assertTrue(ex.getMessage().contains("Container already exists"));
+ }
+
+ @Test
+ void importSameContainerWhenFirstInProgress() throws Exception {
+ long containerId = 1;
+ // create container
+ KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+ ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+ KeyValueContainer container = new KeyValueContainer(containerData, conf);
+ // mock controller for return container data with delay
+ ContainerController controllerMock = mock(ContainerController.class);
+ Semaphore semaphore = new Semaphore(0);
+ when(controllerMock.importContainer(any(), any(), any()))
+ .thenAnswer((invocation) -> {
+ semaphore.acquire();
+ return container;
+ });
+ // create containerImporter object
+ ContainerSet containerSet = new ContainerSet(0);
+ MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ ContainerImporter containerImporter = new ContainerImporter(conf,
+ containerSet, controllerMock, volumeSet);
+ // run import async first time having delay
+ File tarFile = containerTarFile(containerId, containerData);
+ CompletableFuture.runAsync(() -> {
+ try {
+ containerImporter.importContainer(containerId, tarFile.toPath(),
+ null, NO_COMPRESSION);
+ } catch (Exception ex) {
+ // do nothing
+ }
+ });
+ GenericTestUtils.waitFor(semaphore::hasQueuedThreads, 10, 5000);
+ // run import second time and should fail immediately as
+ // first import in progress
+ StorageContainerException ex = Assertions.assertThrows(
+ StorageContainerException.class,
+ () -> containerImporter.importContainer(containerId, tarFile.toPath(),
+ null, NO_COMPRESSION));
+ Assertions.assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
+ ex.getResult());
+ assertTrue(ex.getMessage().contains("import in progress"));
+ semaphore.release();
+ }
+
+ private File containerTarFile(
+ long containerId, ContainerData containerData) throws IOException {
+ TemporaryFolder tempFolder = new TemporaryFolder();
+ tempFolder.create();
+ File yamlFile = tempFolder.newFile("container.yaml");
+ ContainerDataYaml.createContainerFile(
+ ContainerProtos.ContainerType.KeyValueContainer, containerData,
+ yamlFile);
+ File tarFile = tempFolder.newFile(
+ ContainerUtils.getContainerTarName(containerId));
+ try (FileOutputStream output = new FileOutputStream(tarFile)) {
+ TarArchiveOutputStream archive = new TarArchiveOutputStream(output);
+ ArchiveEntry entry = archive.createArchiveEntry(yamlFile,
+ "container.yaml");
+ archive.putArchiveEntry(entry);
+ try (InputStream input = new FileInputStream(yamlFile)) {
+ IOUtils.copy(input, archive);
+ }
+ archive.closeArchiveEntry();
+ }
+ return tarFile;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]