This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cf54f34af HDDS-9588. DN import of container is not safe while replication (#5543)
1cf54f34af is described below

commit 1cf54f34af36b65ff5a59a8a4749d138f9b8f283
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Nov 9 13:52:02 2023 +0530

    HDDS-9588. DN import of container is not safe while replication (#5543)
---
 .../container/common/helpers/ContainerUtils.java   |   5 +-
 .../container/replication/ContainerImporter.java   |  61 +++++---
 .../common/helpers/TestContainerUtils.java         |   3 +-
 .../replication/TestContainerImporter.java         | 159 +++++++++++++++++++++
 4 files changed, 207 insertions(+), 21 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 3c3371ebf3..b89ecff48c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -36,6 +36,7 @@ import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -310,13 +311,13 @@ public final class ContainerUtils {
   }
 
   public static String getContainerTarName(long containerId) {
-    return "container-" + containerId + ".tar";
+    return "container-" + containerId + "-" + UUID.randomUUID() + ".tar";
   }
 
   public static long retrieveContainerIdFromTarName(String tarName)
       throws IOException {
     assert tarName != null;
-    Pattern pattern = Pattern.compile("container-(\\d+).tar");
+    Pattern pattern = Pattern.compile("container-(\\d+)-.*\\.tar");
     // Now create matcher object.
     Matcher m = pattern.matcher(tarName);
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 0327af8aac..a90c5624cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -17,9 +17,19 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -34,12 +44,6 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
 /**
  * Imports container from tarball.
  */
@@ -56,6 +60,9 @@ public class ContainerImporter {
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long containerSize;
 
+  private final Set<Long> importContainerProgress
+      = Collections.synchronizedSet(new HashSet());
+
   public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
       ContainerController controller,
       MutableVolumeSet volumeSet) {
@@ -75,14 +82,29 @@ public class ContainerImporter {
   public void importContainer(long containerID, Path tarFilePath,
       HddsVolume hddsVolume, CopyContainerCompression compression)
       throws IOException {
-
-    HddsVolume targetVolume = hddsVolume;
-    if (targetVolume == null) {
-      targetVolume = chooseNextVolume();
+    if (!importContainerProgress.add(containerID)) {
+      deleteFileQuietely(tarFilePath);
+      LOG.warn("Container import in progress with container Id {}",
+          containerID);
+      throw new StorageContainerException("Container " +
+          "import in progress with container Id " + containerID,
+          ContainerProtos.Result.CONTAINER_EXISTS);
     }
+
     try {
-      KeyValueContainerData containerData;
+      if (containerSet.getContainer(containerID) != null) {
+        LOG.warn("Container already exists with container Id {}", containerID);
+        throw new StorageContainerException("Container already exists " +
+            "with container Id " + containerID,
+            ContainerProtos.Result.CONTAINER_EXISTS);
+      }
+
+      HddsVolume targetVolume = hddsVolume;
+      if (targetVolume == null) {
+        targetVolume = chooseNextVolume();
+      }
 
+      KeyValueContainerData containerData;
       TarContainerPacker packer = new TarContainerPacker(compression);
 
       try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
@@ -99,12 +121,17 @@ public class ContainerImporter {
         containerSet.addContainer(container);
       }
     } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting temporary container file: "
-            + tarFilePath.toAbsolutePath(), ex);
-      }
+      importContainerProgress.remove(containerID);
+      deleteFileQuietely(tarFilePath);
+    }
+  }
+
+  private static void deleteFileQuietely(Path tarFilePath) {
+    try {
+      Files.delete(tarFilePath);
+    } catch (Exception ex) {
+      LOG.error("Got exception while deleting temporary container file: "
+          + tarFilePath.toAbsolutePath(), ex);
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
index e56946905a..4f33e833a3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
@@ -82,8 +82,7 @@ public class TestContainerUtils {
   @Test
   public void testTarName() throws IOException {
     long containerId = 100;
-    String tarName = "container-100.tar";
-    assertEquals(tarName, ContainerUtils.getContainerTarName(containerId));
+    String tarName = ContainerUtils.getContainerTarName(containerId);
 
     assertEquals(containerId,
         ContainerUtils.retrieveContainerIdFromTarName(tarName));
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
new file mode 100644
index 0000000000..6b81acee77
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link ContainerImporter}.
+ */
+class TestContainerImporter {
+
+  private OzoneConfiguration conf;
+
+  @BeforeEach
+  void setup() {
+    conf = new OzoneConfiguration();
+  }
+
+  @Test
+  void importSameContainerWhenAlreadyImport() throws Exception {
+    long containerId = 1;
+    // create container
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    ContainerController controllerMock = mock(ContainerController.class);
+    // create containerImporter object
+    ContainerSet containerSet = new ContainerSet(0);
+    containerSet.addContainer(container);
+    MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    ContainerImporter containerImporter = new ContainerImporter(conf,
+        containerSet, controllerMock, volumeSet);
+    File tarFile = new File("dummy.tar");
+    // second import should fail immediately
+    StorageContainerException ex = Assertions.assertThrows(
+        StorageContainerException.class,
+        () -> containerImporter.importContainer(containerId, tarFile.toPath(),
+            null, NO_COMPRESSION));
+    Assertions.assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
+        ex.getResult());
+    assertTrue(ex.getMessage().contains("Container already exists"));
+  }
+
+  @Test
+  void importSameContainerWhenFirstInProgress() throws Exception {
+    long containerId = 1;
+    // create container
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    // mock controller for return container data with delay
+    ContainerController controllerMock = mock(ContainerController.class);
+    Semaphore semaphore = new Semaphore(0);
+    when(controllerMock.importContainer(any(), any(), any()))
+        .thenAnswer((invocation) -> {
+          semaphore.acquire();
+          return container;
+        });
+    // create containerImporter object
+    ContainerSet containerSet = new ContainerSet(0);
+    MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    ContainerImporter containerImporter = new ContainerImporter(conf,
+        containerSet, controllerMock, volumeSet);
+    // run import async first time having delay
+    File tarFile = containerTarFile(containerId, containerData);
+    CompletableFuture.runAsync(() -> {
+      try {
+        containerImporter.importContainer(containerId, tarFile.toPath(),
+            null, NO_COMPRESSION);
+      } catch (Exception ex) {
+        // do nothing
+      }
+    });
+    GenericTestUtils.waitFor(semaphore::hasQueuedThreads, 10, 5000);
+    // run import second time and should fail immediately as
+    // first import in progress
+    StorageContainerException ex = Assertions.assertThrows(
+        StorageContainerException.class,
+        () -> containerImporter.importContainer(containerId, tarFile.toPath(),
+            null, NO_COMPRESSION));
+    Assertions.assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
+        ex.getResult());
+    assertTrue(ex.getMessage().contains("import in progress"));
+    semaphore.release();
+  }
+
+  private File containerTarFile(
+      long containerId, ContainerData containerData) throws IOException {
+    TemporaryFolder tempFolder = new TemporaryFolder();
+    tempFolder.create();
+    File yamlFile = tempFolder.newFile("container.yaml");
+    ContainerDataYaml.createContainerFile(
+        ContainerProtos.ContainerType.KeyValueContainer, containerData,
+        yamlFile);
+    File tarFile = tempFolder.newFile(
+        ContainerUtils.getContainerTarName(containerId));
+    try (FileOutputStream output = new FileOutputStream(tarFile)) {
+      TarArchiveOutputStream archive = new TarArchiveOutputStream(output);
+      ArchiveEntry entry = archive.createArchiveEntry(yamlFile,
+          "container.yaml");
+      archive.putArchiveEntry(entry);
+      try (InputStream input = new FileInputStream(yamlFile)) {
+        IOUtils.copy(input, archive);
+      }
+      archive.closeArchiveEntry();
+    }
+    return tarFile;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to