http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java new file mode 100644 index 0000000..a7ce60b --- /dev/null +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; + +import javax.management.MBeanServer; +import java.io.File; +import java.lang.management.ManagementFactory; + +/** + * Test the JMX interface for the rocksdb metastore implementation. 
+ */ +public class TestRocksDBStoreMBean { + + @Test + public void testJmxBeans() throws Exception { + File testDir = + GenericTestUtils.getTestDir(getClass().getSimpleName() + "-withstat"); + + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB); + + RocksDBStore metadataStore = + (RocksDBStore) MetadataStoreBuilder.newBuilder().setConf(conf) + .setCreateIfMissing(true).setDbFile(testDir).build(); + + for (int i = 0; i < 10; i++) { + metadataStore.put("key".getBytes(), "value".getBytes()); + } + + MBeanServer platformMBeanServer = + ManagementFactory.getPlatformMBeanServer(); + Thread.sleep(2000); + + Object keysWritten = platformMBeanServer + .getAttribute(metadataStore.getStatMBeanName(), "NUMBER_KEYS_WRITTEN"); + + Assert.assertEquals(10L, keysWritten); + + Object dbWriteAverage = platformMBeanServer + .getAttribute(metadataStore.getStatMBeanName(), "DB_WRITE_AVERAGE"); + Assert.assertTrue((double) dbWriteAverage > 0); + + metadataStore.close(); + + } + + @Test() + public void testDisabledStat() throws Exception { + File testDir = GenericTestUtils + .getTestDir(getClass().getSimpleName() + "-withoutstat"); + + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB); + conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS, + OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF); + + RocksDBStore metadataStore = + (RocksDBStore) MetadataStoreBuilder.newBuilder().setConf(conf) + .setCreateIfMissing(true).setDbFile(testDir).build(); + + Assert.assertNull(metadataStore.getStatMBeanName()); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml new file mode 100644 index 0000000..736272d --- /dev/null +++ b/hadoop-hdds/container-service/pom.xml @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 +http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds</artifactId> + <version>3.2.0-SNAPSHOT</version> + </parent> + <artifactId>hadoop-hdds-container-service</artifactId> + <version>3.2.0-SNAPSHOT</version> + <description>Apache HDDS Container server</description> + <name>Apache HDDS Container server</name> + <packaging>jar</packaging> + + <properties> + <hadoop.component>hdds</hadoop.component> + <is.hadoop.component>true</is.hadoop.component> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdds-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-hdds-server-framework</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.2.0</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param> + ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ + </param> + <param> + ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ + </param> + <param> + ${basedir}/../../hadoop-hdds/common/src/main/proto/ + </param> + <param>${basedir}/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>StorageContainerDatanodeProtocol.proto</include> + </includes> + </source> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java new file mode 100644 index 0000000..956aef2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.base.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL; +import 
static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdds.HddsUtils.*; +import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs; + +/** + * Hdds stateless helper functions for server side components. + */ +public class HddsServerUtil { + + private static final Logger LOG = LoggerFactory.getLogger( + HddsServerUtil.class); + + /** + * Retrieve the socket address that should be used by DataNodes to connect + * to the SCM. + * + * @param conf + * @return Target InetSocketAddress for the SCM service endpoint. + */ + public static InetSocketAddress getScmAddressForDataNodes( + Configuration conf) { + // We try the following settings in decreasing priority to retrieve the + // target host. + // - OZONE_SCM_DATANODE_ADDRESS_KEY + // - OZONE_SCM_CLIENT_ADDRESS_KEY + // + final Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); + + if (!host.isPresent()) { + throw new IllegalArgumentException( + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY + + " must be defined. See" + + " https://wiki.apache.org/hadoop/Ozone#Configuration " + + "for details on configuring Ozone."); + } + + // If no port number is specified then we'll just try the defaultBindPort. + final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); + + InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" + + port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + + return addr; + } + + /** + * Retrieve the socket address that should be used by clients to connect + * to the SCM. + * + * @param conf + * @return Target InetSocketAddress for the SCM client endpoint. 
+ */ + public static InetSocketAddress getScmClientBindAddress( + Configuration conf) { + final Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY); + + final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" + + port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT)); + } + + /** + * Retrieve the socket address that should be used by clients to connect + * to the SCM Block service. + * + * @param conf + * @return Target InetSocketAddress for the SCM block client endpoint. + */ + public static InetSocketAddress getScmBlockClientBindAddress( + Configuration conf) { + final Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY); + + final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) + + ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT)); + } + + /** + * Retrieve the socket address that should be used by DataNodes to connect + * to the SCM. + * + * @param conf + * @return Target InetSocketAddress for the SCM service endpoint. + */ + public static InetSocketAddress getScmDataNodeBindAddress( + Configuration conf) { + final Optional<String> host = getHostNameFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY); + + // If no port number is specified then we'll just try the defaultBindPort. 
+ final Optional<Integer> port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY); + + return NetUtils.createSocketAddr( + host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" + + port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT)); + } + + + /** + * Returns the interval in which the heartbeat processor thread runs. + * + * @param conf - Configuration + * @return long in Milliseconds. + */ + public static long getScmheartbeatCheckerInterval(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + } + + /** + * Heartbeat Interval - Defines the heartbeat frequency from a datanode to + * SCM. + * + * @param conf - Ozone Config + * @return - HB interval in seconds. + */ + public static long getScmHeartbeatInterval(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL, + ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT, + TimeUnit.SECONDS); + } + + /** + * Get the Stale Node interval, which is used by SCM to flag a datanode as + * stale, if the heartbeat from that node has been missing for this duration. + * + * @param conf - Configuration. + * @return - Long, Milliseconds to wait before flagging a node as stale. + */ + public static long getStaleNodeInterval(Configuration conf) { + + long staleNodeIntervalMs = + conf.getTimeDuration(OZONE_SCM_STALENODE_INTERVAL, + OZONE_SCM_STALENODE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + + long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf); + + long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000; + + + // Make sure that StaleNodeInterval is configured way above the frequency + // at which we run the heartbeat thread. + // + // Here we check that staleNodeInterval is at least five times more than the + // frequency at which the accounting thread is going to run. 
+ try { + sanitizeUserArgs(staleNodeIntervalMs, heartbeatThreadFrequencyMs, + 5, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Stale Node Interval is cannot be honored due to " + + "mis-configured {}. ex: {}", + OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, ex); + throw ex; + } + + // Make sure that stale node value is greater than configured value that + // datanodes are going to send HBs. + try { + sanitizeUserArgs(staleNodeIntervalMs, heartbeatIntervalMs, 3, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Stale Node Interval MS is cannot be honored due to " + + "mis-configured {}. ex: {}", OZONE_SCM_HEARTBEAT_INTERVAL, ex); + throw ex; + } + return staleNodeIntervalMs; + } + + /** + * Gets the interval for dead node flagging. This has to be a value that is + * greater than stale node value, and by transitive relation we also know + * that this value is greater than heartbeat interval and heartbeatProcess + * Interval. + * + * @param conf - Configuration. + * @return - the interval for dead node flagging. + */ + public static long getDeadNodeInterval(Configuration conf) { + long staleNodeIntervalMs = getStaleNodeInterval(conf); + long deadNodeIntervalMs = conf.getTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, + OZONE_SCM_DEADNODE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + + try { + // Make sure that dead nodes Ms is at least twice the time for staleNodes + // with a max of 1000 times the staleNodes. + sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Dead Node Interval MS is cannot be honored due to " + + "mis-configured {}. ex: {}", OZONE_SCM_STALENODE_INTERVAL, ex); + throw ex; + } + return deadNodeIntervalMs; + } + + /** + * Returns the maximum number of heartbeat to process per loop of the process + * thread. 
+ * @param conf Configuration + * @return - int -- Number of HBs to process + */ + public static int getMaxHBToProcessPerLoop(Configuration conf) { + return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, + ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); + } + + /** + * Timeout value for the RPC from Datanode to SCM, primarily used for + * Heartbeats and container reports. + * + * @param conf - Ozone Config + * @return - Rpc timeout in Milliseconds. + */ + public static long getScmRpcTimeOutInMilliseconds(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, + OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + } + + /** + * Log Warn interval. + * + * @param conf - Ozone Config + * @return - Log warn interval. + */ + public static int getLogWarnInterval(Configuration conf) { + return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, + OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); + } + + /** + * returns the Container port. + * @param conf - Conf + * @return port number. + */ + public static int getContainerPort(Configuration conf) { + return conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + } + + + /** + * Return the list of service addresses for the Ozone SCM. This method is used + * by the DataNodes to determine the service instances to connect to. + * + * @param conf + * @return list of SCM service addresses. + */ + public static Map<String, ? 
extends Map<String, InetSocketAddress>> + getScmServiceRpcAddresses(Configuration conf) { + + final Map<String, InetSocketAddress> serviceInstances = new HashMap<>(); + serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID, + getScmAddressForDataNodes(conf)); + + final Map<String, Map<String, InetSocketAddress>> services = + new HashMap<>(); + services.put(OZONE_SCM_SERVICE_ID, serviceInstances); + return services; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java new file mode 100644 index 0000000..4e52046 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +/** + * This is a class that tracks versions of SCM. 
+ */ +public final class VersionInfo { + + // We will just be normal and use positive counting numbers for versions. + private final static VersionInfo[] VERSION_INFOS = + {new VersionInfo("First version of SCM", 1)}; + + + public static final String DESCRIPTION_KEY = "Description"; + private final String description; + private final int version; + + /** + * Never created outside this class. + * + * @param description -- description + * @param version -- version number + */ + private VersionInfo(String description, int version) { + this.description = description; + this.version = version; + } + + /** + * Returns all versions. + * + * @return Version info array. + */ + public static VersionInfo[] getAllVersions() { + return VERSION_INFOS.clone(); + } + + /** + * Returns the latest version. + * + * @return versionInfo + */ + public static VersionInfo getLatestVersion() { + return VERSION_INFOS[VERSION_INFOS.length - 1]; + } + + /** + * Return description. + * + * @return String + */ + public String getDescription() { + return description; + } + + /** + * Return the version. + * + * @return int. + */ + public int getVersion() { + return version; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java new file mode 100644 index 0000000..7213e7e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeServicePlugin; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.UUID; + +/** + * Datanode service plugin to start the HDDS container services. 
+ */ +public class HddsDatanodeService implements DataNodeServicePlugin { + + private static final Logger LOG = LoggerFactory.getLogger( + HddsDatanodeService.class); + + private final boolean isOzoneEnabled; + + private Configuration conf; + private DatanodeDetails datanodeDetails; + private DatanodeStateMachine datanodeStateMachine; + + public HddsDatanodeService() { + try { + OzoneConfiguration.activate(); + this.conf = new OzoneConfiguration(); + this.isOzoneEnabled = HddsUtils.isHddsEnabled(conf); + if (isOzoneEnabled) { + this.datanodeDetails = getDatanodeDetails(conf); + String hostname = DataNode.getHostName(conf); + String ip = InetAddress.getByName(hostname).getHostAddress(); + this.datanodeDetails.setHostName(hostname); + this.datanodeDetails.setIpAddress(ip); + } + } catch (IOException e) { + throw new RuntimeException("Can't start the HDDS datanode plugin", e); + } + } + + @Override + public void start(Object service) { + if (isOzoneEnabled) { + try { + DataNode dataNode = (DataNode) service; + datanodeDetails.setInfoPort(dataNode.getInfoPort()); + datanodeDetails.setInfoSecurePort(dataNode.getInfoSecurePort()); + datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf); + datanodeStateMachine.startDaemon(); + } catch (IOException e) { + throw new RuntimeException("Can't start the HDDS datanode plugin", e); + } + } + } + + /** + * Returns ContainerNodeIDProto or null in case of Error. + * + * @return ContainerNodeIDProto + */ + private static DatanodeDetails getDatanodeDetails(Configuration conf) + throws IOException { + String idFilePath = HddsUtils.getDatanodeIdFilePath(conf); + if (idFilePath == null || idFilePath.isEmpty()) { + LOG.error("A valid file path is needed for config setting {}", + ScmConfigKeys.OZONE_SCM_DATANODE_ID); + throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_DATANODE_ID + + " must be defined. 
See" + + " https://wiki.apache.org/hadoop/Ozone#Configuration" + + " for details on configuring Ozone."); + } + + Preconditions.checkNotNull(idFilePath); + File idFile = new File(idFilePath); + if (idFile.exists()) { + return ContainerUtils.readDatanodeDetailsFrom(idFile); + } else { + // There is no datanode.id file, this might be the first time datanode + // is started. + String datanodeUuid = UUID.randomUUID().toString(); + return DatanodeDetails.newBuilder().setUuid(datanodeUuid).build(); + } + } + + /** + * + * Return DatanodeDetails if set, return null otherwise. + * + * @return DatanodeDetails + */ + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + + @InterfaceAudience.Private + public DatanodeStateMachine getDatanodeStateMachine() { + return datanodeStateMachine; + } + + @Override + public void stop() { + if (datanodeStateMachine != null) { + datanodeStateMachine.stopDaemon(); + } + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java new file mode 100644 index 0000000..68bf442 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.FileLock; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CHECKSUM_MISMATCH; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .CONTAINER_NOT_FOUND; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result 
+ .INVALID_WRITE_SIZE; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .IO_EXCEPTION; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .OVERWRITE_FLAG_REQUIRED; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNABLE_TO_FIND_CHUNK; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNABLE_TO_FIND_DATA_DIR; +
/**
 * Set of utility functions used by the chunk Manager.
 */
public final class ChunkUtils {

  /**
   * Single logger shared by all helpers. Messages are still reported under
   * ChunkManagerImpl, as they were when each method looked the logger up.
   */
  private static final Logger LOG =
      LoggerFactory.getLogger(ChunkManagerImpl.class);

  /* Never constructed. */
  private ChunkUtils() {
  }

  /**
   * Checks if we are getting a request to overwrite an existing range of
   * chunk. A write counts as an overwrite when the chunk file already exists
   * and the requested offset lies before the current end of that file.
   *
   * @param chunkFile - File
   * @param chunkInfo - Buffer to write
   * @return true if the write would touch bytes already present in the file.
   */
  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
      chunkInfo) {
    if (!chunkFile.exists()) {
      return false;
    }
    return chunkInfo.getOffset() < chunkFile.length();
  }

  /**
   * Overwrite is permitted if and only if the user explicitly asks for it. We
   * permit this iff the key/value pair contains a flag called
   * [OverWriteRequested, true].
   *
   * @param chunkInfo - Chunk info
   * @return true if the user asks for it.
   */
  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
    // parseBoolean already yields false for null and for the empty string.
    return Boolean.parseBoolean(overWrite);
  }

  /**
   * Validates chunk data and returns a file object to Chunk File that we are
   * expected to write data to.
   *
   * @param pipeline - pipeline.
   * @param data - container data.
   * @param info - chunk info.
   * @return File
   * @throws StorageContainerException if the chunk file cannot be resolved,
   * or if the write would overwrite existing data without the overwrite flag.
   */
  public static File validateChunk(Pipeline pipeline, ContainerData data,
      ChunkInfo info) throws StorageContainerException {

    File chunkFile = getChunkFile(pipeline, data, info);
    if (isOverWriteRequested(chunkFile, info) && !isOverWritePermitted(info)) {
      LOG.error("Rejecting write chunk request. Chunk overwrite " +
          "without explicit request. {}", info.toString());
      throw new StorageContainerException("Rejecting write chunk request. " +
          "OverWrite flag required." + info.toString(),
          OVERWRITE_FLAG_REQUIRED);
    }
    return chunkFile;
  }

  /**
   * Resolves the chunk file inside the container's data directory. The data
   * directory must exist; the chunk file itself is not required to exist.
   *
   * @param pipeline - Container Info.
   * @param data - Container Data
   * @param info - Chunk info
   * @return - File.
   * @throws StorageContainerException if the container data is null or the
   * data directory is missing.
   */
  public static File getChunkFile(Pipeline pipeline, ContainerData data,
      ChunkInfo info) throws StorageContainerException {

    if (data == null) {
      LOG.error("Invalid container Name: {}", pipeline.getContainerName());
      throw new StorageContainerException("Unable to find the container Name:" +
          " " +
          pipeline.getContainerName(), CONTAINER_NOT_FOUND);
    }

    File dataDir = ContainerUtils.getDataDirectory(data).toFile();
    if (!dataDir.exists()) {
      LOG.error("Unable to find the data directory: {}", dataDir);
      throw new StorageContainerException("Unable to find the data directory:" +
          " " + dataDir, UNABLE_TO_FIND_DATA_DIR);
    }

    return dataDir.toPath().resolve(info.getChunkName()).toFile();
  }

  /**
   * Writes the data in chunk Info to the specified location in the chunkfile.
   *
   * @param chunkFile - File to write data to.
   * @param chunkInfo - Data stream to write.
   * @param data - The data buffer.
   * @throws StorageContainerException on a length mismatch, checksum
   * mismatch, short write, or any I/O failure.
   * @throws ExecutionException if the asynchronous lock or write fails.
   * @throws InterruptedException if interrupted while waiting on the channel.
   * @throws NoSuchAlgorithmException if the checksum algorithm is unavailable.
   */
  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
      byte[] data) throws
      StorageContainerException, ExecutionException, InterruptedException,
      NoSuchAlgorithmException {

    if (data.length != chunkInfo.getLen()) {
      String err = String.format("data array does not match the length " +
              "specified. DataLen: %d Byte Array: %d",
          chunkInfo.getLen(), data.length);
      LOG.error(err);
      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
    }

    AsynchronousFileChannel file = null;
    FileLock lock = null;

    try {
      file =
          AsynchronousFileChannel.open(chunkFile.toPath(),
              StandardOpenOption.CREATE,
              StandardOpenOption.WRITE,
              StandardOpenOption.SPARSE,
              StandardOpenOption.SYNC);
      lock = file.lock().get();
      if (chunkInfo.getChecksum() != null &&
          !chunkInfo.getChecksum().isEmpty()) {
        verifyChecksum(chunkInfo, data, LOG);
      }
      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
      if (size != data.length) {
        LOG.error("Invalid write size found. Size:{} Expected: {} ", size,
            data.length);
        throw new StorageContainerException("Invalid write size found. " +
            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
      }
    } catch (IOException e) {
      throw new StorageContainerException(e, IO_EXCEPTION);
    } finally {
      // The close lives in its own finally so that a failing lock release
      // can no longer skip it and leak the channel.
      try {
        if (lock != null) {
          try {
            lock.release();
          } catch (IOException e) {
            LOG.error("Unable to release lock ??, Fatal Error.");
            throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
          }
        }
      } finally {
        if (file != null) {
          try {
            file.close();
          } catch (IOException e) {
            throw new StorageContainerException("Error closing chunk file",
                e, CONTAINER_INTERNAL_ERROR);
          }
        }
      }
    }
  }

  /**
   * Verifies the checksum of a chunk against the data buffer.
   *
   * @param chunkInfo - Chunk Info.
   * @param data - data buffer
   * @param log - log
   * @throws NoSuchAlgorithmException if OzoneConsts.FILE_HASH is not an
   * available digest algorithm.
   * @throws StorageContainerException if the computed checksum differs from
   * the one carried in the chunk info.
   */
  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
      log) throws NoSuchAlgorithmException, StorageContainerException {
    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
    sha.update(data);
    // digest() resets the MessageDigest, so it must be called exactly once;
    // the same hex string is used for the comparison and for reporting.
    String computed = Hex.encodeHexString(sha.digest());
    if (!computed.equals(chunkInfo.getChecksum())) {
      log.error("Checksum mismatch. Provided: {} , computed: {}",
          chunkInfo.getChecksum(), computed);
      throw new StorageContainerException("Checksum mismatch. Provided: " +
          chunkInfo.getChecksum() + " , computed: " + computed,
          CHECKSUM_MISMATCH);
    }
  }

  /**
   * Reads data from an existing chunk file.
   *
   * @param chunkFile - file where data lives.
   * @param data - chunk definition.
   * @return ByteBuffer
   * @throws StorageContainerException if the file is missing, the checksum
   * does not match, or an I/O error occurs.
   * @throws ExecutionException if the asynchronous lock or read fails.
   * @throws InterruptedException if interrupted while waiting on the channel.
   * @throws NoSuchAlgorithmException if the checksum algorithm is unavailable.
   */
  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
      StorageContainerException, ExecutionException, InterruptedException,
      NoSuchAlgorithmException {

    if (!chunkFile.exists()) {
      LOG.error("Unable to find the chunk file. chunk info : {}",
          data.toString());
      throw new StorageContainerException("Unable to find the chunk file. " +
          "chunk info " +
          data.toString(), UNABLE_TO_FIND_CHUNK);
    }

    AsynchronousFileChannel file = null;
    FileLock lock = null;
    try {
      file =
          AsynchronousFileChannel.open(chunkFile.toPath(),
              StandardOpenOption.READ);
      // Shared lock over just the requested region.
      lock = file.lock(data.getOffset(), data.getLen(), true).get();

      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
      // NOTE(review): the byte count returned by read() is not checked, so a
      // short read would leave trailing zeros in the buffer - confirm whether
      // callers rely on a full read.
      file.read(buf, data.getOffset()).get();

      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
        verifyChecksum(data, buf.array(), LOG);
      }

      return buf;
    } catch (IOException e) {
      throw new StorageContainerException(e, IO_EXCEPTION);
    } finally {
      if (lock != null) {
        try {
          lock.release();
        } catch (IOException e) {
          // Best effort: the read already completed (or failed), so a
          // failed release is only logged.
          LOG.error("I/O error is lock release.");
        }
      }
      if (file != null) {
        IOUtils.closeStream(file);
      }
    }
  }

  /**
   * Returns the payload-free response for a chunk request by delegating to
   * ContainerUtils.getContainerResponse for the same message.
   *
   * @param msg Request
   * @return Response.
   */
  public static ContainerProtos.ContainerCommandResponseProto
      getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
    return ContainerUtils.getContainerResponse(msg);
  }

  /**
   * Gets a response to the read chunk calls.
   *
   * @param msg - Msg
   * @param data - Data
   * @param info - Info
   * @return Response carrying the chunk info, the chunk bytes and the
   * pipeline of the originating read request.
   */
  public static ContainerProtos.ContainerCommandResponseProto
      getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
      byte[] data, ChunkInfo info) {
    Preconditions.checkNotNull(msg);

    ContainerProtos.ReadChunkResponseProto.Builder response =
        ContainerProtos.ReadChunkResponseProto.newBuilder();
    response.setChunkData(info.getProtoBufMessage());
    response.setData(ByteString.copyFrom(data));
    response.setPipeline(msg.getReadChunk().getPipeline());

    ContainerProtos.ContainerCommandResponseProto.Builder builder =
        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
            .SUCCESS, "");
    builder.setReadChunk(response);
    return builder.build();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java new file mode 100644 index 0000000..c29374c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.Time; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +
