This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c354195 HDDS-1016. Allow marking containers as unhealthy. Contributed
by Arpit Agarwal.
c354195 is described below
commit c35419579b5c5b315c5b62d8b89149924416b480
Author: Arpit Agarwal <[email protected]>
AuthorDate: Wed Jan 30 11:40:50 2019 -0800
HDDS-1016. Allow marking containers as unhealthy. Contributed by Arpit
Agarwal.
---
.../container/common/interfaces/Container.java | 5 +
.../container/keyvalue/KeyValueContainer.java | 60 +++++-
.../ozone/container/keyvalue/KeyValueHandler.java | 94 ++++++++-
.../TestKeyValueContainerMarkUnhealthy.java | 172 ++++++++++++++++
.../TestKeyValueHandlerWithUnhealthyContainer.java | 227 +++++++++++++++++++++
5 files changed, 538 insertions(+), 20 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 405cac3..58e3383 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -85,6 +85,11 @@ public interface Container<CONTAINERDATA extends
ContainerData> extends RwLock {
void markContainerForClose() throws StorageContainerException;
/**
+ * Marks the container replica as unhealthy.
+ */
+ void markContainerUnhealthy() throws StorageContainerException;
+
+ /**
* Quasi Closes a open container, if it is already closed or does not exist a
* StorageContainerException is thrown.
*
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index e737a53..ba559e9 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -64,6 +64,7 @@ import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_FILES_CREATE_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.DISK_OUT_OF_SPACE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -72,6 +73,7 @@ import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNSUPPORTED_REQUEST;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,8 +111,8 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
File containerMetaDataPath = null;
//acquiring volumeset read lock
- volumeSet.readLock();
long maxSize = containerData.getMaxSize();
+ volumeSet.readLock();
try {
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), maxSize);
@@ -270,28 +272,67 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
@Override
public void markContainerForClose() throws StorageContainerException {
- updateContainerData(() ->
- containerData.setState(ContainerDataProto.State.CLOSING));
+ writeLock();
+ try {
+ if (getContainerState() != ContainerDataProto.State.OPEN) {
+ throw new StorageContainerException(
+ "Attempting to close a " + getContainerState() + " container.",
+ CONTAINER_NOT_OPEN);
+ }
+ updateContainerData(() ->
+ containerData.setState(ContainerDataProto.State.CLOSING));
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ @Override
+ public void markContainerUnhealthy() throws StorageContainerException {
+ writeLock();
+ try {
+ updateContainerData(() ->
+ containerData.setState(ContainerDataProto.State.UNHEALTHY));
+ } finally {
+ writeUnlock();
+ }
}
@Override
public void quasiClose() throws StorageContainerException {
- updateContainerData(containerData::quasiCloseContainer);
+ writeLock();
+ try {
+ updateContainerData(containerData::quasiCloseContainer);
+ } finally {
+ writeUnlock();
+ }
}
@Override
public void close() throws StorageContainerException {
- updateContainerData(containerData::closeContainer);
+ writeLock();
+ try {
+ updateContainerData(containerData::closeContainer);
+ } finally {
+ writeUnlock();
+ }
+
// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
compactDB();
}
+ /**
+ *
+ * Must be invoked with the writeLock held.
+ *
+ * @param update
+ * @throws StorageContainerException
+ */
private void updateContainerData(Runnable update)
throws StorageContainerException {
+ Preconditions.checkState(hasWriteLock());
ContainerDataProto.State oldState = null;
try {
- writeLock();
oldState = containerData.getState();
update.run();
File containerFile = getContainerFile();
@@ -304,12 +345,10 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
containerData.setState(oldState);
}
throw ex;
- } finally {
- writeUnlock();
}
}
- private void compactDB() throws StorageContainerException {
+ void compactDB() throws StorageContainerException {
try {
MetadataStore db = BlockUtils.getDB(containerData, config);
db.compactDB();
@@ -340,7 +379,8 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
}
@Override
- public void update(Map<String, String> metadata, boolean forceUpdate)
+ public void update(
+ Map<String, String> metadata, boolean forceUpdate)
throws StorageContainerException {
// TODO: Now, when writing the updated data to .container file, we are
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 261dbc4..3748966 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -111,7 +111,9 @@ public class KeyValueHandler extends Handler {
private final BlockDeletingService blockDeletingService;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long maxContainerSize;
- private final AutoCloseableLock handlerLock;
+
+ // A lock that is held during container creation.
+ private final AutoCloseableLock containerCreationLock;
private final boolean doSyncWrite;
public KeyValueHandler(Configuration config, StateContext context,
@@ -143,7 +145,7 @@ public class KeyValueHandler extends Handler {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
// this handler lock is used for synchronizing createContainer Requests,
// so using a fair lock here.
- handlerLock = new AutoCloseableLock(new ReentrantLock(true));
+ containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
}
@VisibleForTesting
@@ -212,7 +214,7 @@ public class KeyValueHandler extends Handler {
/**
* Handles Create Container Request. If successful, adds the container to
- * ContainerSet.
+ * ContainerSet and sends an ICR to the SCM.
*/
ContainerCommandResponseProto handleCreateContainer(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
@@ -235,14 +237,12 @@ public class KeyValueHandler extends Handler {
KeyValueContainer newContainer = new KeyValueContainer(
newContainerData, conf);
- try {
- handlerLock.acquire();
+ boolean created = false;
+ try (AutoCloseableLock l = containerCreationLock.acquire()) {
if (containerSet.getContainer(containerID) == null) {
newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
- containerSet.addContainer(newContainer);
- sendICR(newContainer);
+ created = containerSet.addContainer(newContainer);
} else {
-
// The create container request for an already existing container can
// arrive in case the ContainerStateMachine reapplies the transaction
// on datanode restart. Just log a warning msg here.
@@ -251,10 +251,15 @@ public class KeyValueHandler extends Handler {
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
- } finally {
- handlerLock.release();
}
+ if (created) {
+ try {
+ sendICR(newContainer);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ }
+ }
return ContainerUtils.getSuccessResponse(request);
}
@@ -282,6 +287,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
KeyValueContainerData containerData = kvContainer.getContainerData();
return KeyValueContainerUtil.getReadContainerResponse(
request, containerData);
@@ -420,6 +433,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
BlockData responseData;
try {
BlockID blockID = BlockID.getFromProtobuf(
@@ -451,6 +472,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
long blockLength;
try {
BlockID blockID = BlockID
@@ -510,6 +539,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
ChunkInfo chunkInfo;
byte[] data;
try {
@@ -538,6 +575,27 @@ public class KeyValueHandler extends Handler {
}
/**
+ * Throw an exception if the container is unhealthy.
+ *
+ * @throws StorageContainerException if the container is unhealthy.
+ * @param kvContainer
+ */
+ @VisibleForTesting
+ void checkContainerIsHealthy(KeyValueContainer kvContainer)
+ throws StorageContainerException {
+ kvContainer.readLock();
+ try {
+ if (kvContainer.getContainerData().getState() == State.UNHEALTHY) {
+ throw new StorageContainerException(
+ "The container replica is unhealthy.",
+ CONTAINER_UNHEALTHY);
+ }
+ } finally {
+ kvContainer.readUnlock();
+ }
+ }
+
+ /**
* Handle Delete Chunk operation. Calls ChunkManager to process the request.
*/
ContainerCommandResponseProto handleDeleteChunk(
@@ -549,6 +607,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
try {
checkContainerOpen(kvContainer);
@@ -697,6 +763,14 @@ public class KeyValueHandler extends Handler {
return ContainerUtils.malformedRequest(request);
}
+ // The container can become unhealthy after the lock is released.
+ // The operation will likely fail/timeout in that happens.
+ try {
+ checkContainerIsHealthy(kvContainer);
+ } catch (StorageContainerException sce) {
+ return ContainerUtils.logAndReturnError(LOG, sce, request);
+ }
+
GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
try {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
new file mode 100644
index 0000000..e11bca5
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMarkUnhealthy.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests unhealthy container functionality in the {@link KeyValueContainer}
+ * class.
+ */
+public class TestKeyValueContainerMarkUnhealthy {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestKeyValueContainerMarkUnhealthy.class);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Rule
+ public Timeout timeout = new Timeout(600_000);
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private OzoneConfiguration conf;
+ private String scmId = UUID.randomUUID().toString();
+ private VolumeSet volumeSet;
+ private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+ private KeyValueContainerData keyValueContainerData;
+ private KeyValueContainer keyValueContainer;
+ private UUID datanodeId;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new OzoneConfiguration();
+ datanodeId = UUID.randomUUID();
+ HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+ .getAbsolutePath()).conf(conf).datanodeUuid(datanodeId
+ .toString()).build();
+
+ volumeSet = mock(VolumeSet.class);
+ volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+ Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+ .thenReturn(hddsVolume);
+
+ keyValueContainerData = new KeyValueContainerData(1L,
+ (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
+ datanodeId.toString());
+ final File metaDir = GenericTestUtils.getRandomizedTestDir();
+ metaDir.mkdirs();
+ keyValueContainerData.setMetadataPath(metaDir.getPath());
+
+
+ keyValueContainer = new KeyValueContainer(
+ keyValueContainerData, conf);
+ }
+
+ @After
+ public void teardown() {
+ volumeSet = null;
+ keyValueContainer = null;
+ keyValueContainerData = null;
+ }
+
+ /**
+ * Verify that the .container file is correctly updated when a
+ * container is marked as unhealthy.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testMarkContainerUnhealthy() throws IOException {
+ assertThat(keyValueContainerData.getState(), is(OPEN));
+ keyValueContainer.markContainerUnhealthy();
+ assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+
+ // Check metadata in the .container file
+ File containerFile = keyValueContainer.getContainerFile();
+
+ keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
+ .readContainerFile(containerFile);
+ assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+ }
+
+ /**
+ * Attempting to close an unhealthy container should fail.
+ * @throws IOException
+ */
+ @Test
+ public void testCloseUnhealthyContainer() throws IOException {
+ keyValueContainer.markContainerUnhealthy();
+ thrown.expect(StorageContainerException.class);
+ keyValueContainer.markContainerForClose();
+ }
+
+ /**
+ * Attempting to mark a closed container as unhealthy should succeed.
+ */
+ @Test
+ public void testMarkClosedContainerAsUnhealthy() throws IOException {
+ // We need to create the container so the compact-on-close operation
+ // does not NPE.
+ keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+ keyValueContainer.close();
+ keyValueContainer.markContainerUnhealthy();
+ assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+ }
+
+ /**
+ * Attempting to mark a quasi-closed container as unhealthy should succeed.
+ */
+ @Test
+ public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException {
+ keyValueContainer.quasiClose();
+ keyValueContainer.markContainerUnhealthy();
+ assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+ }
+
+ /**
+ * Attempting to mark a closing container as unhealthy should succeed.
+ */
+ @Test
+ public void testMarkClosingContainerAsUnhealthy() throws IOException {
+ keyValueContainer.markContainerForClose();
+ keyValueContainer.markContainerUnhealthy();
+ assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
new file mode 100644
index 0000000..e9443b1
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Test that KeyValueHandler fails certain operations when the
+ * container is unhealthy.
+ */
+public class TestKeyValueHandlerWithUnhealthyContainer {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestKeyValueHandlerWithUnhealthyContainer.class);
+
+ private final static String DATANODE_UUID = UUID.randomUUID().toString();
+ private static final long DUMMY_CONTAINER_ID = 9999;
+
+ @Test
+ public void testRead() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleReadContainer(
+ getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer),
+ container);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ @Test
+ public void testGetBlock() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleGetBlock(
+ getDummyCommandRequestProto(ContainerProtos.Type.GetBlock),
+ container);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ @Test
+ public void testGetCommittedBlockLength() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleGetCommittedBlockLength(
+ getDummyCommandRequestProto(
+ ContainerProtos.Type.GetCommittedBlockLength),
+ container);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ @Test
+ public void testReadChunk() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleReadChunk(
+ getDummyCommandRequestProto(
+ ContainerProtos.Type.ReadChunk),
+ container, null);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ @Test
+ public void testDeleteChunk() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleDeleteChunk(
+ getDummyCommandRequestProto(
+ ContainerProtos.Type.DeleteChunk),
+ container);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ @Test
+ public void testGetSmallFile() throws IOException {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleGetSmallFile(
+ getDummyCommandRequestProto(
+ ContainerProtos.Type.GetSmallFile),
+ container);
+ assertThat(response.getResult(), is(CONTAINER_UNHEALTHY));
+ }
+
+ // -- Helper methods below.
+
+ private KeyValueHandler getDummyHandler() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeDetails dnDetails = DatanodeDetails.newBuilder()
+ .setUuid(DATANODE_UUID)
+ .setHostName("dummyHost")
+ .setIpAddress("1.2.3.4")
+ .build();
+ DatanodeStateMachine stateMachine = mock(DatanodeStateMachine.class);
+ when(stateMachine.getDatanodeDetails()).thenReturn(dnDetails);
+
+ StateContext context = new StateContext(
+ conf, DatanodeStateMachine.DatanodeStates.RUNNING,
+ stateMachine);
+
+ return new KeyValueHandler(
+ new OzoneConfiguration(),
+ context,
+ mock(ContainerSet.class),
+ mock(VolumeSet.class),
+ mock(ContainerMetrics.class));
+ }
+
+ private KeyValueContainer getMockUnhealthyContainer() {
+ KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+ when(containerData.getState()).thenReturn(
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ return new KeyValueContainer(containerData, new OzoneConfiguration());
+ }
+
+ /**
+ * Construct fake protobuf messages for various types of requests.
+ * This is tedious, however necessary to test. Protobuf classes are final
+ * and cannot be mocked by Mockito.
+ *
+ * @param cmdType type of the container command.
+ * @return
+ */
+ private ContainerProtos.ContainerCommandRequestProto
getDummyCommandRequestProto(
+ ContainerProtos.Type cmdType) {
+ final ContainerProtos.ContainerCommandRequestProto.Builder builder =
+ ContainerProtos.ContainerCommandRequestProto.newBuilder()
+ .setCmdType(cmdType)
+ .setContainerID(DUMMY_CONTAINER_ID)
+ .setDatanodeUuid(DATANODE_UUID);
+
+ final ContainerProtos.DatanodeBlockID fakeBlockId =
+ ContainerProtos.DatanodeBlockID.newBuilder()
+ .setContainerID(DUMMY_CONTAINER_ID).setLocalID(1).build();
+
+ final ContainerProtos.ChunkInfo fakeChunkInfo =
+ ContainerProtos.ChunkInfo.newBuilder()
+ .setChunkName("dummy")
+ .setOffset(0)
+ .setLen(100)
+ .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+ .setBytesPerChecksum(1)
+ .setType(ContainerProtos.ChecksumType.CRC32)
+ .build())
+ .build();
+
+ switch(cmdType) {
+ case ReadContainer:
+
builder.setReadContainer(ContainerProtos.ReadContainerRequestProto.newBuilder().build());
+ break;
+ case GetBlock:
+ builder.setGetBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
+ .setBlockID(fakeBlockId).build());
+ break;
+ case GetCommittedBlockLength:
+ builder.setGetCommittedBlockLength(
+ ContainerProtos.GetCommittedBlockLengthRequestProto.newBuilder()
+ .setBlockID(fakeBlockId).build());
+ case ReadChunk:
+ builder.setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder()
+ .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build());
+ break;
+ case DeleteChunk:
+
builder.setDeleteChunk(ContainerProtos.DeleteChunkRequestProto.newBuilder()
+ .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build());
+ break;
+ case GetSmallFile:
+
builder.setGetSmallFile(ContainerProtos.GetSmallFileRequestProto.newBuilder()
+ .setBlock(ContainerProtos.GetBlockRequestProto.newBuilder()
+ .setBlockID(fakeBlockId)
+ .build())
+ .build());
+ break;
+
+ default:
+ Assert.fail("Unhandled request type " + cmdType + " in unit test");
+ }
+
+ return builder.build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]