HDDS-662. Introduce ContainerReplicaState in StorageContainerManager. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50715c06 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50715c06 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50715c06 Branch: refs/heads/trunk Commit: 50715c0699b2603622223c40ef0729c83ac26cf0 Parents: a9a63ae Author: Nandakumar <[email protected]> Authored: Wed Oct 17 17:45:35 2018 +0530 Committer: Nandakumar <[email protected]> Committed: Wed Oct 17 17:45:35 2018 +0530 ---------------------------------------------------------------------- .../scm/client/ContainerOperationClient.java | 2 +- .../hadoop/hdds/scm/client/ScmClient.java | 2 +- .../hdds/scm/container/ContainerException.java | 46 ++ .../hadoop/hdds/scm/container/ContainerID.java | 28 +- .../hdds/scm/container/ContainerInfo.java | 449 +++++++++++++++ .../container/ContainerNotFoundException.java | 44 ++ .../ContainerReplicaNotFoundException.java | 45 ++ .../container/common/helpers/ContainerInfo.java | 482 ---------------- .../common/helpers/ContainerWithPipeline.java | 1 + .../StorageContainerLocationProtocol.java | 2 +- ...rLocationProtocolClientSideTranslatorPB.java | 2 +- ...rLocationProtocolServerSideTranslatorPB.java | 2 +- hadoop-hdds/common/src/main/proto/hdds.proto | 17 +- .../report/CommandStatusReportPublisher.java | 2 +- .../common/report/TestReportPublisher.java | 13 +- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 14 +- .../block/DatanodeDeletedBlockTransactions.java | 6 +- .../hdds/scm/block/DeletedBlockLogImpl.java | 5 +- .../container/CloseContainerEventHandler.java | 31 +- .../scm/container/CloseContainerWatcher.java | 3 +- .../hdds/scm/container/ContainerManager.java | 70 ++- .../hdds/scm/container/ContainerReplica.java | 197 +++++++ .../scm/container/ContainerReportHandler.java | 60 +- .../scm/container/ContainerStateManager.java | 242 ++++---- .../hdds/scm/container/SCMContainerManager.java | 566 ++++++++----------- .../replication/ReplicationManager.java | 38 +- .../scm/container/states/ContainerStateMap.java | 267 +++++---- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 74 ++- .../hdds/scm/server/SCMChillModeManager.java | 2 +- .../scm/server/SCMClientProtocolServer.java | 32 +- .../scm/server/SCMDatanodeProtocolServer.java | 2 +- .../scm/server/StorageContainerManager.java | 22 +- .../apache/hadoop/hdds/scm/HddsTestUtils.java | 2 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 48 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 2 +- .../hdds/scm/block/TestDeletedBlockLog.java | 14 +- .../TestCloseContainerEventHandler.java | 22 +- .../container/TestContainerReportHandler.java | 66 +-- .../container/TestContainerStateManager.java | 60 +- .../scm/container/TestSCMContainerManager.java | 117 ++-- .../replication/TestReplicationManager.java | 38 +- .../hdds/scm/node/TestContainerPlacement.java | 2 +- .../hdds/scm/node/TestDeadNodeHandler.java | 195 ++++--- .../scm/server/TestSCMChillModeManager.java | 2 +- .../container/TestCloseContainerWatcher.java | 12 +- .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java | 2 +- .../hdds/scm/cli/container/ListSubcommand.java | 4 +- .../ozone/client/io/ChunkGroupOutputStream.java | 2 +- .../TestContainerStateManagerIntegration.java | 219 ++++--- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 7 +- .../hdds/scm/pipeline/TestPipelineClose.java | 10 +- .../org/apache/hadoop/ozone/OzoneTestUtils.java | 10 +- .../ozone/TestStorageContainerManager.java | 4 +- .../ozone/client/rest/TestOzoneRestClient.java | 4 +- .../rpc/TestCloseContainerHandlingByClient.java | 10 +- .../ozone/client/rpc/TestOzoneRpcClient.java | 6 +- .../commandhandler/TestBlockDeletion.java | 3 +- .../TestCloseContainerByPipeline.java | 10 +- .../TestCloseContainerHandler.java | 4 +- .../ozone/om/TestContainerReportWithKeys.java | 2 +- .../hadoop/ozone/om/TestScmChillMode.java | 14 +- .../apache/hadoop/ozone/om/KeyManagerImpl.java | 4 - .../genesis/BenchMarkContainerStateMap.java | 14 +- .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 8 +- .../hadoop/ozone/scm/TestContainerSQLCli.java | 3 +- 65 files changed, 1949 insertions(+), 1739 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index c2bfb42..c635df4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 184c547..c37f42c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.client; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerData; http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java new file mode 100644 index 0000000..9d37dfb --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; + +/** + * Signals that ContainerException of some sort has occurred. This is parent + * of all the exceptions thrown by ContainerManager. + */ +public class ContainerException extends IOException { + + /** + * Constructs an {@code ContainerException} with {@code null} + * as its error detail message. + */ + public ContainerException() { + super(); + } + + /** + * Constructs an {@code ContainerException} with the specified detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public ContainerException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java index 49af297..e7ac350 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; import org.apache.commons.lang3.builder.CompareToBuilder; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -29,18 +30,17 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; * We are creating a specific type for this to avoid mixing this with * normal integers in code. */ -public class ContainerID implements Comparable { +public final class ContainerID implements Comparable<ContainerID> { private final long id; + // TODO: make this private. /** * Constructs ContainerID. * * @param id int */ public ContainerID(long id) { - Preconditions.checkState(id > 0, - "Container ID should be a positive long. "+ id); this.id = id; } @@ -49,7 +49,9 @@ public class ContainerID implements Comparable { * @param containerID long * @return ContainerID. */ - public static ContainerID valueof(long containerID) { + public static ContainerID valueof(final long containerID) { + Preconditions.checkState(containerID > 0, + "Container ID should be a positive long. "+ containerID); return new ContainerID(containerID); } @@ -62,8 +64,12 @@ public class ContainerID implements Comparable { return id; } + public byte[] getBytes() { + return Longs.toByteArray(id); + } + @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } @@ -72,7 +78,7 @@ public class ContainerID implements Comparable { return false; } - ContainerID that = (ContainerID) o; + final ContainerID that = (ContainerID) o; return new EqualsBuilder() .append(getId(), that.getId()) @@ -87,14 +93,8 @@ public class ContainerID implements Comparable { } @Override - public int compareTo(Object o) { - Preconditions.checkNotNull(o); - if(getClass() != o.getClass()) { - throw new ClassCastException("ContainerID class expected. found:" + - o.getClass().toString()); - } - - ContainerID that = (ContainerID) o; + public int compareTo(final ContainerID that) { + Preconditions.checkNotNull(that); return new CompareToBuilder() .append(this.getId(), that.getId()) .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java new file mode 100644 index 0000000..a5ea3e3 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import static java.lang.Math.max; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Arrays; +import java.util.Comparator; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.util.Time; + +/** + * Class wraps ozone container info. + */ +public class ContainerInfo implements Comparator<ContainerInfo>, + Comparable<ContainerInfo>, Externalizable { + + private static final ObjectWriter WRITER; + private static final String SERIALIZATION_ERROR_MSG = "Java serialization not" + + " supported. Use protobuf instead."; + + static { + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + mapper + .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE); + WRITER = mapper.writer(); + } + + private HddsProtos.LifeCycleState state; + @JsonIgnore + private PipelineID pipelineID; + private ReplicationFactor replicationFactor; + private ReplicationType replicationType; + private long usedBytes; + private long numberOfKeys; + private long lastUsed; + // The wall-clock ms since the epoch at which the current state enters. + private long stateEnterTime; + private String owner; + private long containerID; + private long deleteTransactionId; + /** + * Allows you to maintain private data on ContainerInfo. This is not + * serialized via protobuf, just allows us to maintain some private data. + */ + @JsonIgnore + private byte[] data; + + ContainerInfo( + long containerID, + HddsProtos.LifeCycleState state, + PipelineID pipelineID, + long usedBytes, + long numberOfKeys, + long stateEnterTime, + String owner, + long deleteTransactionId, + ReplicationFactor replicationFactor, + ReplicationType repType) { + this.containerID = containerID; + this.pipelineID = pipelineID; + this.usedBytes = usedBytes; + this.numberOfKeys = numberOfKeys; + this.lastUsed = Time.monotonicNow(); + this.state = state; + this.stateEnterTime = stateEnterTime; + this.owner = owner; + this.deleteTransactionId = deleteTransactionId; + this.replicationFactor = replicationFactor; + this.replicationType = repType; + } + + public ContainerInfo(ContainerInfo info) { + this(info.getContainerID(), info.getState(), info.getPipelineID(), + info.getUsedBytes(), info.getNumberOfKeys(), + info.getStateEnterTime(), info.getOwner(), + info.getDeleteTransactionId(), info.getReplicationFactor(), + info.getReplicationType()); + } + /** + * Needed for serialization findbugs. + */ + public ContainerInfo() { + } + + public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) { + ContainerInfo.Builder builder = new ContainerInfo.Builder(); + return builder.setPipelineID( + PipelineID.getFromProtobuf(info.getPipelineID())) + .setUsedBytes(info.getUsedBytes()) + .setNumberOfKeys(info.getNumberOfKeys()) + .setState(info.getState()) + .setStateEnterTime(info.getStateEnterTime()) + .setOwner(info.getOwner()) + .setContainerID(info.getContainerID()) + .setDeleteTransactionId(info.getDeleteTransactionId()) + .setReplicationFactor(info.getReplicationFactor()) + .setReplicationType(info.getReplicationType()) + .build(); + } + + public long getContainerID() { + return containerID; + } + + public HddsProtos.LifeCycleState getState() { + return state; + } + + public void setState(HddsProtos.LifeCycleState state) { + this.state = state; + } + + public long getStateEnterTime() { + return stateEnterTime; + } + + public ReplicationFactor getReplicationFactor() { + return replicationFactor; + } + + public PipelineID getPipelineID() { + return pipelineID; + } + + public long getUsedBytes() { + return usedBytes; + } + + public long getNumberOfKeys() { + return numberOfKeys; + } + + public long getDeleteTransactionId() { + return deleteTransactionId; + } + + public void updateDeleteTransactionId(long transactionId) { + deleteTransactionId = max(transactionId, deleteTransactionId); + } + + public ContainerID containerID() { + return new ContainerID(getContainerID()); + } + + /** + * Gets the last used time from SCM's perspective. + * + * @return time in milliseconds. + */ + public long getLastUsed() { + return lastUsed; + } + + public ReplicationType getReplicationType() { + return replicationType; + } + + public void updateLastUsedTime() { + lastUsed = Time.monotonicNow(); + } + + public HddsProtos.SCMContainerInfo getProtobuf() { + HddsProtos.SCMContainerInfo.Builder builder = + HddsProtos.SCMContainerInfo.newBuilder(); + Preconditions.checkState(containerID > 0); + return builder.setContainerID(getContainerID()) + .setUsedBytes(getUsedBytes()) + .setNumberOfKeys(getNumberOfKeys()).setState(getState()) + .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID()) + .setDeleteTransactionId(getDeleteTransactionId()) + .setPipelineID(getPipelineID().getProtobuf()) + .setReplicationFactor(getReplicationFactor()) + .setReplicationType(getReplicationType()) + .setOwner(getOwner()) + .build(); + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + @Override + public String toString() { + return "ContainerInfo{" + + "id=" + containerID + + ", state=" + state + + ", pipelineID=" + pipelineID + + ", stateEnterTime=" + stateEnterTime + + ", owner=" + owner + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ContainerInfo that = (ContainerInfo) o; + + return new EqualsBuilder() + .append(getContainerID(), that.getContainerID()) + + // TODO : Fix this later. If we add these factors some tests fail. + // So Commenting this to continue and will enforce this with + // Changes in pipeline where we remove Container Name to + // SCMContainerinfo from Pipeline. + // .append(pipeline.getFactor(), that.pipeline.getFactor()) + // .append(pipeline.getType(), that.pipeline.getType()) + .append(owner, that.owner) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(11, 811) + .append(getContainerID()) + .append(getOwner()) + .toHashCode(); + } + + /** + * Compares its two arguments for order. Returns a negative integer, zero, or + * a positive integer as the first argument is less than, equal to, or greater + * than the second.<p> + * + * @param o1 the first object to be compared. + * @param o2 the second object to be compared. + * @return a negative integer, zero, or a positive integer as the first + * argument is less than, equal to, or greater than the second. + * @throws NullPointerException if an argument is null and this comparator + * does not permit null arguments + * @throws ClassCastException if the arguments' types prevent them from + * being compared by this comparator. + */ + @Override + public int compare(ContainerInfo o1, ContainerInfo o2) { + return Long.compare(o1.getLastUsed(), o2.getLastUsed()); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less than, + * equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(ContainerInfo o) { + return this.compare(this, o); + } + + /** + * Returns a JSON string of this object. + * + * @return String - json string + * @throws IOException + */ + public String toJsonString() throws IOException { + return WRITER.writeValueAsString(this); + } + + /** + * Returns private data that is set on this containerInfo. + * + * @return blob, the user can interpret it any way they like. + */ + public byte[] getData() { + if (this.data != null) { + return Arrays.copyOf(this.data, this.data.length); + } else { + return null; + } + } + + /** + * Set private data on ContainerInfo object. + * + * @param data -- private data. + */ + public void setData(byte[] data) { + if (data != null) { + this.data = Arrays.copyOf(data, data.length); + } + } + + /** + * Throws IOException as default java serialization is not supported. Use + * serialization via protobuf instead. + * + * @param out the stream to write the object to + * @throws IOException Includes any I/O exceptions that may occur + * @serialData Overriding methods should use this tag to describe + * the data layout of this Externalizable object. + * List the sequence of element types and, if possible, + * relate the element to a public/protected field and/or + * method of this Externalizable class. + */ + @Override + public void writeExternal(ObjectOutput out) throws IOException { + throw new IOException(SERIALIZATION_ERROR_MSG); + } + + /** + * Throws IOException as default java serialization is not supported. Use + * serialization via protobuf instead. + * + * @param in the stream to read data from in order to restore the object + * @throws IOException if I/O errors occur + * @throws ClassNotFoundException If the class for an object being + * restored cannot be found. + */ + @Override + public void readExternal(ObjectInput in) + throws IOException, ClassNotFoundException { + throw new IOException(SERIALIZATION_ERROR_MSG); + } + + /** + * Builder class for ContainerInfo. + */ + public static class Builder { + private HddsProtos.LifeCycleState state; + private long used; + private long keys; + private long stateEnterTime; + private String owner; + private long containerID; + private long deleteTransactionId; + private PipelineID pipelineID; + private ReplicationFactor replicationFactor; + private ReplicationType replicationType; + + public Builder setReplicationType( + ReplicationType repType) { + this.replicationType = repType; + return this; + } + + public Builder setPipelineID(PipelineID pipelineId) { + this.pipelineID = pipelineId; + return this; + } + + public Builder setReplicationFactor(ReplicationFactor repFactor) { + this.replicationFactor = repFactor; + return this; + } + + public Builder setContainerID(long id) { + Preconditions.checkState(id >= 0); + this.containerID = id; + return this; + } + + public Builder setState(HddsProtos.LifeCycleState lifeCycleState) { + this.state = lifeCycleState; + return this; + } + + public Builder setUsedBytes(long bytesUsed) { + this.used = bytesUsed; + return this; + } + + public Builder setNumberOfKeys(long keyCount) { + this.keys = keyCount; + return this; + } + + public Builder setStateEnterTime(long time) { + this.stateEnterTime = time; + return this; + } + + public Builder setOwner(String containerOwner) { + this.owner = containerOwner; + return this; + } + + public Builder setDeleteTransactionId(long deleteTransactionID) { + this.deleteTransactionId = deleteTransactionID; + return this; + } + + public ContainerInfo build() { + return new ContainerInfo(containerID, state, pipelineID, + used, keys, stateEnterTime, owner, deleteTransactionId, + replicationFactor, replicationType); + } + } + + /** + * Check if a container is in open state, this will check if the + * container is either open, allocated, creating or creating. + * Any containers in these states is managed as an open container by SCM. + */ + public boolean isOpen() { + return state == HddsProtos.LifeCycleState.ALLOCATED || + state == HddsProtos.LifeCycleState.CREATING || + state == HddsProtos.LifeCycleState.OPEN || + state == HddsProtos.LifeCycleState.CLOSING; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java new file mode 100644 index 0000000..3eebcce --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerNotFoundException.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +/** + * Signals that a container is missing from ContainerManager. + */ +public class ContainerNotFoundException extends ContainerException { + + /** + * Constructs an {@code ContainerNotFoundException} with {@code null} + * as its error detail message. + */ + public ContainerNotFoundException() { + super(); + } + + /** + * Constructs an {@code ContainerNotFoundException} with the specified + * detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public ContainerNotFoundException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java new file mode 100644 index 0000000..fdbc18b --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaNotFoundException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +/** + * Signals that a ContainerReplica is missing from the Container in + * ContainerManager. + */ +public class ContainerReplicaNotFoundException extends ContainerException { + + /** + * Constructs an {@code ContainerReplicaNotFoundException} with {@code null} + * as its error detail message. + */ + public ContainerReplicaNotFoundException() { + super(); + } + + /** + * Constructs an {@code ContainerReplicaNotFoundException} with the + * specified detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public ContainerReplicaNotFoundException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java deleted file mode 100644 index 5abcd14..0000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java +++ /dev/null @@ -1,482 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.container.common.helpers; - -import static java.lang.Math.max; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.google.common.base.Preconditions; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Arrays; -import java.util.Comparator; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.util.Time; - -/** - * Class wraps ozone container info. - */ -public class ContainerInfo implements Comparator<ContainerInfo>, - Comparable<ContainerInfo>, Externalizable { - - private static final ObjectWriter WRITER; - private static final String SERIALIZATION_ERROR_MSG = "Java serialization not" - + " supported. Use protobuf instead."; - - static { - ObjectMapper mapper = new ObjectMapper(); - mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - mapper - .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE); - WRITER = mapper.writer(); - } - - private HddsProtos.LifeCycleState state; - @JsonIgnore - private PipelineID pipelineID; - private ReplicationFactor replicationFactor; - private ReplicationType replicationType; - // Bytes allocated by SCM for clients. - private long allocatedBytes; - // Actual container usage, updated through heartbeat. - private long usedBytes; - private long numberOfKeys; - private long lastUsed; - // The wall-clock ms since the epoch at which the current state enters. - private long stateEnterTime; - private String owner; - private long containerID; - private long deleteTransactionId; - /** - * Allows you to maintain private data on ContainerInfo. This is not - * serialized via protobuf, just allows us to maintain some private data. - */ - @JsonIgnore - private byte[] data; - - ContainerInfo( - long containerID, - HddsProtos.LifeCycleState state, - PipelineID pipelineID, - long allocatedBytes, - long usedBytes, - long numberOfKeys, - long stateEnterTime, - String owner, - long deleteTransactionId, - ReplicationFactor replicationFactor, - ReplicationType repType) { - this.containerID = containerID; - this.pipelineID = pipelineID; - this.allocatedBytes = allocatedBytes; - this.usedBytes = usedBytes; - this.numberOfKeys = numberOfKeys; - this.lastUsed = Time.monotonicNow(); - this.state = state; - this.stateEnterTime = stateEnterTime; - this.owner = owner; - this.deleteTransactionId = deleteTransactionId; - this.replicationFactor = replicationFactor; - this.replicationType = repType; - } - - public ContainerInfo(ContainerInfo info) { - this(info.getContainerID(), info.getState(), info.getPipelineID(), - info.getAllocatedBytes(), info.getUsedBytes(), info.getNumberOfKeys(), - info.getStateEnterTime(), info.getOwner(), - info.getDeleteTransactionId(), info.getReplicationFactor(), - info.getReplicationType()); - } - /** - * Needed for serialization findbugs. - */ - public ContainerInfo() { - } - - public static ContainerInfo fromProtobuf(HddsProtos.SCMContainerInfo info) { - ContainerInfo.Builder builder = new ContainerInfo.Builder(); - return builder.setPipelineID( - PipelineID.getFromProtobuf(info.getPipelineID())) - .setAllocatedBytes(info.getAllocatedBytes()) - .setUsedBytes(info.getUsedBytes()) - .setNumberOfKeys(info.getNumberOfKeys()) - .setState(info.getState()) - .setStateEnterTime(info.getStateEnterTime()) - .setOwner(info.getOwner()) - .setContainerID(info.getContainerID()) - .setDeleteTransactionId(info.getDeleteTransactionId()) - .setReplicationFactor(info.getReplicationFactor()) - .setReplicationType(info.getReplicationType()) - .build(); - } - - public long getContainerID() { - return containerID; - } - - public HddsProtos.LifeCycleState getState() { - return state; - } - - public void setState(HddsProtos.LifeCycleState state) { - this.state = state; - } - - public long getStateEnterTime() { - return stateEnterTime; - } - - public ReplicationFactor getReplicationFactor() { - return replicationFactor; - } - - public PipelineID getPipelineID() { - return pipelineID; - } - - public long getAllocatedBytes() { - return allocatedBytes; - } - - /** - * Set Allocated bytes. - * - * @param size - newly allocated bytes -- negative size is case of deletes - * can be used. - */ - public void updateAllocatedBytes(long size) { - this.allocatedBytes += size; - } - - public long getUsedBytes() { - return usedBytes; - } - - public long getNumberOfKeys() { - return numberOfKeys; - } - - public long getDeleteTransactionId() { - return deleteTransactionId; - } - - public void updateDeleteTransactionId(long transactionId) { - deleteTransactionId = max(transactionId, deleteTransactionId); - } - - public ContainerID containerID() { - return new ContainerID(getContainerID()); - } - - /** - * Gets the last used time from SCM's perspective. - * - * @return time in milliseconds. - */ - public long getLastUsed() { - return lastUsed; - } - - public ReplicationType getReplicationType() { - return replicationType; - } - - public void updateLastUsedTime() { - lastUsed = Time.monotonicNow(); - } - - public void allocate(long size) { - // should we also have total container size in ContainerInfo - // and check before allocating? - allocatedBytes += size; - } - - public HddsProtos.SCMContainerInfo getProtobuf() { - HddsProtos.SCMContainerInfo.Builder builder = - HddsProtos.SCMContainerInfo.newBuilder(); - Preconditions.checkState(containerID > 0); - return builder.setAllocatedBytes(getAllocatedBytes()) - .setContainerID(getContainerID()) - .setUsedBytes(getUsedBytes()) - .setNumberOfKeys(getNumberOfKeys()).setState(getState()) - .setStateEnterTime(getStateEnterTime()).setContainerID(getContainerID()) - .setDeleteTransactionId(getDeleteTransactionId()) - .setPipelineID(getPipelineID().getProtobuf()) - .setReplicationFactor(getReplicationFactor()) - .setReplicationType(getReplicationType()) - .setOwner(getOwner()) - .build(); - } - - public String getOwner() { - return owner; - } - - public void setOwner(String owner) { - this.owner = owner; - } - - @Override - public String toString() { - return "ContainerInfo{" - + "id=" + containerID - + ", state=" + state - + ", pipelineID=" + pipelineID - + ", stateEnterTime=" + stateEnterTime - + ", owner=" + owner - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - ContainerInfo that = (ContainerInfo) o; - - return new EqualsBuilder() - .append(getContainerID(), that.getContainerID()) - - // TODO : Fix this later. If we add these factors some tests fail. - // So Commenting this to continue and will enforce this with - // Changes in pipeline where we remove Container Name to - // SCMContainerinfo from Pipeline. - // .append(pipeline.getFactor(), that.pipeline.getFactor()) - // .append(pipeline.getType(), that.pipeline.getType()) - .append(owner, that.owner) - .isEquals(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(11, 811) - .append(getContainerID()) - .append(getOwner()) - .toHashCode(); - } - - /** - * Compares its two arguments for order. Returns a negative integer, zero, or - * a positive integer as the first argument is less than, equal to, or greater - * than the second.<p> - * - * @param o1 the first object to be compared. - * @param o2 the second object to be compared. - * @return a negative integer, zero, or a positive integer as the first - * argument is less than, equal to, or greater than the second. - * @throws NullPointerException if an argument is null and this comparator - * does not permit null arguments - * @throws ClassCastException if the arguments' types prevent them from - * being compared by this comparator. - */ - @Override - public int compare(ContainerInfo o1, ContainerInfo o2) { - return Long.compare(o1.getLastUsed(), o2.getLastUsed()); - } - - /** - * Compares this object with the specified object for order. Returns a - * negative integer, zero, or a positive integer as this object is less than, - * equal to, or greater than the specified object. - * - * @param o the object to be compared. - * @return a negative integer, zero, or a positive integer as this object is - * less than, equal to, or greater than the specified object. - * @throws NullPointerException if the specified object is null - * @throws ClassCastException if the specified object's type prevents it - * from being compared to this object. - */ - @Override - public int compareTo(ContainerInfo o) { - return this.compare(this, o); - } - - /** - * Returns a JSON string of this object. - * - * @return String - json string - * @throws IOException - */ - public String toJsonString() throws IOException { - return WRITER.writeValueAsString(this); - } - - /** - * Returns private data that is set on this containerInfo. - * - * @return blob, the user can interpret it any way they like. - */ - public byte[] getData() { - if (this.data != null) { - return Arrays.copyOf(this.data, this.data.length); - } else { - return null; - } - } - - /** - * Set private data on ContainerInfo object. - * - * @param data -- private data. - */ - public void setData(byte[] data) { - if (data != null) { - this.data = Arrays.copyOf(data, data.length); - } - } - - /** - * Throws IOException as default java serialization is not supported. Use - * serialization via protobuf instead. - * - * @param out the stream to write the object to - * @throws IOException Includes any I/O exceptions that may occur - * @serialData Overriding methods should use this tag to describe - * the data layout of this Externalizable object. - * List the sequence of element types and, if possible, - * relate the element to a public/protected field and/or - * method of this Externalizable class. - */ - @Override - public void writeExternal(ObjectOutput out) throws IOException { - throw new IOException(SERIALIZATION_ERROR_MSG); - } - - /** - * Throws IOException as default java serialization is not supported. Use - * serialization via protobuf instead. - * - * @param in the stream to read data from in order to restore the object - * @throws IOException if I/O errors occur - * @throws ClassNotFoundException If the class for an object being - * restored cannot be found. - */ - @Override - public void readExternal(ObjectInput in) - throws IOException, ClassNotFoundException { - throw new IOException(SERIALIZATION_ERROR_MSG); - } - - /** - * Builder class for ContainerInfo. - */ - public static class Builder { - private HddsProtos.LifeCycleState state; - private long allocated; - private long used; - private long keys; - private long stateEnterTime; - private String owner; - private long containerID; - private long deleteTransactionId; - private PipelineID pipelineID; - private ReplicationFactor replicationFactor; - private ReplicationType replicationType; - - public Builder setReplicationType( - ReplicationType repType) { - this.replicationType = repType; - return this; - } - - public Builder setPipelineID(PipelineID pipelineId) { - this.pipelineID = pipelineId; - return this; - } - - public Builder setReplicationFactor(ReplicationFactor repFactor) { - this.replicationFactor = repFactor; - return this; - } - - public Builder setContainerID(long id) { - Preconditions.checkState(id >= 0); - this.containerID = id; - return this; - } - - public Builder setState(HddsProtos.LifeCycleState lifeCycleState) { - this.state = lifeCycleState; - return this; - } - - public Builder setAllocatedBytes(long bytesAllocated) { - this.allocated = bytesAllocated; - return this; - } - - public Builder setUsedBytes(long bytesUsed) { - this.used = bytesUsed; - return this; - } - - public Builder setNumberOfKeys(long keyCount) { - this.keys = keyCount; - return this; - } - - public Builder setStateEnterTime(long time) { - this.stateEnterTime = time; - return this; - } - - public Builder setOwner(String containerOwner) { - this.owner = containerOwner; - return this; - } - - public Builder setDeleteTransactionId(long deleteTransactionID) { - this.deleteTransactionId = deleteTransactionID; - return this; - } - - public ContainerInfo build() { - return new ContainerInfo(containerID, state, pipelineID, allocated, - used, keys, stateEnterTime, owner, deleteTransactionId, - replicationFactor, replicationType); - } - } - - /** - * Check if a container is in open state, this will check if the - * container is either open, allocated, creating or creating. - * Any containers in these states is managed as an open container by SCM. - */ - public boolean isContainerOpen() { - return state == HddsProtos.LifeCycleState.ALLOCATED || - state == HddsProtos.LifeCycleState.CREATING || - state == HddsProtos.LifeCycleState.OPEN || - state == HddsProtos.LifeCycleState.CLOSING; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java index 64f42b3..af74a7d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java @@ -22,6 +22,7 @@ import java.util.Comparator; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; /** * Class wraps ozone container info. http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index e38077f..712fb7e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.protocol; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 16819e9..8e723e6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InChillModeResponseProto; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index d2723f0..e2a4ee0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/common/src/main/proto/hdds.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 6525134..9e813af 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -135,16 +135,13 @@ message SCMContainerInfo { required int64 containerID = 1; required LifeCycleState state = 2; optional PipelineID pipelineID = 3; - // This is not total size of container, but space allocated by SCM for - // clients to write blocks - required uint64 allocatedBytes = 4; - required uint64 usedBytes = 5; - required uint64 numberOfKeys = 6; - optional int64 stateEnterTime = 7; - required string owner = 8; - optional int64 deleteTransactionId = 9; - required ReplicationFactor replicationFactor = 10; - required ReplicationType replicationType = 11; + required uint64 usedBytes = 4; + required uint64 numberOfKeys = 5; + optional int64 stateEnterTime = 6; + required string owner = 7; + optional int64 deleteTransactionId = 8; + required ReplicationFactor replicationFactor = 9; + required ReplicationType replicationType = 10; } message ContainerWithPipeline { http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java index 4736857..f52387b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java @@ -75,10 +75,10 @@ public class CommandStatusReportPublisher extends iterator.forEachRemaining(key -> { CommandStatus cmdStatus = map.get(key); - builder.addCmdStatus(cmdStatus.getProtoBufMessage()); // If status is still pending then don't remove it from map as // CommandHandler will change its status when it works on this command. if (!cmdStatus.getStatus().equals(Status.PENDING)) { + builder.addCmdStatus(cmdStatus.getProtoBufMessage()); map.remove(key); } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index 1e82326..b632e02 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -153,19 +153,10 @@ public class TestReportPublisher { .build(); cmdStatusMap.put(obj1.getCmdId(), obj1); cmdStatusMap.put(obj2.getCmdId(), obj2); - Assert.assertEquals("Should publish report with 2 status objects", 2, + // We are not sending the commands whose status is PENDING. + Assert.assertEquals("Should publish report with 2 status objects", 1, ((CommandStatusReportPublisher) publisher).getReport() .getCmdStatusCount()); - Assert.assertEquals( - "Next report should have 1 status objects as command status o" - + "bjects are still in Pending state", - 1, ((CommandStatusReportPublisher) publisher).getReport() - .getCmdStatusCount()); - Assert.assertTrue( - "Next report should have 1 status objects as command status " - + "objects are still in Pending state", - ((CommandStatusReportPublisher) publisher).getReport() - .getCmdStatusList().get(0).getStatus().equals(Status.PENDING)); executorService.shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 30740c7..b791aad 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -69,7 +69,6 @@ public class BlockManagerImpl implements EventHandler<Boolean>, // Currently only user of the block service is Ozone, CBlock manages blocks // by itself and does not rely on the Block service offered by SCM. - private final NodeManager nodeManager; private final ContainerManager containerManager; private final long containerSize; @@ -95,7 +94,6 @@ public class BlockManagerImpl implements EventHandler<Boolean>, final NodeManager nodeManager, final ContainerManager containerManager, EventPublisher eventPublisher) throws IOException { - this.nodeManager = nodeManager; this.containerManager = containerManager; this.containerSize = (long)conf.getStorageSize( @@ -226,8 +224,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, // USER of the containers. So there might be cases where a different // USER has few containers in ALLOCATED state, which will result in // false positive. - if (!containerManager.getStateManager().getContainerStateMap() - .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED) + if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED) .isEmpty()) { // Since the above check can result in false positive, we have to do // the actual check and find out if there are containers in ALLOCATED @@ -242,7 +239,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, HddsProtos.LifeCycleState.ALLOCATED); if (containerWithPipeline != null) { containerManager.updateContainerState( - containerWithPipeline.getContainerInfo().getContainerID(), + containerWithPipeline.getContainerInfo().containerID(), HddsProtos.LifeCycleEvent.CREATE); return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.ALLOCATED); @@ -268,8 +265,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, // state, we have to check again as we only hold a read lock. // Some other thread might have pre-allocated container in meantime. synchronized (this) { - if (!containerManager.getStateManager().getContainerStateMap() - .getContainerIDsByState(HddsProtos.LifeCycleState.ALLOCATED) + if (!containerManager.getContainers(HddsProtos.LifeCycleState.ALLOCATED) .isEmpty()) { containerWithPipeline = containerManager .getMatchingContainerWithPipeline(size, owner, type, factor, @@ -285,7 +281,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, if (containerWithPipeline != null) { containerManager.updateContainerState( - containerWithPipeline.getContainerInfo().getContainerID(), + containerWithPipeline.getContainerInfo().containerID(), HddsProtos.LifeCycleEvent.CREATE); return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.ALLOCATED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index c86f9cd..5c112a0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.block; import com.google.common.collect.ArrayListMultimap; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto @@ -60,8 +61,9 @@ public class DatanodeDeletedBlockTransactions { Pipeline pipeline = null; try { ContainerWithPipeline containerWithPipeline = - containerManager.getContainerWithPipeline(tx.getContainerID()); - if (containerWithPipeline.getContainerInfo().isContainerOpen() + containerManager.getContainerWithPipeline( + ContainerID.valueof(tx.getContainerID())); + if (containerWithPipeline.getContainerInfo().isOpen() || containerWithPipeline.getPipeline().isEmpty()) { return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 5d3afd5..51790be 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto .DeleteBlockTransactionResult; import org.apache.hadoop.hdds.scm.command .CommandStatusReportHandler.DeleteBlockStatus; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -258,8 +259,8 @@ public class DeletedBlockLogImpl dnsWithCommittedTxn.add(dnID); Pipeline pipeline = - containerManager.getContainerWithPipeline(containerId) - .getPipeline(); + containerManager.getContainerWithPipeline( + ContainerID.valueof(containerId)).getPipeline(); Collection<DatanodeDetails> containerDnsDetails = pipeline.getDatanodes().values(); // The delete entry can be safely removed from the log if all the http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 8be7803..74edbc2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -61,7 +60,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { ContainerInfo info; try { containerWithPipeline = - containerManager.getContainerWithPipeline(containerID.getId()); + containerManager.getContainerWithPipeline(containerID); info = containerWithPipeline.getContainerInfo(); if (info == null) { LOG.error("Failed to update the container state. Container with id : {}" @@ -81,8 +80,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { // We cannot close a container in ALLOCATED state, moving the // container to CREATING state, this should eventually // timeout and the container will be moved to DELETING state. - LOG.debug("Closing container {} in {} state", containerID, state); - containerManager.updateContainerState(containerID.getId(), + LOG.debug("Closing container #{} in {} state", containerID, state); + containerManager.updateContainerState(containerID, HddsProtos.LifeCycleEvent.CREATE); break; case CREATING: @@ -91,7 +90,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { LOG.debug("Closing container {} in {} state", containerID, state); break; case OPEN: - containerManager.updateContainerState(containerID.getId(), + containerManager.updateContainerState(containerID, HddsProtos.LifeCycleEvent.FINALIZE); fireCloseContainerEvents(containerWithPipeline, info, publisher); break; @@ -101,16 +100,15 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { case CLOSED: case DELETING: case DELETED: - LOG.info( - "container with id : {} is in {} state and need not be closed.", - containerID.getId(), info.getState()); + LOG.info("Cannot close container #{}, it is already in {} state.", + containerID.getId(), state); break; default: - throw new IOException( - "Invalid container state for container " + containerID); + throw new IOException("Invalid container state for container #" + + containerID); } } catch (IOException ex) { - LOG.error("Failed to update the container state for" + "container : {}" + LOG.error("Failed to update the container state for container #{}" + containerID, ex); } } @@ -125,13 +123,14 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { info.getReplicationType(), info.getPipelineID()); Pipeline pipeline = containerWithPipeline.getPipeline(); - pipeline.getMachines().stream().map( - datanode -> new CommandForDatanode<>(datanode.getUuid(), - closeContainerCommand)).forEach((command) -> { - publisher.fireEvent(DATANODE_COMMAND, command); - }); + pipeline.getMachines().stream() + .map(node -> + new CommandForDatanode<>(node.getUuid(), closeContainerCommand)) + .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command)); + publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new CloseContainerRetryableReq(containerID)); + LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand, pipeline, containerID); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java index 7b94bd2..4593c1f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java @@ -88,7 +88,8 @@ public class CloseContainerWatcher extends publisher) { try { // Check if container is still open - if (containerManager.getContainer(containerID).isContainerOpen()) { + if (containerManager.getContainer( + ContainerID.valueof(containerID)).isOpen()) { publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueof(containerID)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index e586f3e..5dba8fd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -31,13 +30,31 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +// TODO: Write extensive java doc. +// This is the main interface of ContainerManager. /** * ContainerManager class contains the mapping from a name to a pipeline * mapping. This is used by SCM when allocating new locations and when * looking up a key. */ public interface ContainerManager extends Closeable { + + /** + * Returns all the containers managed by ContainerManager. + * + * @return List of ContainerInfo + */ + List<ContainerInfo> getContainers(); + + /** + * Returns all the containers which are in the specified state. + * + * @return List of ContainerInfo + */ + List<ContainerInfo> getContainers(HddsProtos.LifeCycleState state); + /** * Returns the ContainerInfo from the container ID. * @@ -45,7 +62,8 @@ public interface ContainerManager extends Closeable { * @return - ContainerInfo such as creation state and the pipeline. * @throws IOException */ - ContainerInfo getContainer(long containerID) throws IOException; + ContainerInfo getContainer(ContainerID containerID) + throws ContainerNotFoundException; /** * Returns the ContainerInfo from the container ID. @@ -54,8 +72,8 @@ public interface ContainerManager extends Closeable { * @return - ContainerWithPipeline such as creation state and the pipeline. * @throws IOException */ - ContainerWithPipeline getContainerWithPipeline(long containerID) - throws IOException; + ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) + throws ContainerNotFoundException; /** * Returns containers under certain conditions. @@ -72,8 +90,7 @@ public interface ContainerManager extends Closeable { * @return a list of container. * @throws IOException */ - List<ContainerInfo> listContainer(long startContainerID, int count) - throws IOException; + List<ContainerInfo> listContainer(ContainerID startContainerID, int count); /** * Allocates a new container for a given keyName and replication factor. @@ -93,7 +110,7 @@ public interface ContainerManager extends Closeable { * @param containerID - Container ID * @throws IOException */ - void deleteContainer(long containerID) throws IOException; + void deleteContainer(ContainerID containerID) throws IOException; /** * Update container state. @@ -102,23 +119,44 @@ public interface ContainerManager extends Closeable { * @return - new container state * @throws IOException */ - HddsProtos.LifeCycleState updateContainerState(long containerID, + HddsProtos.LifeCycleState updateContainerState(ContainerID containerID, HddsProtos.LifeCycleEvent event) throws IOException; /** - * Returns the container State Manager. - * @return ContainerStateManager - */ - ContainerStateManager getStateManager(); - - /** * Process container report from Datanode. * * @param reports Container report */ void processContainerReports(DatanodeDetails datanodeDetails, - ContainerReportsProto reports, boolean isRegisterCall) - throws IOException; + ContainerReportsProto reports) throws IOException; + + /** + * Returns the latest list of replicas for given containerId. + * + * @param containerID Container ID + * @return Set of ContainerReplica + */ + Set<ContainerReplica> getContainerReplicas(ContainerID containerID) + throws ContainerNotFoundException; + + /** + * Adds a container Replica for the given Container. + * + * @param containerID Container ID + * @param replica ContainerReplica + */ + void updateContainerReplica(ContainerID containerID, ContainerReplica replica) + throws ContainerNotFoundException; + + /** + * Remove a container Replica form a given Container. + * + * @param containerID Container ID + * @param replica ContainerReplica + * @return True of dataNode is removed successfully else false. + */ + void removeContainerReplica(ContainerID containerID, ContainerReplica replica) + throws ContainerNotFoundException, ContainerReplicaNotFoundException; /** * Update deleteTransactionId according to deleteTransactionMap. http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java new file mode 100644 index 0000000..f2e80f4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.builder.CompareToBuilder; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +import java.util.Optional; +import java.util.UUID; + +/** + * In-memory state of a container replica. + */ +public final class ContainerReplica implements Comparable<ContainerReplica> { + + final private ContainerID containerID; + final private DatanodeDetails datanodeDetails; + final private UUID placeOfBirth; + + private Long sequenceId; + + + private ContainerReplica(ContainerID containerID, DatanodeDetails datanode, + UUID originNodeId) { + this.containerID = containerID; + this.datanodeDetails = datanode; + this.placeOfBirth = originNodeId; + } + + private void setSequenceId(Long seqId) { + sequenceId = seqId; + } + + /** + * Returns the DatanodeDetails to which this replica belongs. + * + * @return DatanodeDetails + */ + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + + /** + * Returns the UUID of Datanode where this replica originated. + * + * @return UUID + */ + public UUID getOriginDatanodeId() { + return placeOfBirth; + } + + /** + * Returns the Sequence Id of this replica. + * + * @return Sequence Id + */ + public Long getSequenceId() { + return sequenceId; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(61, 71) + .append(containerID) + .append(datanodeDetails) + .toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final ContainerReplica that = (ContainerReplica) o; + + return new EqualsBuilder() + .append(containerID, that.containerID) + .append(datanodeDetails, that.datanodeDetails) + .isEquals(); + } + + @Override + public int compareTo(ContainerReplica that) { + Preconditions.checkNotNull(that); + return new CompareToBuilder() + .append(this.containerID, that.containerID) + .append(this.datanodeDetails, that.datanodeDetails) + .build(); + } + + /** + * Returns a new Builder to construct ContainerReplica. + * + * @return ContainerReplicaBuilder + */ + public static ContainerReplicaBuilder newBuilder() { + return new ContainerReplicaBuilder(); + } + + /** + * Used for building ContainerReplica instance. + */ + public static class ContainerReplicaBuilder { + + private ContainerID containerID; + private DatanodeDetails datanode; + private UUID placeOfBirth; + private Long sequenceId; + + /** + * Set Container Id. + * + * @param containerId ContainerID + * @return ContainerReplicaBuilder + */ + public ContainerReplicaBuilder setContainerID( + final ContainerID containerId) { + containerID = containerId; + return this; + } + + /** + * Set DatanodeDetails. + * + * @param datanodeDetails DatanodeDetails + * @return ContainerReplicaBuilder + */ + public ContainerReplicaBuilder setDatanodeDetails( + DatanodeDetails datanodeDetails) { + datanode = datanodeDetails; + return this; + } + + /** + * Set replica origin node id. + * + * @param originNodeId origin node UUID + * @return ContainerReplicaBuilder + */ + public ContainerReplicaBuilder setOriginNodeId(UUID originNodeId) { + placeOfBirth = originNodeId; + return this; + } + + /** + * Set sequence Id of the replica. + * + * @param seqId container sequence Id + * @return ContainerReplicaBuilder + */ + public ContainerReplicaBuilder setSequenceId(long seqId) { + sequenceId = seqId; + return this; + } + + /** + * Constructs new ContainerReplicaBuilder. + * + * @return ContainerReplicaBuilder + */ + public ContainerReplica build() { + Preconditions.checkNotNull(containerID, + "Container Id can't be null"); + Preconditions.checkNotNull(datanode, + "DatanodeDetails can't be null"); + ContainerReplica replica = new ContainerReplica(containerID, datanode, + Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid())); + Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId); + return replica; + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0f824a0..5885d959 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -24,11 +24,9 @@ import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.ReportResult; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; @@ -49,11 +47,7 @@ public class ContainerReportHandler implements LoggerFactory.getLogger(ContainerReportHandler.class); private final NodeManager nodeManager; - private final ContainerManager containerManager; - - private ContainerStateManager containerStateManager; - private ReplicationActivityStatus replicationStatus; public ContainerReportHandler(ContainerManager containerManager, @@ -62,7 +56,6 @@ public class ContainerReportHandler implements Preconditions.checkNotNull(containerManager); Preconditions.checkNotNull(nodeManager); Preconditions.checkNotNull(replicationActivityStatus); - this.containerStateManager = containerManager.getStateManager(); this.nodeManager = nodeManager; this.containerManager = containerManager; this.replicationStatus = replicationActivityStatus; @@ -81,7 +74,7 @@ public class ContainerReportHandler implements //update state in container db and trigger close container events containerManager - .processContainerReports(datanodeOrigin, containerReport, false); + .processContainerReports(datanodeOrigin, containerReport); Set<ContainerID> containerIds = containerReport.getReportsList().stream() .map(StorageContainerDatanodeProtocolProtos @@ -97,13 +90,21 @@ public class ContainerReportHandler implements .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds); for (ContainerID containerID : reportResult.getMissingEntries()) { - containerStateManager - .removeContainerReplica(containerID, datanodeOrigin); + final ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(containerID) + .setDatanodeDetails(datanodeOrigin) + .build(); + containerManager + .removeContainerReplica(containerID, replica); checkReplicationState(containerID, publisher); } for (ContainerID containerID : reportResult.getNewEntries()) { - containerStateManager.addContainerReplica(containerID, datanodeOrigin); + final ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(containerID) + .setDatanodeDetails(datanodeOrigin) + .build(); + containerManager.updateContainerReplica(containerID, replica); checkReplicationState(containerID, publisher); } @@ -116,35 +117,30 @@ public class ContainerReportHandler implements } private void checkReplicationState(ContainerID containerID, - EventPublisher publisher) - throws SCMException { - ContainerInfo container = containerStateManager.getContainer(containerID); - - if (container == null) { - //warning unknown container + EventPublisher publisher) { + try { + ContainerInfo container = containerManager.getContainer(containerID); + replicateIfNeeded(container, publisher); + } catch (ContainerNotFoundException ex) { LOG.warn( "Container is missing from containerStateManager. Can't request " + "replication. {}", containerID); - return; - } - if (container.isContainerOpen()) { - return; } - ReplicationRequest replicationState = - containerStateManager.checkReplicationState(containerID); - if (replicationState != null) { - if (replicationStatus.isReplicationEnabled()) { + } + + private void replicateIfNeeded(ContainerInfo container, + EventPublisher publisher) throws ContainerNotFoundException { + if (!container.isOpen() && replicationStatus.isReplicationEnabled()) { + final int existingReplicas = containerManager + .getContainerReplicas(container.containerID()).size(); + final int expectedReplicas = container.getReplicationFactor().getNumber(); + if (existingReplicas != expectedReplicas) { publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - replicationState); - } else { - LOG.warn( - "Over/under replicated container but the replication is not " - + "(yet) enabled: " - + replicationState.toString()); + new ReplicationRequest(container.getContainerID(), + existingReplicas, expectedReplicas)); } } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