/**
 * This class maintains the information about a container in the ozone world.
 * <p>
 * A container is a name, along with metadata- which is a set of key value
 * pair.
 */
public class ContainerData {

  private final String containerName;
  // Guarded by its own monitor: every access synchronizes on this map.
  private final Map<String, String> metadata;
  private String dbPath;  // Path to Level DB Store.
  // Path to Physical file system where container and checksum are stored.
  private String containerFilePath;
  private String hash;
  private final AtomicLong bytesUsed;
  private long maxSize;
  private Long containerID;
  private HddsProtos.LifeCycleState state;

  /**
   * Constructs a ContainerData Object. The container starts in the OPEN
   * state with zero bytes used; the maximum size is read from the
   * configuration (a value in GB) and stored in bytes.
   *
   * @param containerName - Name
   * @param containerID - ID of the container
   * @param conf - configuration supplying the maximum container size
   */
  public ContainerData(String containerName, Long containerID,
      Configuration conf) {
    this.metadata = new TreeMap<>();
    this.containerName = containerName;
    this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
        ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
    this.bytesUsed = new AtomicLong(0L);
    this.containerID = containerID;
    this.state = HddsProtos.LifeCycleState.OPEN;
  }

  /**
   * Constructs a ContainerData object from ProtoBuf classes.
   *
   * @param protoData - ProtoBuf Message
   * @param conf - configuration used for the defaults of the new object
   * @return ContainerData populated from the fields present in the message
   * @throws IOException if the message carries the same metadata key twice
   */
  public static ContainerData getFromProtBuf(
      ContainerProtos.ContainerData protoData, Configuration conf)
      throws IOException {
    ContainerData data = new ContainerData(protoData.getName(),
        protoData.getContainerID(), conf);
    for (int x = 0; x < protoData.getMetadataCount(); x++) {
      data.addMetadata(protoData.getMetadata(x).getKey(),
          protoData.getMetadata(x).getValue());
    }

    // Optional fields override the constructor defaults only when present.
    if (protoData.hasContainerPath()) {
      data.setContainerPath(protoData.getContainerPath());
    }

    if (protoData.hasDbPath()) {
      data.setDBPath(protoData.getDbPath());
    }

    if (protoData.hasState()) {
      data.setState(protoData.getState());
    }

    if (protoData.hasHash()) {
      data.setHash(protoData.getHash());
    }

    if (protoData.hasBytesUsed()) {
      data.setBytesUsed(protoData.getBytesUsed());
    }

    if (protoData.hasSize()) {
      data.setMaxSize(protoData.getSize());
    }
    return data;
  }

  /**
   * Returns a ProtoBuf Message from ContainerData.
   *
   * @return Protocol Buffer Message
   */
  public ContainerProtos.ContainerData getProtoBufMessage() {
    ContainerProtos.ContainerData.Builder builder = ContainerProtos
        .ContainerData.newBuilder();
    builder.setName(this.getContainerName());
    builder.setContainerID(this.getContainerID());

    if (this.getDBPath() != null) {
      builder.setDbPath(this.getDBPath());
    }

    if (this.getHash() != null) {
      builder.setHash(this.getHash());
    }

    if (this.getContainerPath() != null) {
      builder.setContainerPath(this.getContainerPath());
    }

    builder.setState(this.getState());

    // Iterate under the same monitor the mutators use; otherwise a
    // concurrent addMetadata/deleteKey can break the iteration.
    synchronized (this.metadata) {
      for (Map.Entry<String, String> entry : metadata.entrySet()) {
        HddsProtos.KeyValue.Builder keyValBuilder =
            HddsProtos.KeyValue.newBuilder();
        builder.addMetadata(keyValBuilder.setKey(entry.getKey())
            .setValue(entry.getValue()).build());
      }
    }

    if (this.getBytesUsed() >= 0) {
      builder.setBytesUsed(this.getBytesUsed());
    }

    if (this.getKeyCount() >= 0) {
      builder.setKeyCount(this.getKeyCount());
    }

    if (this.getMaxSize() >= 0) {
      builder.setSize(this.getMaxSize());
    }

    return builder.build();
  }

  /**
   * Returns the name of the container.
   *
   * @return - name
   */
  public String getContainerName() {
    return containerName;
  }

  /**
   * Adds metadata.
   *
   * @param key - metadata key, must not already be present
   * @param value - metadata value
   * @throws IOException if the key already exists
   */
  public void addMetadata(String key, String value) throws IOException {
    synchronized (this.metadata) {
      if (this.metadata.containsKey(key)) {
        throw new IOException("This key already exists. Key " + key);
      }
      metadata.put(key, value);
    }
  }

  /**
   * Returns all metadata.
   *
   * @return an unmodifiable snapshot of the metadata, taken under the lock so
   * the caller can iterate it without racing the mutators.
   */
  public Map<String, String> getAllMetadata() {
    synchronized (this.metadata) {
      return Collections.unmodifiableMap(new TreeMap<>(this.metadata));
    }
  }

  /**
   * Returns value of a key.
   *
   * @param key - metadata key
   * @return the value stored for the key, or null if there is none
   */
  public String getValue(String key) {
    synchronized (this.metadata) {
      return metadata.get(key);
    }
  }

  /**
   * Deletes a metadata entry from the map.
   *
   * @param key - Key
   */
  public void deleteKey(String key) {
    synchronized (this.metadata) {
      metadata.remove(key);
    }
  }

  /**
   * Returns path.
   *
   * @return - path
   */
  public String getDBPath() {
    return dbPath;
  }

  /**
   * Sets path.
   *
   * @param path - String.
   */
  public void setDBPath(String path) {
    this.dbPath = path;
  }

  /**
   * This function serves as the generic key for ContainerCache class. Both
   * ContainerData and ContainerKeyData overrides this function to appropriately
   * return the right name that can be used in ContainerCache.
   *
   * @return String Name.
   */
  public String getName() {
    return getContainerName();
  }

  /**
   * Get container file path.
   * @return - Physical path where container file and checksum is stored.
   */
  public String getContainerPath() {
    return containerFilePath;
  }

  /**
   * Set container Path.
   * @param containerPath - File path.
   */
  public void setContainerPath(String containerPath) {
    this.containerFilePath = containerPath;
  }

  /**
   * Get container ID.
   * @return - container ID.
   */
  public synchronized Long getContainerID() {
    return containerID;
  }

  /**
   * Sets the life cycle state of this container.
   * @param state - new state.
   */
  public synchronized void setState(HddsProtos.LifeCycleState state) {
    this.state = state;
  }

  /**
   * Returns the life cycle state of this container.
   * @return - current state.
   */
  public synchronized HddsProtos.LifeCycleState getState() {
    return this.state;
  }

  /**
   * checks if the container is open.
   * @return - boolean
   */
  public synchronized boolean isOpen() {
    return HddsProtos.LifeCycleState.OPEN == state;
  }

  /**
   * Marks this container as closed.
   */
  public synchronized void closeContainer() {
    // TODO: closed or closing here
    setState(HddsProtos.LifeCycleState.CLOSED);

    // Some thing brain dead for now. name + Time stamp of when we get the close
    // container message.
    setHash(DigestUtils.sha256Hex(this.getContainerName() +
        Long.toString(Time.monotonicNow())));
  }

  /**
   * Final hash for this container.
   * @return - Hash
   */
  public String getHash() {
    return hash;
  }

  /**
   * Sets the final hash for this container.
   * @param hash - Hash
   */
  public void setHash(String hash) {
    this.hash = hash;
  }

  /**
   * Sets the maximum size of this container.
   * @param maxSize - maximum size
   */
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
  }

  /**
   * Returns the maximum size of this container.
   * @return - maximum size
   */
  public long getMaxSize() {
    return maxSize;
  }

  /**
   * Returns the number of entries currently held in the metadata map, which
   * is what this class reports as its key count.
   * @return - number of metadata entries
   */
  public long getKeyCount() {
    synchronized (this.metadata) {
      return metadata.size();
    }
  }

  /**
   * Overwrites the bytes-used counter.
   * @param used - new value
   */
  public void setBytesUsed(long used) {
    this.bytesUsed.set(used);
  }

  /**
   * Atomically adds a delta to the bytes-used counter.
   * @param delta - amount to add, may be negative
   * @return the updated counter value
   */
  public long addBytesUsed(long delta) {
    return this.bytesUsed.addAndGet(delta);
  }

  /**
   * Returns the bytes-used counter.
   * @return - bytes used
   */
  public long getBytesUsed() {
    return bytesUsed.get();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java new file mode 100644 index 0000000..d4d732b --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; +import org.apache.hadoop.metrics2.lib.MutableRate; +
/**
 *
 * This class is for maintaining the various Storage Container
 * DataNode statistics and publishing them through the metrics interfaces.
 * This also registers the JMX MBean for RPC.
 * <p>
 * This class has a number of metrics variables that are publicly accessible;
 * these variables (objects) have methods to update their values;
 * for example:
 * <p> {@link #numOps}.incr()
 *
 */
@InterfaceAudience.Private
@Metrics(about="Storage Container DataNode Metrics", context="dfs")
public class ContainerMetrics {
  @Metric private MutableCounterLong numOps;
  // The arrays below hold one slot per ContainerProtos.Type value and are
  // indexed by the type's ordinal().
  private MutableCounterLong[] numOpsArray;
  private MutableCounterLong[] opsBytesArray;
  private MutableRate[] opsLatency;
  private MutableQuantiles[][] opsLatQuantiles;
  private MetricsRegistry registry = null;

  /**
   * Creates the per-operation counters, byte counters, latency rates and one
   * latency quantile per requested interval for every container command type.
   *
   * @param intervals - quantile roll-over intervals; an empty array creates
   * no quantile metrics.
   */
  public ContainerMetrics(int[] intervals) {
    int numEnumEntries = ContainerProtos.Type.values().length;
    final int len = intervals.length;
    this.numOpsArray = new MutableCounterLong[numEnumEntries];
    this.opsBytesArray = new MutableCounterLong[numEnumEntries];
    this.opsLatency = new MutableRate[numEnumEntries];
    this.opsLatQuantiles = new MutableQuantiles[numEnumEntries][len];
    this.registry = new MetricsRegistry("StorageContainerMetrics");
    for (int i = 0; i < numEnumEntries; i++) {
      // NOTE(review): slot i is named after the type whose protobuf number is
      // i + 1 but is later indexed by ordinal(); this presumably relies on the
      // enum numbers being 1..N in declaration order - confirm against the
      // proto definition.
      ContainerProtos.Type type = ContainerProtos.Type.valueOf(i + 1);
      numOpsArray[i] = registry.newCounter(
          "num" + type,
          "number of " + type + " ops",
          (long) 0);
      opsBytesArray[i] = registry.newCounter(
          "bytes" + type,
          "bytes used by " + type + "op",
          (long) 0);
      opsLatency[i] = registry.newRate(
          "latency" + type,
          type + " op");

      for (int j = 0; j < len; j++) {
        int interval = intervals[j];
        String quantileName = type + "Nanos" + interval + "s";
        opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
            "latency of Container ops", "ops", "latency", interval);
      }
    }
  }

  /**
   * Builds a ContainerMetrics instance from the configuration and registers
   * it with the default metrics system.
   *
   * @param conf - configuration supplying the percentile intervals
   * @return the object returned by the metrics system registration
   */
  public static ContainerMetrics create(Configuration conf) {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    // Percentile measurement is off by default, by watching no intervals
    int[] intervals =
        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    return ms.register("StorageContainerMetrics",
        "Storage Container Node Metrics",
        new ContainerMetrics(intervals));
  }

  /**
   * Counts one more operation, both overall and for the given type.
   *
   * @param type - command type
   */
  public void incContainerOpcMetrics(ContainerProtos.Type type){
    numOps.incr();
    numOpsArray[type.ordinal()].incr();
  }

  /**
   * Returns how many operations of the given type were counted.
   *
   * @param type - command type
   * @return operation count
   */
  public long getContainerOpsMetrics(ContainerProtos.Type type){
    return numOpsArray[type.ordinal()].value();
  }

  /**
   * Records one latency sample for the given type in its rate metric and in
   * every configured quantile.
   *
   * @param type - command type
   * @param latencyNanos - latency sample
   */
  public void incContainerOpsLatencies(ContainerProtos.Type type,
      long latencyNanos) {
    opsLatency[type.ordinal()].add(latencyNanos);
    for (MutableQuantiles q: opsLatQuantiles[type.ordinal()]) {
      q.add(latencyNanos);
    }
  }

  /**
   * Adds to the byte counter of the given type.
   *
   * @param type - command type
   * @param bytes - number of bytes to add
   */
  public void incContainerBytesStats(ContainerProtos.Type type, long bytes) {
    opsBytesArray[type.ordinal()].incr(bytes);
  }

  /**
   * Returns the byte counter of the given type.
   *
   * @param type - command type
   * @return byte count
   */
  public long getContainerBytesMetrics(ContainerProtos.Type type){
    return opsBytesArray[type.ordinal()].value();
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java new file mode 100644 index 0000000..50d2da3 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo; +
/**
 * Container Report iterates the closed containers and sends a container report
 * to SCM.
 */
public class ContainerReport {
  /** Marker stored in size and key count until a real value is set. */
  private static final int UNKNOWN = -1;

  // Identity of the reported container.
  private final String containerName;
  private final String finalhash;
  private long containerID;

  // Capacity and usage figures.
  private long size;
  private long keyCount;
  private long bytesUsed;

  // I/O statistics.
  private long readCount;
  private long writeCount;
  private long readBytes;
  private long writeBytes;

  /**
   * Constructs the ContainerReport. Size and key count start out as unknown
   * (-1); the usage and I/O counters start at zero.
   *
   * @param containerName - Container Name.
   * @param finalhash - Final Hash.
   */
  public ContainerReport(String containerName, String finalhash) {
    this.containerName = containerName;
    this.finalhash = finalhash;
    size = UNKNOWN;
    keyCount = UNKNOWN;
    bytesUsed = 0L;
    readCount = 0L;
    readBytes = 0L;
    writeCount = 0L;
    writeBytes = 0L;
  }

  /**
   * Gets a containerReport from protobuf class. Optional fields are copied
   * only when the message carries them; the container ID is always copied.
   *
   * @param info - ContainerInfo.
   * @return - ContainerReport.
   */
  public static ContainerReport getFromProtoBuf(ContainerInfo info) {
    Preconditions.checkNotNull(info);
    final ContainerReport result =
        new ContainerReport(info.getContainerName(), info.getFinalhash());

    if (info.hasSize()) {
      result.setSize(info.getSize());
    }
    if (info.hasKeyCount()) {
      result.setKeyCount(info.getKeyCount());
    }
    if (info.hasUsed()) {
      result.setBytesUsed(info.getUsed());
    }
    if (info.hasReadCount()) {
      result.setReadCount(info.getReadCount());
    }
    if (info.hasReadBytes()) {
      result.setReadBytes(info.getReadBytes());
    }
    if (info.hasWriteCount()) {
      result.setWriteCount(info.getWriteCount());
    }
    if (info.hasWriteBytes()) {
      result.setWriteBytes(info.getWriteBytes());
    }

    result.setContainerID(info.getContainerID());
    return result;
  }

  /**
   * Gets a containerInfo protobuf message from ContainerReports.
   *
   * @return ContainerInfo
   */
  public ContainerInfo getProtoBufMessage() {
    ContainerInfo.Builder message = ContainerInfo.newBuilder();
    message.setContainerName(getContainerName());
    message.setKeyCount(getKeyCount());
    message.setSize(getSize());
    message.setUsed(getBytesUsed());
    message.setReadCount(getReadCount());
    message.setReadBytes(getReadBytes());
    message.setWriteCount(getWriteCount());
    message.setWriteBytes(getWriteBytes());
    message.setFinalhash(getFinalhash());
    message.setContainerID(getContainerID());
    return message.build();
  }

  /**
   * Gets the container ID.
   *
   * @return - container ID
   */
  public long getContainerID() {
    return containerID;
  }

  /**
   * Sets the container ID.
   *
   * @param containerID - container ID
   */
  public void setContainerID(long containerID) {
    this.containerID = containerID;
  }

  /**
   * Gets the container name.
   *
   * @return - Name
   */
  public String getContainerName() {
    return containerName;
  }

  /**
   * Returns the final signature for this container.
   *
   * @return - hash
   */
  public String getFinalhash() {
    return finalhash;
  }

  /**
   * Returns the size of the container, or -1 if it is not known.
   *
   * @return size or -1
   */
  public long getSize() {
    return size;
  }

  /**
   * Sets the size of the container on disk.
   *
   * @param size - size of the container
   */
  public void setSize(long size) {
    this.size = size;
  }

  /**
   * Gets number of keys in the container if known.
   *
   * @return - Number of keys or -1 for not known.
   */
  public long getKeyCount() {
    return keyCount;
  }

  /**
   * Sets the key count.
   *
   * @param keyCount - Key Count
   */
  public void setKeyCount(long keyCount) {
    this.keyCount = keyCount;
  }

  /**
   * Gets the number of bytes used.
   *
   * @return - bytes used
   */
  public long getBytesUsed() {
    return bytesUsed;
  }

  /**
   * Sets the number of bytes used.
   *
   * @param bytesUsed - bytes used
   */
  public void setBytesUsed(long bytesUsed) {
    this.bytesUsed = bytesUsed;
  }

  /**
   * Gets the read count.
   *
   * @return - read count
   */
  public long getReadCount() {
    return readCount;
  }

  /**
   * Sets the read count.
   *
   * @param readCount - read count
   */
  public void setReadCount(long readCount) {
    this.readCount = readCount;
  }

  /**
   * Gets the write count.
   *
   * @return - write count
   */
  public long getWriteCount() {
    return writeCount;
  }

  /**
   * Sets the write count.
   *
   * @param writeCount - write count
   */
  public void setWriteCount(long writeCount) {
    this.writeCount = writeCount;
  }

  /**
   * Gets the number of bytes read.
   *
   * @return - bytes read
   */
  public long getReadBytes() {
    return readBytes;
  }

  /**
   * Sets the number of bytes read.
   *
   * @param readBytes - bytes read
   */
  public void setReadBytes(long readBytes) {
    this.readBytes = readBytes;
  }

  /**
   * Gets the number of bytes written.
   *
   * @return - bytes written
   */
  public long getWriteBytes() {
    return writeBytes;
  }

  /**
   * Sets the number of bytes written.
   *
   * @param writeBytes - bytes written
   */
  public void setWriteBytes(long writeBytes) {
    this.writeBytes = writeBytes;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java new file mode 100644 index 0000000..1818188 --- /dev/null +++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.hdds.scm.container.common.helpers + .StorageContainerException; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.apache.commons.io.FilenameUtils.removeExtension; 
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .INVALID_ARGUMENT; +import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result + .UNABLE_TO_FIND_DATA_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION; +import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META; + +/** + * A set of helper functions to create proper responses. + */ +public final class ContainerUtils { + + private ContainerUtils() { + //never constructed. + } + + /** + * Returns a CreateContainer Response. This call is used by create and delete + * containers which have null success responses. + * + * @param msg Request + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) { + ContainerProtos.ContainerCommandResponseProto.Builder builder = + getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); + return builder.build(); + } + + /** + * Returns a ReadContainer Response. + * + * @param msg Request + * @param containerData - data + * @return Response. + */ + public static ContainerProtos.ContainerCommandResponseProto + getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, + ContainerData containerData) { + Preconditions.checkNotNull(containerData); + + ContainerProtos.ReadContainerResponseProto.Builder response = + ContainerProtos.ReadContainerResponseProto.newBuilder(); + response.setContainerData(containerData.getProtoBufMessage()); + + ContainerProtos.ContainerCommandResponseProto.Builder builder = + getContainerResponse(msg, ContainerProtos.Result.SUCCESS, ""); + builder.setReadContainer(response); + return builder.build(); + } + + /** + * We found a command type but no associated payload for the command. Hence + * return malformed Command as response. + * + * @param msg - Protobuf message. + * @param result - result + * @param message - Error message. 
+ * @return ContainerCommandResponseProto - MALFORMED_REQUEST. + */ + public static ContainerProtos.ContainerCommandResponseProto.Builder + getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg, + ContainerProtos.Result result, String message) { + return + ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(msg.getCmdType()) + .setTraceID(msg.getTraceID()) + .setResult(result) + .setMessage(message); + } + + /** + * Logs the error and returns a response to the caller. + * + * @param log - Logger + * @param ex - Exception + * @param msg - Request Object + * @return Response + */ + public static ContainerProtos.ContainerCommandResponseProto logAndReturnError( + Logger log, StorageContainerException ex, + ContainerProtos.ContainerCommandRequestProto msg) { + log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}", + msg.getCmdType().name(), msg.getTraceID(), + ex.getMessage(), ex.getResult().getValueDescriptor().getName()); + return getContainerResponse(msg, ex.getResult(), ex.getMessage()).build(); + } + + /** + * Logs the error and returns a response to the caller. + * + * @param log - Logger + * @param ex - Exception + * @param msg - Request Object + * @return Response + */ + public static ContainerProtos.ContainerCommandResponseProto logAndReturnError( + Logger log, RuntimeException ex, + ContainerProtos.ContainerCommandRequestProto msg) { + log.info("Operation: {} : Trace ID: {} : Message: {} ", + msg.getCmdType().name(), msg.getTraceID(), ex.getMessage()); + return getContainerResponse(msg, INVALID_ARGUMENT, ex.getMessage()).build(); + } + + /** + * We found a command type but no associated payload for the command. Hence + * return malformed Command as response. + * + * @param msg - Protobuf message. + * @return ContainerCommandResponseProto - MALFORMED_REQUEST. 
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+    return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
+        "Cmd type does not match the payload.").build();
+  }
+
+  /**
+   * We found a command type that is not supported yet.
+   *
+   * @param msg - Protobuf message.
+   * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+    return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
+        "Server does not support this command yet.").build();
+  }
+
+  /**
+   * Gets the container name from a container file: the parent directory of
+   * the file joined with the file name stripped of its extension.
+   *
+   * @param containerFile - File
+   * @return Name of the container.
+   */
+  public static String getContainerNameFromFile(File containerFile) {
+    Preconditions.checkNotNull(containerFile);
+    return Paths.get(containerFile.getParent()).resolve(
+        removeExtension(containerFile.getName())).toString();
+  }
+
+  /**
+   * Verifies that this is indeed a new container, then creates the parent
+   * directory (if missing), the container file and the metadata file.
+   *
+   * @param containerFile - Container File to verify
+   * @param metadataFile - metadata File to verify
+   * @throws IOException if either file already exists on disk, or if the
+   *                     parent directory or either file cannot be created.
+   */
+  public static void verifyIsNewContainer(File containerFile, File metadataFile)
+      throws IOException {
+    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
+    if (containerFile.exists()) {
+      log.error("container already exists on disk. File: {}",
+          containerFile.toPath());
+      throw new FileAlreadyExistsException("container already exists on " +
+          "disk.");
+    }
+
+    if (metadataFile.exists()) {
+      log.error("metadata found on disk, but missing container. Refusing to" +
+          " write this container. File: {} ", metadataFile.toPath());
+      throw new FileAlreadyExistsException(("metadata found on disk, but " +
+          "missing container. Refusing to write this container."));
+    }
+
+    File parentPath = new File(containerFile.getParent());
+
+    if (!parentPath.exists() && !parentPath.mkdirs()) {
+      log.error("Unable to create parent path. Path: {}",
+          parentPath.toString());
+      throw new IOException("Unable to create container directory.");
+    }
+
+    if (!containerFile.createNewFile()) {
+      log.error("creation of a new container file failed. File: {}",
+          containerFile.toPath());
+      throw new IOException("creation of a new container file failed.");
+    }
+
+    if (!metadataFile.createNewFile()) {
+      log.error("creation of the metadata file failed. File: {}",
+          metadataFile.toPath());
+      // Report the file that actually failed; this used to repeat the
+      // container-file message.
+      throw new IOException("creation of the metadata file failed.");
+    }
+  }
+
+  /**
+   * Returns the DB file name for a container: the container name followed by
+   * the {@code OzoneConsts.DN_CONTAINER_DB} suffix.
+   *
+   * @param containerName - Name of the container.
+   * @return DB file name.
+   */
+  public static String getContainerDbFileName(String containerName) {
+    return containerName + OzoneConsts.DN_CONTAINER_DB;
+  }
+
+  /**
+   * Creates a Metadata DB for the specified container, together with the
+   * container's data directory.
+   *
+   * @param containerPath - Container Path.
+   * @param containerName - Name of the container, used for the DB file name.
+   * @param conf - Configuration handed to the metadata store builder.
+   * @return Path of the metadata directory that was created.
+   * @throws IOException if the metadata or data directory cannot be created.
+   */
+  public static Path createMetadata(Path containerPath, String containerName,
+      Configuration conf)
+      throws IOException {
+    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
+    Preconditions.checkNotNull(containerPath);
+    Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
+    if (!metadataPath.toFile().mkdirs()) {
+      log.error("Unable to create directory for metadata storage. Path: {}",
+          metadataPath);
+      throw new IOException("Unable to create directory for metadata storage." +
+          " Path: " + metadataPath);
+    }
+    MetadataStore store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(metadataPath
+            .resolve(getContainerDbFileName(containerName)).toFile())
+        .build();
+
+    // we close since the SCM pre-creates containers.
+    // we will open and put Db handle into a cache when keys are being created
+    // in a container.
+
+    store.close();
+
+    Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+    if (!dataPath.toFile().mkdirs()) {
+
+      // If we failed to create data directory, we cleanup the
+      // container directory completely. That is, we will delete the
+      // whole directory including the DB file created above.
+      log.error("Unable to create directory for data storage. cleaning up the" +
+          " container path: {} dataPath: {}",
+          containerPath, dataPath);
+      FileUtils.deleteDirectory(containerPath.toFile());
+      throw new IOException("Unable to create directory for data storage." +
+          " Path: " + dataPath);
+    }
+    return metadataPath;
+  }
+
+  /**
+   * Returns Metadata location: the container name with the CONTAINER_META
+   * suffix, resolved against the given location.
+   *
+   * @param containerData - Data
+   * @param location - Path
+   * @return Path
+   */
+  public static File getMetadataFile(ContainerData containerData,
+      Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_META))
+        .toFile();
+  }
+
+  /**
+   * Returns container file location: the container name with the
+   * CONTAINER_EXTENSION suffix, resolved against the given location.
+   *
+   * @param containerData - Data
+   * @param location - Root path
+   * @return Path
+   */
+  public static File getContainerFile(ContainerData containerData,
+      Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_EXTENSION))
+        .toFile();
+  }
+
+  /**
+   * Container metadata directory -- here is where the container DB lives.
+   *
+   * @param cData - cData.
+   * @return Path to the parent directory where the DB lives.
+   */
+  public static Path getMetadataDirectory(ContainerData cData) {
+    Path dbPath = Paths.get(cData.getDBPath());
+    Preconditions.checkNotNull(dbPath);
+    Preconditions.checkState(dbPath.toString().length() > 0);
+    return dbPath.getParent();
+  }
+
+  /**
+   * Returns the path where data or chunks live for a given container.
+   *
+   * @param cData - cData container
+   * @return - Path of the data directory, a sibling of the metadata directory.
+   * @throws StorageContainerException if the metadata directory has no parent.
+   */
+  public static Path getDataDirectory(ContainerData cData)
+      throws StorageContainerException {
+    Path path = getMetadataDirectory(cData);
+    Preconditions.checkNotNull(path);
+    Path parentPath = path.getParent();
+    if (parentPath == null) {
+      throw new StorageContainerException("Unable to get Data directory."
+          + path, UNABLE_TO_FIND_DATA_DIR);
+    }
+    return parentPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+  }
+
+  /**
+   * Removes a container. Unless forced, the container must be empty.
+   * <p>
+   * There are three things we need to delete.
+   * <p>
+   * 1. Container file and metadata file. 2. The DB file 3. The path that
+   * we created on the data location.
+   *
+   * @param containerData - Data of the container to remove.
+   * @param conf - configuration of the cluster.
+   * @param forceDelete - whether this container should be deleted forcibly.
+   * @throws IOException if the container is not empty and the delete is not
+   *                     forced, or if any of the deletions fails.
+   */
+  public static void removeContainer(ContainerData containerData,
+      Configuration conf, boolean forceDelete) throws IOException {
+    Preconditions.checkNotNull(containerData);
+    Path dbPath = Paths.get(containerData.getDBPath());
+
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
+    // If the container is not empty and cannot be deleted forcibly,
+    // then throw a SCE to stop deleting.
+    if(!forceDelete && !db.isEmpty()) {
+      throw new StorageContainerException(
+          "Container cannot be deleted because it is not empty.",
+          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+    }
+    // Close the DB connection and remove the DB handler from cache
+    KeyUtils.removeDB(containerData, conf);
+
+    // Delete the DB File.
+    FileUtils.forceDelete(dbPath.toFile());
+    dbPath = dbPath.getParent();
+
+    // Delete the directory that held the DB file (the metadata directory).
+    if (dbPath != null) {
+      FileUtils.deleteDirectory(dbPath.toFile());
+      dbPath = dbPath.getParent();
+    }
+
+    // now delete the container directory, this means that all key data dirs
+    // will be removed too.
+    if (dbPath != null) {
+      FileUtils.deleteDirectory(dbPath.toFile());
+    }
+
+    // Delete the container metadata from the metadata locations.
+    String rootPath = getContainerNameFromFile(new File(containerData
+        .getContainerPath()));
+    Path containerPath = Paths.get(rootPath.concat(CONTAINER_EXTENSION));
+    Path metaPath = Paths.get(rootPath.concat(CONTAINER_META));
+
+    FileUtils.forceDelete(containerPath.toFile());
+    FileUtils.forceDelete(metaPath.toFile());
+  }
+
+  /**
+   * Persists a {@link DatanodeDetails} to a local file. An existing file is
+   * replaced; missing parent directories are created.
+   *
+   * @param datanodeDetails details to serialize in protobuf form
+   * @param path local file to write to
+   * @throws IOException when read/write error occurs
+   */
+  public synchronized static void writeDatanodeDetailsTo(
+      DatanodeDetails datanodeDetails, File path) throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the datanode ID file.");
+      }
+    } else {
+      // getParentFile() is null for a path without a parent component (for
+      // example a bare file name); there is no directory to create then.
+      File parentDir = path.getParentFile();
+      if (parentDir != null && !parentDir.exists() && !parentDir.mkdirs()) {
+        throw new IOException("Unable to create datanode ID directories.");
+      }
+    }
+    try (FileOutputStream out = new FileOutputStream(path)) {
+      HddsProtos.DatanodeDetailsProto proto =
+          datanodeDetails.getProtoBufMessage();
+      proto.writeTo(out);
+    }
+  }
+
+  /**
+   * Read {@link DatanodeDetails} from a local ID file.
+   *
+   * @param path local path of the ID file
+   * @return the {@link DatanodeDetails} decoded from the file
+   * @throws IOException if the file does not exist, or if reading or parsing
+   *                     it fails
+   */
+  public static synchronized DatanodeDetails readDatanodeDetailsFrom(File path)
+      throws IOException {
+    if (!path.exists()) {
+      throw new IOException("Datanode ID file not found.");
+    }
+    try (FileInputStream idFileStream = new FileInputStream(path)) {
+      HddsProtos.DatanodeDetailsProto proto =
+          HddsProtos.DatanodeDetailsProto.parseFrom(idFileStream);
+      return DatanodeDetails.getFromProtoBuf(proto);
+    } catch (IOException e) {
+      // Wrap the failure so that the offending file shows up in the message.
+      throw new IOException("Failed to parse DatanodeDetails from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
new file mode 100644
index 0000000..ade162a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Summarizes a list of {@link DeletedBlocksTransaction}s: how often each
+ * transaction has been processed and how many blocks are pending deletion
+ * per container.
+ */
+public final class DeletedContainerBlocksSummary {
+
+  // The transactions this summary was computed from, in their given order.
+  private final List<DeletedBlocksTransaction> blocks;
+  // txID -> the count reported by that transaction.
+  private final Map<Long, Integer> txSummary;
+  // container name -> number of block IDs to delete in that container,
+  // accumulated over every transaction that names the container.
+  private final Map<String, Integer> blockSummary;
+  // Sum of the block ID counts of all transactions.
+  private int numOfBlocks;
+
+  private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
+    this.blocks = blocks;
+    txSummary = Maps.newHashMap();
+    blockSummary = Maps.newHashMap();
+    for (DeletedBlocksTransaction tx : blocks) {
+      txSummary.put(tx.getTxID(), tx.getCount());
+      int blockCount = tx.getBlockIDCount();
+      // Adds to the running total for the container, or starts a new one.
+      blockSummary.merge(tx.getContainerName(), blockCount, Integer::sum);
+      numOfBlocks += blockCount;
+    }
+  }
+
+  /**
+   * Builds a summary for the given transactions.
+   *
+   * @param blocks transactions to summarize
+   * @return the summary
+   */
+  public static DeletedContainerBlocksSummary getFrom(
+      List<DeletedBlocksTransaction> blocks) {
+    return new DeletedContainerBlocksSummary(blocks);
+  }
+
+  /**
+   * @return sum of the block ID counts over all transactions.
+   */
+  public int getNumOfBlocks() {
+    return numOfBlocks;
+  }
+
+  /**
+   * @return number of distinct container names seen.
+   */
+  public int getNumOfContainers() {
+    return blockSummary.size();
+  }
+
+  /**
+   * @return the transaction IDs, comma separated.
+   */
+  public String getTXIDs() {
+    return txSummary.keySet().stream()
+        .map(String::valueOf)
+        .collect(Collectors.joining(","));
+  }
+
+  /**
+   * @return entries of the form {@code txID(count)}, comma separated and
+   * wrapped in square brackets.
+   */
+  public String getTxIDSummary() {
+    return txSummary.entrySet().stream()
+        .map(tx -> tx.getKey() + "(" + tx.getValue() + ")")
+        .collect(Collectors.joining(",", "[", "]"));
+  }
+
+  /**
+   * Renders one line per transaction with its ID, count, container name and
+   * block IDs.
+   */
+  @Override public String toString() {
+    StringBuilder text = new StringBuilder();
+    for (DeletedBlocksTransaction tx : blocks) {
+      text.append(" TXID=").append(tx.getTxID())
+          .append(", TimesProceed=").append(tx.getCount())
+          .append(", ").append(tx.getContainerName())
+          .append(" : [").append(String.join(",", tx.getBlockIDList()))
+          .append("]\n");
+    }
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
new file mode 100644
index 0000000..566db02
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+
+/**
+ * Helper routines that build the responses of the putSmallFile and
+ * getSmallFile RPCs.
+ */
+public final class FileUtils {
+  /**
+   * Static helpers only; never instantiated.
+   */
+  private FileUtils() {
+  }
+
+  /**
+   * Builds the SUCCESS response for the putSmallFile RPC.
+   * @param msg - ContainerCommandRequestProto being answered
+   * @return - ContainerCommandResponseProto of type PutSmallFile carrying an
+   * empty PutSmallFile payload
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getPutFileResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    ContainerProtos.PutSmallFileResponseProto.Builder putSmallFile =
+        ContainerProtos.PutSmallFileResponseProto.newBuilder();
+    return ContainerUtils
+        .getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "")
+        .setCmdType(ContainerProtos.Type.PutSmallFile)
+        .setPutSmallFile(putSmallFile)
+        .build();
+  }
+
+  /**
+   * Builds the SUCCESS response for the getSmallFile RPC.
+   * @param msg - request being answered; its pipeline is echoed back
+   * @param data - bytes that were read
+   * @param info - chunk the bytes were read from
+   * @return ContainerCommandResponseProto of type GetSmallFile wrapping a
+   * read-chunk response with the data.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getGetSmallFileResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+
+    ContainerProtos.ReadChunkResponseProto readChunk =
+        ContainerProtos.ReadChunkResponseProto.newBuilder()
+            .setChunkData(info.getProtoBufMessage())
+            .setData(ByteString.copyFrom(data))
+            .setPipeline(msg.getGetSmallFile().getKey().getPipeline())
+            .build();
+
+    ContainerProtos.GetSmallFileResponseProto.Builder smallFile =
+        ContainerProtos.GetSmallFileResponseProto.newBuilder()
+            .setData(readChunk);
+
+    return ContainerUtils
+        .getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "")
+        .setCmdType(ContainerProtos.Type.GetSmallFile)
+        .setGetSmallFile(smallFile)
+        .build();
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
