HDFS-12665. [AliasMap] Create a version of the AliasMap that runs in memory in the Namenode (leveldb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/36957f0d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/36957f0d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/36957f0d Branch: refs/heads/HDFS-9806 Commit: 36957f0d20a1caeb389dbc11806891108942c9ea Parents: 8da735e Author: Virajith Jalaparti <viraj...@apache.org> Authored: Thu Nov 30 10:37:28 2017 -0800 Committer: Virajith Jalaparti <viraj...@apache.org> Committed: Fri Dec 1 18:18:48 2017 -0800 ---------------------------------------------------------------------- .../hdfs/protocol/ProvidedStorageLocation.java | 85 +++++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 32 ++ .../src/main/proto/hdfs.proto | 14 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 7 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 9 + .../hdfs/protocolPB/AliasMapProtocolPB.java | 35 ++ .../AliasMapProtocolServerSideTranslatorPB.java | 120 +++++++ ...yAliasMapProtocolClientSideTranslatorPB.java | 159 +++++++++ .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 28 ++ .../hdfs/server/aliasmap/InMemoryAliasMap.java | 213 ++++++++++++ .../aliasmap/InMemoryAliasMapProtocol.java | 92 +++++ .../aliasmap/InMemoryLevelDBAliasMapServer.java | 141 ++++++++ .../hadoop/hdfs/server/common/FileRegion.java | 89 ++--- .../common/blockaliasmap/BlockAliasMap.java | 19 +- .../impl/InMemoryLevelDBAliasMapClient.java | 156 +++++++++ .../impl/TextFileRegionAliasMap.java | 40 ++- .../datanode/FinalizedProvidedReplica.java | 11 + .../hdfs/server/datanode/ReplicaBuilder.java | 7 +- .../fsdataset/impl/ProvidedVolumeImpl.java | 38 +-- .../hadoop/hdfs/server/namenode/NameNode.java | 21 ++ .../src/main/proto/AliasMapProtocol.proto | 60 ++++ .../src/main/resources/hdfs-default.xml | 34 ++ .../server/aliasmap/ITestInMemoryAliasMap.java | 126 +++++++ .../server/aliasmap/TestInMemoryAliasMap.java | 45 +++ .../blockmanagement/TestProvidedStorageMap.java | 1 - .../impl/TestInMemoryLevelDBAliasMapClient.java | 341 +++++++++++++++++++ .../impl/TestLevelDbMockAliasMapClient.java | 116 +++++++ .../fsdataset/impl/TestProvidedImpl.java | 9 +- hadoop-project/pom.xml | 8 +- hadoop-tools/hadoop-fs2img/pom.xml | 6 + .../hdfs/server/namenode/NullBlockAliasMap.java | 9 +- .../TestNameNodeProvidedImplementation.java | 65 +++- 32 files changed, 2016 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ProvidedStorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ProvidedStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ProvidedStorageLocation.java new file mode 100644 index 0000000..eee58ba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ProvidedStorageLocation.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.fs.Path; + +import javax.annotation.Nonnull; +import java.util.Arrays; + +/** + * ProvidedStorageLocation is a location in an external storage system + * containing the data for a block (~Replica). + */ +public class ProvidedStorageLocation { + private final Path path; + private final long offset; + private final long length; + private final byte[] nonce; + + public ProvidedStorageLocation(Path path, long offset, long length, + byte[] nonce) { + this.path = path; + this.offset = offset; + this.length = length; + this.nonce = Arrays.copyOf(nonce, nonce.length); + } + + public @Nonnull Path getPath() { + return path; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; + } + + public @Nonnull byte[] getNonce() { + // create a copy of the nonce and return it. + return Arrays.copyOf(nonce, nonce.length); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ProvidedStorageLocation that = (ProvidedStorageLocation) o; + + if ((offset != that.offset) || (length != that.length) + || !path.equals(that.path)) { + return false; + } + return Arrays.equals(nonce, that.nonce); + } + + @Override + public int hashCode() { + int result = path.hashCode(); + result = 31 * result + (int) (offset ^ (offset >>> 32)); + result = 31 * result + (int) (length ^ (length >>> 32)); + result = 31 * result + Arrays.hashCode(nonce); + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 460112e..74fe34c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -97,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; @@ -3242,4 +3243,35 @@ public class PBHelperClient { } return ret; } + + public static ProvidedStorageLocation convert( + HdfsProtos.ProvidedStorageLocationProto providedStorageLocationProto) { + if (providedStorageLocationProto == null) { + return null; + } + String path = providedStorageLocationProto.getPath(); + long length = 
providedStorageLocationProto.getLength(); + long offset = providedStorageLocationProto.getOffset(); + ByteString nonce = providedStorageLocationProto.getNonce(); + + if (path == null || length == -1 || offset == -1 || nonce == null) { + return null; + } else { + return new ProvidedStorageLocation(new Path(path), offset, length, + nonce.toByteArray()); + } + } + + public static HdfsProtos.ProvidedStorageLocationProto convert( + ProvidedStorageLocation providedStorageLocation) { + String path = providedStorageLocation.getPath().toString(); + return HdfsProtos.ProvidedStorageLocationProto.newBuilder() + .setPath(path) + .setLength(providedStorageLocation.getLength()) + .setOffset(providedStorageLocation.getOffset()) + .setNonce(ByteString.copyFrom(providedStorageLocation.getNonce())) + .build(); + } + + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index 06578ca..e841975 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -45,6 +45,20 @@ message ExtendedBlockProto { // here for historical reasons } + +/** +* ProvidedStorageLocation will contain the exact location in the provided + storage. The path, offset and length will result in ranged read. The nonce + is there to verify that you receive what you expect. +*/ + +message ProvidedStorageLocationProto { + required string path = 1; + required int64 offset = 2; + required int64 length = 3; + required bytes nonce = 4; +} + /** * Identifies a Datanode */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 65eea31..b647923 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -191,7 +191,6 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> - <version>1.8</version> </dependency> <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> @@ -208,6 +207,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <artifactId>curator-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -341,6 +345,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <include>fsimage.proto</include> <include>FederationProtocol.proto</include> <include>RouterProtocol.proto</include> + <include>AliasMapProtocol.proto</include> </includes> </source> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fbdc859..00976f9 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -95,6 +95,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105"; public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address"; + public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address"; + public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200"; + public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir"; + public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size"; + public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500; + public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED = "dfs.provided.aliasmap.inmemory.enabled"; + public static final boolean DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT = false; + public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY; public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = @@ -1633,4 +1641,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys { @Deprecated public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolPB.java new file mode 100644 index 0000000..98b3ee1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolPB.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos; +import org.apache.hadoop.ipc.ProtocolInfo; + +/** + * Protocol between the Namenode and the Datanode to read the AliasMap + * used for Provided storage. 
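As a rough illustration of how the new DFSConfigKeys entries above fit together, the sketch below enables the in-memory alias map and points it at a LevelDB directory. Only the key names and defaults come from this patch; the directory path and RPC host are made-up values.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  Configuration conf = new Configuration();
  // Turn on the in-memory (LevelDB-backed) alias map.
  conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
  // Directory holding the LevelDB store (hypothetical path).
  conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, "/var/lib/hdfs/aliasmap");
  // Address the alias map RPC server binds to and clients connect to (hypothetical host).
  conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "nn.example.com:50200");
  // Number of FileRegions returned per list() call; 500 is the default added here.
  conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, 500);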
+ * TODO add Kerberos support + */ +@ProtocolInfo( + protocolName = + "org.apache.hadoop.hdfs.server.aliasmap.AliasMapProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface AliasMapProtocolPB extends + AliasMapProtocolProtos.AliasMapProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..808c43b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/AliasMapProtocolServerSideTranslatorPB.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.ReadResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.WriteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.WriteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol; +import org.apache.hadoop.hdfs.server.common.FileRegion; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*; +import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.*; + +/** + * AliasMapProtocolServerSideTranslatorPB is responsible for translating RPC + * calls and forwarding them to the internal InMemoryAliasMap. 
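The translator named above is what InMemoryLevelDBAliasMapServer (added later in this patch) plugs into the RPC server. A minimal sketch of bringing that server up, assuming a Configuration prepared as in the earlier snippet; error handling is omitted:

  import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
  import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;

  InMemoryLevelDBAliasMapServer server =
      new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init);
  server.setConf(conf);  // opens the LevelDB store via InMemoryAliasMap.init(conf)
  server.start();        // binds the AliasMapProtocol RPC endpoint
  // ... serve requests ...
  server.close();        // closes the store and stops the RPC server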
+ */ +public class AliasMapProtocolServerSideTranslatorPB + implements AliasMapProtocolPB { + + private final InMemoryAliasMapProtocol aliasMap; + + public AliasMapProtocolServerSideTranslatorPB( + InMemoryAliasMapProtocol aliasMap) { + this.aliasMap = aliasMap; + } + + private static final WriteResponseProto VOID_WRITE_RESPONSE = + WriteResponseProto.newBuilder().build(); + + @Override + public WriteResponseProto write(RpcController controller, + WriteRequestProto request) throws ServiceException { + try { + FileRegion toWrite = + PBHelper.convert(request.getKeyValuePair()); + + aliasMap.write(toWrite.getBlock(), toWrite.getProvidedStorageLocation()); + return VOID_WRITE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReadResponseProto read(RpcController controller, + ReadRequestProto request) throws ServiceException { + try { + Block toRead = PBHelperClient.convert(request.getKey()); + + Optional<ProvidedStorageLocation> optionalResult = + aliasMap.read(toRead); + + ReadResponseProto.Builder builder = ReadResponseProto.newBuilder(); + if (optionalResult.isPresent()) { + ProvidedStorageLocation providedStorageLocation = optionalResult.get(); + builder.setValue(PBHelperClient.convert(providedStorageLocation)); + } + + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListResponseProto list(RpcController controller, + ListRequestProto request) throws ServiceException { + try { + BlockProto marker = request.getMarker(); + IterationResult iterationResult; + if (marker.isInitialized()) { + iterationResult = + aliasMap.list(Optional.of(PBHelperClient.convert(marker))); + } else { + iterationResult = aliasMap.list(Optional.empty()); + } + ListResponseProto.Builder responseBuilder = + ListResponseProto.newBuilder(); + List<FileRegion> fileRegions = iterationResult.getFileRegions(); + + List<KeyValueProto> keyValueProtos = fileRegions.stream() + .map(PBHelper::convert).collect(Collectors.toList()); + responseBuilder.addAllFileRegions(keyValueProtos); + Optional<Block> nextMarker = iterationResult.getNextBlock(); + nextMarker + .map(m -> responseBuilder.setNextMarker(PBHelperClient.convert(m))); + + return responseBuilder.build(); + + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..a79360f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*; +import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*; + +/** + * This class is the client side translator to translate requests made to the + * {@link InMemoryAliasMapProtocol} interface to the RPC server implementing + * {@link AliasMapProtocolPB}. 
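A short, hedged sketch of how this client-side translator might be used to publish and look up a block alias. The block id, length, generation stamp, path and empty nonce are made-up values, and conf is the Configuration from the earlier snippet:

  import java.util.Optional;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
  import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;

  InMemoryAliasMapProtocolClientSideTranslatorPB client =
      new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
  Block block = new Block(1L, 1024L, 1001L);                      // hypothetical id/length/genstamp
  ProvidedStorageLocation location = new ProvidedStorageLocation(
      new Path("hdfs://remote/store/data.bin"), 0L, 1024L, new byte[0]);
  client.write(block, location);                                  // store the alias
  Optional<ProvidedStorageLocation> found = client.read(block);   // look it up again
  client.stop();                                                  // tear down the RPC proxy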
+ */ +public class InMemoryAliasMapProtocolClientSideTranslatorPB + implements InMemoryAliasMapProtocol { + + private static final Logger LOG = + LoggerFactory + .getLogger(InMemoryAliasMapProtocolClientSideTranslatorPB.class); + + private AliasMapProtocolPB rpcProxy; + + public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) { + String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, + DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT); + InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr); + + RPC.setProtocolEngine(conf, AliasMapProtocolPB.class, + ProtobufRpcEngine.class); + LOG.info("Connecting to address: " + addr); + try { + rpcProxy = RPC.getProxy(AliasMapProtocolPB.class, + RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null, + conf, NetUtils.getDefaultSocketFactory(conf), 0); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public InMemoryAliasMap.IterationResult list(Optional<Block> marker) + throws IOException { + ListRequestProto.Builder builder = ListRequestProto.newBuilder(); + if (marker.isPresent()) { + builder.setMarker(PBHelperClient.convert(marker.get())); + } + ListRequestProto request = builder.build(); + try { + ListResponseProto response = rpcProxy.list(null, request); + List<KeyValueProto> fileRegionsList = response.getFileRegionsList(); + + List<FileRegion> fileRegions = fileRegionsList + .stream() + .map(kv -> new FileRegion( + PBHelperClient.convert(kv.getKey()), + PBHelperClient.convert(kv.getValue()), + null + )) + .collect(Collectors.toList()); + BlockProto nextMarker = response.getNextMarker(); + + if (nextMarker.isInitialized()) { + return new InMemoryAliasMap.IterationResult(fileRegions, + Optional.of(PBHelperClient.convert(nextMarker))); + } else { + return new InMemoryAliasMap.IterationResult(fileRegions, + Optional.empty()); + } + + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Nonnull + @Override + public Optional<ProvidedStorageLocation> read(@Nonnull Block block) + throws IOException { + + ReadRequestProto request = + ReadRequestProto + .newBuilder() + .setKey(PBHelperClient.convert(block)) + .build(); + try { + ReadResponseProto response = rpcProxy.read(null, request); + + ProvidedStorageLocationProto providedStorageLocation = + response.getValue(); + if (providedStorageLocation.isInitialized()) { + return Optional.of(PBHelperClient.convert(providedStorageLocation)); + } + return Optional.empty(); + + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void write(@Nonnull Block block, + @Nonnull ProvidedStorageLocation providedStorageLocation) + throws IOException { + WriteRequestProto request = + WriteRequestProto + .newBuilder() + .setKeyValuePair(KeyValueProto.newBuilder() + .setKey(PBHelperClient.convert(block)) + .setValue(PBHelperClient.convert(providedStorageLocation)) + .build()) + .build(); + + try { + rpcProxy.write(null, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + public void stop() { + RPC.stopProxy(rpcProxy); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 6539d32..2952a5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.KeyValueProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; @@ -56,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstr import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; @@ -80,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -1096,4 +1100,28 @@ public class PBHelper { DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, blkECReconstructionInfos); } + + public static KeyValueProto convert(FileRegion fileRegion) { + return KeyValueProto + .newBuilder() + .setKey(PBHelperClient.convert(fileRegion.getBlock())) + .setValue(PBHelperClient.convert( + fileRegion.getProvidedStorageLocation())) + .build(); + } + + public static FileRegion + convert(KeyValueProto keyValueProto) { + BlockProto blockProto = + keyValueProto.getKey(); + ProvidedStorageLocationProto providedStorageLocationProto = + keyValueProto.getValue(); + + Block block = + PBHelperClient.convert(blockProto); + ProvidedStorageLocation providedStorageLocation = + PBHelperClient.convert(providedStorageLocationProto); + + return new FileRegion(block, providedStorageLocation, null); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java new file mode 100644 index 
0000000..be891e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMap.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.aliasmap; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; + +/** + * InMemoryAliasMap is an implementation of the InMemoryAliasMapProtocol for + * use with LevelDB. 
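For the server-side store itself, a minimal usage sketch of the class below, assuming the configured LevelDB directory already exists (init() throws an IOException otherwise); block and location are the hypothetical values from the earlier client snippet:

  import java.util.Optional;
  import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
  import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;

  InMemoryAliasMap map = InMemoryAliasMap.init(conf);   // opens the LevelDB store
  map.write(block, location);                           // key: Block proto bytes, value: ProvidedStorageLocationProto bytes
  Optional<ProvidedStorageLocation> stored = map.read(block);
  InMemoryAliasMap.IterationResult firstBatch = map.list(Optional.empty()); // first batch of FileRegions plus the next marker
  map.close();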
+ */ +public class InMemoryAliasMap implements InMemoryAliasMapProtocol, + Configurable { + + private static final Logger LOG = LoggerFactory + .getLogger(InMemoryAliasMap.class); + + private final DB levelDb; + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @VisibleForTesting + static String createPathErrorMessage(String directory) { + return new StringBuilder() + .append("Configured directory '") + .append(directory) + .append("' doesn't exist") + .toString(); + } + + public static @Nonnull InMemoryAliasMap init(Configuration conf) + throws IOException { + Options options = new Options(); + options.createIfMissing(true); + String directory = + conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR); + LOG.info("Attempting to load InMemoryAliasMap from \"{}\"", directory); + File path = new File(directory); + if (!path.exists()) { + String error = createPathErrorMessage(directory); + throw new IOException(error); + } + DB levelDb = JniDBFactory.factory.open(path, options); + InMemoryAliasMap aliasMap = new InMemoryAliasMap(levelDb); + aliasMap.setConf(conf); + return aliasMap; + } + + @VisibleForTesting + InMemoryAliasMap(DB levelDb) { + this.levelDb = levelDb; + } + + @Override + public IterationResult list(Optional<Block> marker) throws IOException { + return withIterator((DBIterator iterator) -> { + Integer batchSize = + conf.getInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE, + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT); + if (marker.isPresent()) { + iterator.seek(toProtoBufBytes(marker.get())); + } else { + iterator.seekToFirst(); + } + int i = 0; + ArrayList<FileRegion> batch = + Lists.newArrayListWithExpectedSize(batchSize); + while (iterator.hasNext() && i < batchSize) { + Map.Entry<byte[], byte[]> entry = iterator.next(); + Block block = fromBlockBytes(entry.getKey()); + ProvidedStorageLocation providedStorageLocation = + fromProvidedStorageLocationBytes(entry.getValue()); + batch.add(new FileRegion(block, providedStorageLocation, null)); + ++i; + } + if (iterator.hasNext()) { + Block nextMarker = fromBlockBytes(iterator.next().getKey()); + return new IterationResult(batch, Optional.of(nextMarker)); + } else { + return new IterationResult(batch, Optional.empty()); + } + + }); + } + + public @Nonnull Optional<ProvidedStorageLocation> read(@Nonnull Block block) + throws IOException { + + byte[] extendedBlockDbFormat = toProtoBufBytes(block); + byte[] providedStorageLocationDbFormat = levelDb.get(extendedBlockDbFormat); + if (providedStorageLocationDbFormat == null) { + return Optional.empty(); + } else { + ProvidedStorageLocation providedStorageLocation = + fromProvidedStorageLocationBytes(providedStorageLocationDbFormat); + return Optional.of(providedStorageLocation); + } + } + + public void write(@Nonnull Block block, + @Nonnull ProvidedStorageLocation providedStorageLocation) + throws IOException { + byte[] extendedBlockDbFormat = toProtoBufBytes(block); + byte[] providedStorageLocationDbFormat = + toProtoBufBytes(providedStorageLocation); + levelDb.put(extendedBlockDbFormat, providedStorageLocationDbFormat); + } + + public void close() throws IOException { + levelDb.close(); + } + + @Nonnull + public static ProvidedStorageLocation fromProvidedStorageLocationBytes( + @Nonnull byte[] providedStorageLocationDbFormat) + throws InvalidProtocolBufferException { + ProvidedStorageLocationProto 
providedStorageLocationProto = + ProvidedStorageLocationProto + .parseFrom(providedStorageLocationDbFormat); + return PBHelperClient.convert(providedStorageLocationProto); + } + + @Nonnull + public static Block fromBlockBytes(@Nonnull byte[] blockDbFormat) + throws InvalidProtocolBufferException { + BlockProto blockProto = BlockProto.parseFrom(blockDbFormat); + return PBHelperClient.convert(blockProto); + } + + public static byte[] toProtoBufBytes(@Nonnull ProvidedStorageLocation + providedStorageLocation) throws IOException { + ProvidedStorageLocationProto providedStorageLocationProto = + PBHelperClient.convert(providedStorageLocation); + ByteArrayOutputStream providedStorageLocationOutputStream = + new ByteArrayOutputStream(); + providedStorageLocationProto.writeTo(providedStorageLocationOutputStream); + return providedStorageLocationOutputStream.toByteArray(); + } + + public static byte[] toProtoBufBytes(@Nonnull Block block) + throws IOException { + BlockProto blockProto = + PBHelperClient.convert(block); + ByteArrayOutputStream blockOutputStream = new ByteArrayOutputStream(); + blockProto.writeTo(blockOutputStream); + return blockOutputStream.toByteArray(); + } + + private IterationResult withIterator( + CheckedFunction<DBIterator, IterationResult> func) throws IOException { + try (DBIterator iterator = levelDb.iterator()) { + return func.apply(iterator); + } + } + + /** + * CheckedFunction is akin to {@link java.util.function.Function} but + * specifies an IOException. + * @param <T> Argument type. + * @param <R> Return type. + */ + @FunctionalInterface + public interface CheckedFunction<T, R> { + R apply(T t) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java new file mode 100644 index 0000000..fb6e8b3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.aliasmap; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.server.common.FileRegion; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * Protocol used by clients to read/write data about aliases of + * provided blocks for an in-memory implementation of the + * {@link org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap}. + */ +public interface InMemoryAliasMapProtocol { + + /** + * The result of a read from the in-memory aliasmap. It contains the + * a list of FileRegions that are returned, along with the next block + * from which the read operation must continue. + */ + class IterationResult { + + private final List<FileRegion> batch; + private final Optional<Block> nextMarker; + + public IterationResult(List<FileRegion> batch, Optional<Block> nextMarker) { + this.batch = batch; + this.nextMarker = nextMarker; + } + + public List<FileRegion> getFileRegions() { + return batch; + } + + public Optional<Block> getNextBlock() { + return nextMarker; + } + } + + /** + * List the next batch of {@link FileRegion}s in the alias map starting from + * the given {@code marker}. To retrieve all {@link FileRegion}s stored in the + * alias map, multiple calls to this function might be required. + * @param marker the next block to get fileregions from. + * @return the {@link IterationResult} with a set of + * FileRegions and the next marker. + * @throws IOException + */ + InMemoryAliasMap.IterationResult list(Optional<Block> marker) + throws IOException; + + /** + * Gets the {@link ProvidedStorageLocation} associated with the + * specified block. + * @param block the block to lookup + * @return the associated {@link ProvidedStorageLocation}. + * @throws IOException + */ + @Nonnull + Optional<ProvidedStorageLocation> read(@Nonnull Block block) + throws IOException; + + /** + * Stores the block and it's associated {@link ProvidedStorageLocation} + * in the alias map. + * @param block + * @param providedStorageLocation + * @throws IOException + */ + void write(@Nonnull Block block, + @Nonnull ProvidedStorageLocation providedStorageLocation) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java new file mode 100644 index 0000000..91b1e83 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.aliasmap; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.RPC; +import javax.annotation.Nonnull; +import java.io.Closeable; +import java.io.IOException; +import java.util.Optional; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*; +import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction; + +/** + * InMemoryLevelDBAliasMapServer is the entry point from the Namenode into + * the {@link InMemoryAliasMap}. + */ +public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol, + Configurable, Closeable { + + private static final Logger LOG = LoggerFactory + .getLogger(InMemoryLevelDBAliasMapServer.class); + private final CheckedFunction<Configuration, InMemoryAliasMap> initFun; + private RPC.Server aliasMapServer; + private Configuration conf; + private InMemoryAliasMap aliasMap; + + public InMemoryLevelDBAliasMapServer( + CheckedFunction<Configuration, InMemoryAliasMap> initFun) { + this.initFun = initFun; + + } + + public void start() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + throw new UnsupportedOperationException("Unable to start " + + "InMemoryLevelDBAliasMapServer as security is enabled"); + } + RPC.setProtocolEngine(getConf(), AliasMapProtocolPB.class, + ProtobufRpcEngine.class); + AliasMapProtocolServerSideTranslatorPB aliasMapProtocolXlator = + new AliasMapProtocolServerSideTranslatorPB(this); + + BlockingService aliasMapProtocolService = + AliasMapProtocolService + .newReflectiveBlockingService(aliasMapProtocolXlator); + + String rpcAddress = + conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, + DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT); + String[] split = rpcAddress.split(":"); + String bindHost = split[0]; + Integer port = Integer.valueOf(split[1]); + + aliasMapServer = new RPC.Builder(conf) + .setProtocol(AliasMapProtocolPB.class) + .setInstance(aliasMapProtocolService) + .setBindAddress(bindHost) + .setPort(port) + .setNumHandlers(1) + .setVerbose(true) + .build(); + + LOG.info("Starting InMemoryLevelDBAliasMapServer on ", rpcAddress); + aliasMapServer.start(); + } + + @Override + public InMemoryAliasMap.IterationResult list(Optional<Block> marker) + throws IOException { + return aliasMap.list(marker); + } + + @Nonnull + @Override + public Optional<ProvidedStorageLocation> read(@Nonnull Block block) + throws 
IOException { + return aliasMap.read(block); + } + + @Override + public void write(@Nonnull Block block, + @Nonnull ProvidedStorageLocation providedStorageLocation) + throws IOException { + aliasMap.write(block, providedStorageLocation); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + try { + this.aliasMap = initFun.apply(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void close() { + LOG.info("Stopping InMemoryLevelDBAliasMapServer"); + try { + aliasMap.close(); + } catch (IOException e) { + LOG.error(e.getMessage()); + } + aliasMapServer.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java index c568b90..5d04640 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hdfs.server.common; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; /** * This class is used to represent provided blocks that are file regions, @@ -27,95 +29,70 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; */ public class FileRegion implements BlockAlias { - private final Path path; - private final long offset; - private final long length; - private final long blockId; + private final Pair<Block, ProvidedStorageLocation> pair; private final String bpid; - private final long genStamp; public FileRegion(long blockId, Path path, long offset, long length, String bpid, long genStamp) { - this.path = path; - this.offset = offset; - this.length = length; - this.blockId = blockId; - this.bpid = bpid; - this.genStamp = genStamp; + this(new Block(blockId, length, genStamp), + new ProvidedStorageLocation(path, offset, length, new byte[0]), bpid); } public FileRegion(long blockId, Path path, long offset, long length, String bpid) { this(blockId, path, offset, length, bpid, HdfsConstants.GRANDFATHER_GENERATION_STAMP); - } public FileRegion(long blockId, Path path, long offset, long length, long genStamp) { this(blockId, path, offset, length, null, genStamp); + } + public FileRegion(Block block, + ProvidedStorageLocation providedStorageLocation) { + this.pair = Pair.of(block, providedStorageLocation); + this.bpid = null; + } + + public FileRegion(Block block, + ProvidedStorageLocation providedStorageLocation, String bpid) { + this.pair = Pair.of(block, providedStorageLocation); + this.bpid = bpid; } public FileRegion(long blockId, Path path, long offset, long length) { this(blockId, path, offset, length, null); } - @Override public Block getBlock() { - return new Block(blockId, length, genStamp); + return pair.getKey(); } - @Override - public boolean equals(Object other) { - if (!(other instanceof FileRegion)) { - return false; - } - FileRegion o = (FileRegion) other; - return 
blockId == o.blockId - && offset == o.offset - && length == o.length - && genStamp == o.genStamp - && path.equals(o.path); - } - - @Override - public int hashCode() { - return (int)(blockId & Integer.MIN_VALUE); + public ProvidedStorageLocation getProvidedStorageLocation() { + return pair.getValue(); } - public Path getPath() { - return path; + public String getBlockPoolId() { + return this.bpid; } - public long getOffset() { - return offset; - } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - public long getLength() { - return length; - } + FileRegion that = (FileRegion) o; - public long getGenerationStamp() { - return genStamp; + return pair.equals(that.pair); } @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{ block=\"").append(getBlock()).append("\""); - sb.append(", path=\"").append(getPath()).append("\""); - sb.append(", off=\"").append(getOffset()).append("\""); - sb.append(", len=\"").append(getBlock().getNumBytes()).append("\""); - sb.append(", genStamp=\"").append(getBlock() - .getGenerationStamp()).append("\""); - sb.append(", bpid=\"").append(bpid).append("\""); - sb.append(" }"); - return sb.toString(); - } - - public String getBlockPoolId() { - return this.bpid; + public int hashCode() { + return pair.hashCode(); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java index d276fb5..e3b6cb5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.common.blockaliasmap; import java.io.Closeable; import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.BlockAlias; @@ -29,6 +31,19 @@ import org.apache.hadoop.hdfs.server.common.BlockAlias; public abstract class BlockAliasMap<T extends BlockAlias> { /** + * ImmutableIterator is an Iterator that does not support the remove + * operation. This could inherit {@link java.util.Enumeration} but Iterator + * is supported by more APIs and Enumeration's javadoc even suggests using + * Iterator instead. + */ + public abstract class ImmutableIterator implements Iterator<T> { + public void remove() { + throw new UnsupportedOperationException( + "Remove is not supported for provided storage"); + } + } + + /** * An abstract class that is used to read {@link BlockAlias}es * for provided blocks. */ @@ -45,7 +60,7 @@ public abstract class BlockAliasMap<T extends BlockAlias> { * @return BlockAlias correspoding to the provided block. 
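To make the FileRegion change above concrete: the legacy (blockId, path, offset, length, bpid, genStamp) constructor now simply builds a Block and a ProvidedStorageLocation with an empty nonce, so the two regions in this hedged sketch (made-up values) compare equal, and the block pool id no longer participates in equals() or hashCode():

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
  import org.apache.hadoop.hdfs.server.common.FileRegion;

  FileRegion legacy = new FileRegion(1L, new Path("file:///data.bin"), 0L, 1024L, "bp-0", 1001L);
  FileRegion current = new FileRegion(
      new Block(1L, 1024L, 1001L),
      new ProvidedStorageLocation(new Path("file:///data.bin"), 0L, 1024L, new byte[0]),
      "bp-0");
  assert legacy.equals(current);  // equality is delegated to the (Block, ProvidedStorageLocation) pair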
* @throws IOException */ - public abstract U resolve(Block ident) throws IOException; + public abstract Optional<U> resolve(Block ident) throws IOException; } @@ -85,4 +100,6 @@ public abstract class BlockAliasMap<T extends BlockAlias> { */ public abstract void refresh() throws IOException; + public abstract void close() throws IOException; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java new file mode 100644 index 0000000..7b0b789 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * InMemoryLevelDBAliasMapClient is the client for the InMemoryAliasMapServer. + * This is used by the Datanode and fs2img to store and retrieve FileRegions + * based on the given Block. 
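A hedged sketch of how the client described below might be exercised once configured. Its constructor is package-private, so direct construction as written here only compiles from within the same package (as the tests added in this patch presumably do); elsewhere it would be obtained through the provided-storage configuration. The null Reader/Writer options reflect that this implementation ignores them, and block/location are the hypothetical values from the earlier sketches:

  import java.util.Optional;
  import org.apache.hadoop.hdfs.server.common.FileRegion;
  import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
  import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;

  InMemoryLevelDBAliasMapClient aliasMapClient = new InMemoryLevelDBAliasMapClient();
  aliasMapClient.setConf(conf);                          // opens the RPC proxy to the alias map server
  BlockAliasMap.Writer<FileRegion> writer = aliasMapClient.getWriter(null);
  writer.store(new FileRegion(block, location, null));   // publish an alias
  Optional<FileRegion> region = aliasMapClient.getReader(null).resolve(block);
  aliasMapClient.close();                                // stops the underlying RPC proxy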
+ */ +public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion> + implements Configurable { + + private Configuration conf; + private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap; + + @Override + public void close() { + aliasMap.stop(); + } + + class LevelDbReader extends BlockAliasMap.Reader<FileRegion> { + + @Override + public Optional<FileRegion> resolve(Block block) throws IOException { + Optional<ProvidedStorageLocation> read = aliasMap.read(block); + return read.map(psl -> new FileRegion(block, psl, null)); + } + + @Override + public void close() throws IOException { + } + + private class LevelDbIterator + extends BlockAliasMap<FileRegion>.ImmutableIterator { + + private Iterator<FileRegion> iterator; + private Optional<Block> nextMarker; + + LevelDbIterator() { + batch(Optional.empty()); + } + + private void batch(Optional<Block> newNextMarker) { + try { + InMemoryAliasMap.IterationResult iterationResult = + aliasMap.list(newNextMarker); + List<FileRegion> fileRegions = iterationResult.getFileRegions(); + this.iterator = fileRegions.iterator(); + this.nextMarker = iterationResult.getNextBlock(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return iterator.hasNext() || nextMarker.isPresent(); + } + + @Override + public FileRegion next() { + if (iterator.hasNext()) { + return iterator.next(); + } else { + if (nextMarker.isPresent()) { + batch(nextMarker); + return next(); + } else { + throw new NoSuchElementException(); + } + } + } + } + + @Override + public Iterator<FileRegion> iterator() { + return new LevelDbIterator(); + } + } + + class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> { + @Override + public void store(FileRegion fileRegion) throws IOException { + aliasMap.write(fileRegion.getBlock(), + fileRegion.getProvidedStorageLocation()); + } + + @Override + public void close() throws IOException { + } + } + + InMemoryLevelDBAliasMapClient() { + if (UserGroupInformation.isSecurityEnabled()) { + throw new UnsupportedOperationException("Unable to start " + + "InMemoryLevelDBAliasMapClient as security is enabled"); + } + } + + + @Override + public Reader<FileRegion> getReader(Reader.Options opts) throws IOException { + return new LevelDbReader(); + } + + @Override + public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { + return new LevelDbWriter(); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void refresh() throws IOException { + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java index bd04d60..b86b280 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Collections; import java.util.IdentityHashMap; import java.util.NoSuchElementException; +import java.util.Optional; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; import org.apache.hadoop.io.MultipleIOException; @@ -160,7 +162,7 @@ public class TextFileRegionAliasMap file = new Path(tmpfile); delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT); - LOG.info("TextFileRegionAliasMap: read path " + tmpfile.toString()); + LOG.info("TextFileRegionAliasMap: read path {}", tmpfile); } @Override @@ -190,7 +192,7 @@ public class TextFileRegionAliasMap private Configuration conf; private String codec = null; private Path file = - new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);; + new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); private String delim = DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT; @@ -252,7 +254,7 @@ public class TextFileRegionAliasMap Options delimiter(String delim); } - static ReaderOptions defaults() { + public static ReaderOptions defaults() { return new ReaderOptions(); } @@ -278,14 +280,14 @@ public class TextFileRegionAliasMap } @Override - public FileRegion resolve(Block ident) throws IOException { + public Optional<FileRegion> resolve(Block ident) throws IOException { // consider layering index w/ composable format Iterator<FileRegion> i = iterator(); try { while (i.hasNext()) { FileRegion f = i.next(); if (f.getBlock().equals(ident)) { - return f; + return Optional.of(f); } } } finally { @@ -295,7 +297,7 @@ public class TextFileRegionAliasMap r.close(); } } - return null; + return Optional.empty(); } class FRIterator implements Iterator<FileRegion> { @@ -342,8 +344,8 @@ public class TextFileRegionAliasMap throw new IOException("Invalid line: " + line); } return new FileRegion(Long.parseLong(f[0]), new Path(f[1]), - Long.parseLong(f[2]), Long.parseLong(f[3]), f[5], - Long.parseLong(f[4])); + Long.parseLong(f[2]), Long.parseLong(f[3]), f[4], + Long.parseLong(f[5])); } public InputStream createStream() throws IOException { @@ -390,7 +392,6 @@ public class TextFileRegionAliasMap throw MultipleIOException.createIOException(ex); } } - } /** @@ -422,12 +423,16 @@ public class TextFileRegionAliasMap @Override public void store(FileRegion token) throws IOException { - out.append(String.valueOf(token.getBlock().getBlockId())).append(delim); - out.append(token.getPath().toString()).append(delim); - out.append(Long.toString(token.getOffset())).append(delim); - out.append(Long.toString(token.getLength())).append(delim); - out.append(Long.toString(token.getGenerationStamp())).append(delim); - out.append(token.getBlockPoolId()).append("\n"); + final Block block = token.getBlock(); + final ProvidedStorageLocation psl = token.getProvidedStorageLocation(); + + out.append(String.valueOf(block.getBlockId())).append(delim); + out.append(psl.getPath().toString()).append(delim); + 
out.append(Long.toString(psl.getOffset())).append(delim); + out.append(Long.toString(psl.getLength())).append(delim); + out.append(token.getBlockPoolId()).append(delim); + out.append(Long.toString(block.getGenerationStamp())).append(delim); + out.append("\n"); } @Override @@ -443,4 +448,9 @@ public class TextFileRegionAliasMap "Refresh not supported by " + getClass()); } + @Override + public void close() throws IOException { + //nothing to do; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java index bcc9a38..0fbfc15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java @@ -22,6 +22,7 @@ import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -38,6 +39,16 @@ public class FinalizedProvidedReplica extends ProvidedReplica { remoteFS); } + public FinalizedProvidedReplica(FileRegion fileRegion, FsVolumeSpi volume, + Configuration conf, FileSystem remoteFS) { + super(fileRegion.getBlock().getBlockId(), + fileRegion.getProvidedStorageLocation().getPath().toUri(), + fileRegion.getProvidedStorageLocation().getOffset(), + fileRegion.getBlock().getNumBytes(), + fileRegion.getBlock().getGenerationStamp(), + volume, conf, remoteFS); + } + public FinalizedProvidedReplica(long blockId, Path pathPrefix, String pathSuffix, long fileOffset, long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf, FileSystem remoteFS) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java index de68e2d..8748918 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java @@ -315,12 +315,7 @@ public class ReplicaBuilder { offset, length, genStamp, volume, conf, remoteFS); } } else { - info = new FinalizedProvidedReplica(fileRegion.getBlock().getBlockId(), - fileRegion.getPath().toUri(), - fileRegion.getOffset(), - fileRegion.getBlock().getNumBytes(), - fileRegion.getBlock().getGenerationStamp(), - volume, conf, remoteFS); + info = new FinalizedProvidedReplica(fileRegion, volume, conf, remoteFS); } return info; } 
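[Not part of the patch] A minimal usage sketch of the BlockAliasMap API as changed above -- Reader.resolve() now returns Optional<FileRegion> instead of null, and readers/writers expose close(). The class name AliasMapUsageSketch, the writeAndResolve helper, the sample block/path/nonce values, and passing null Writer.Options are invented for illustration (the patch itself only shows getReader(null)); the Block, ProvidedStorageLocation, FileRegion and BlockAliasMap calls follow the signatures visible in this diff.

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;

public class AliasMapUsageSketch {

  /**
   * Store one block -> provided-storage mapping and resolve it again.
   * The aliasMap argument stands in for any BlockAliasMap implementation,
   * e.g. an InMemoryLevelDBAliasMapClient after setConf().
   */
  static void writeAndResolve(BlockAliasMap<FileRegion> aliasMap)
      throws IOException {
    // blockId=42, numBytes=1024, genStamp=1001 -- arbitrary sample values.
    Block block = new Block(42, 1024, 1001);
    ProvidedStorageLocation location = new ProvidedStorageLocation(
        new Path("/remote/store/file0"), 0, 1024, new byte[0]);

    // Obtain a writer and store the mapping; null options mirror how
    // ProvidedVolumeImpl obtains its reader with getReader(null).
    BlockAliasMap.Writer<FileRegion> writer = aliasMap.getWriter(null);
    writer.store(new FileRegion(block, location, null));
    writer.close();

    // resolve() returns Optional.empty() on a miss rather than null.
    BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
    Optional<FileRegion> region = reader.resolve(block);
    region.ifPresent(r -> System.out.println("block "
        + r.getBlock().getBlockId() + " -> "
        + r.getProvidedStorageLocation().getPath()));
    reader.close();
  }
}

Returning Optional makes the "block not in the alias map" case explicit for datanode-side callers instead of relying on null checks, which is why the abstract resolve() signature changes in this commit.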
http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java index ab59fa5..6bbfa91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java @@ -148,7 +148,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { this.aliasMap = blockAliasMap; } - public void getVolumeMap(ReplicaMap volumeMap, + void fetchVolumeMap(ReplicaMap volumeMap, RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS) throws IOException { BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); @@ -157,21 +157,19 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { + "; no blocks will be populated"); return; } - Iterator<FileRegion> iter = reader.iterator(); Path blockPrefixPath = new Path(providedVolume.getBaseURI()); - while (iter.hasNext()) { - FileRegion region = iter.next(); + for (FileRegion region : reader) { if (region.getBlockPoolId() != null && region.getBlockPoolId().equals(bpid) && containsBlock(providedVolume.baseURI, - region.getPath().toUri())) { - String blockSuffix = - getSuffix(blockPrefixPath, new Path(region.getPath().toUri())); + region.getProvidedStorageLocation().getPath().toUri())) { + String blockSuffix = getSuffix(blockPrefixPath, + new Path(region.getProvidedStorageLocation().getPath().toUri())); ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED) .setBlockId(region.getBlock().getBlockId()) .setPathPrefix(blockPrefixPath) .setPathSuffix(blockSuffix) - .setOffset(region.getOffset()) + .setOffset(region.getProvidedStorageLocation().getOffset()) .setLength(region.getBlock().getNumBytes()) .setGenerationStamp(region.getBlock().getGenerationStamp()) .setFsVolume(providedVolume) @@ -216,18 +214,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { */ aliasMap.refresh(); BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); - if (reader == null) { - LOG.warn("Got null reader from BlockAliasMap " + aliasMap - + "; no blocks will be populated in scan report"); - return; - } - Iterator<FileRegion> iter = reader.iterator(); - while(iter.hasNext()) { + for (FileRegion region : reader) { reportCompiler.throttle(); - FileRegion region = iter.next(); if (region.getBlockPoolId().equals(bpid)) { report.add(new ScanInfo(region.getBlock().getBlockId(), - providedVolume, region, region.getLength())); + providedVolume, region, + region.getProvidedStorageLocation().getLength())); } } } @@ -522,7 +514,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { throws IOException { LOG.info("Creating volumemap for provided volume " + this); for(ProvidedBlockPoolSlice s : bpSlices.values()) { - s.getVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS); + s.fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS); } } @@ -539,7 +531,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { void getVolumeMap(String bpid, ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap) throws 
IOException { - getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap, + getProvidedBlockPoolSlice(bpid).fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS); } @@ -601,7 +593,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { @Override public LinkedList<ScanInfo> compileReport(String bpid, LinkedList<ScanInfo> report, ReportCompiler reportCompiler) - throws InterruptedException, IOException { + throws InterruptedException, IOException { LOG.info("Compiling report for volume: " + this + " bpid " + bpid); //get the report from the appropriate block pool. if(bpSlices.containsKey(bpid)) { @@ -690,6 +682,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { } @VisibleForTesting + BlockAliasMap<FileRegion> getFileRegionProvider(String bpid) throws + IOException { + return getProvidedBlockPoolSlice(bpid).getBlockAliasMap(); + } + + @VisibleForTesting void setFileRegionProvider(String bpid, BlockAliasMap<FileRegion> blockAliasMap) throws IOException { ProvidedBlockPoolSlice bp = bpSlices.get(bpid); http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 32b873b..993716a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; +import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; @@ -208,6 +210,8 @@ public class NameNode extends ReconfigurableBase implements HdfsConfiguration.init(); } + private InMemoryLevelDBAliasMapServer levelDBAliasMapServer; + /** * Categories of operations supported by the namenode. 
*/ @@ -745,6 +749,20 @@ public class NameNode extends ReconfigurableBase implements startCommonServices(conf); startMetricsLogger(conf); + startAliasMapServerIfNecessary(conf); + } + + private void startAliasMapServerIfNecessary(Configuration conf) + throws IOException { + if (conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, + DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT) + && conf.getBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED_DEFAULT)) { + levelDBAliasMapServer = + new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init); + levelDBAliasMapServer.setConf(conf); + levelDBAliasMapServer.start(); + } } private void initReconfigurableBackoffKey() { @@ -1027,6 +1045,9 @@ public class NameNode extends ReconfigurableBase implements MBeans.unregister(nameNodeStatusBeanName); nameNodeStatusBeanName = null; } + if (levelDBAliasMapServer != null) { + levelDBAliasMapServer.close(); + } } tracer.close(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto new file mode 100644 index 0000000..08f10bb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/AliasMapProtocol.proto @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.hadoop.hdfs.protocol.proto"; +option java_outer_classname = "AliasMapProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.hdfs; + +import "hdfs.proto"; + +message KeyValueProto { + optional BlockProto key = 1; + optional ProvidedStorageLocationProto value = 2; +} + +message WriteRequestProto { + required KeyValueProto keyValuePair = 1; +} + +message WriteResponseProto { +} + +message ReadRequestProto { + required BlockProto key = 1; +} + +message ReadResponseProto { + optional ProvidedStorageLocationProto value = 1; +} + +message ListRequestProto { + optional BlockProto marker = 1; +} + +message ListResponseProto { + repeated KeyValueProto fileRegions = 1; + optional BlockProto nextMarker = 2; +} + +service AliasMapProtocolService { + rpc write(WriteRequestProto) returns(WriteResponseProto); + rpc read(ReadRequestProto) returns(ReadResponseProto); + rpc list(ListRequestProto) returns(ListResponseProto); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 655f9cb..ddc07ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4653,6 +4653,40 @@ </property> <property> + <name>dfs.provided.aliasmap.inmemory.batch-size</name> + <value>500</value> + <description> + The batch size when iterating over the database backing the aliasmap + </description> + </property> + + <property> + <name>dfs.provided.aliasmap.inmemory.dnrpc-address</name> + <value>0.0.0.0:50200</value> + <description> + The address where the aliasmap server will be running + </description> + </property> + + <property> + <name>dfs.provided.aliasmap.inmemory.leveldb.dir</name> + <value>/tmp</value> + <description> + The directory where the leveldb files will be kept + </description> + </property> + + <property> + <name>dfs.provided.aliasmap.inmemory.enabled</name> + <value>false</value> + <description> + Don't use the aliasmap by default. Some tests will fail + because they try to start the namenode twice with the + same parameters if you turn it on. + </description> + </property> + + <property> <name>dfs.provided.aliasmap.text.delimiter</name> <value>,</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java new file mode 100644 index 0000000..6f1ff3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/ITestInMemoryAliasMap.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.aliasmap; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Optional; + +/** + * ITestInMemoryAliasMap is an integration test that writes and reads to + * an AliasMap. This is an integration test because it can't be run in parallel + * like normal unit tests since there is conflict over the port being in use. + */ +public class ITestInMemoryAliasMap { + private InMemoryAliasMap aliasMap; + private File tempDirectory; + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + tempDirectory = Files.createTempDirectory("seagull").toFile(); + conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, + tempDirectory.getAbsolutePath()); + aliasMap = InMemoryAliasMap.init(conf); + } + + @After + public void tearDown() throws Exception { + aliasMap.close(); + FileUtils.deleteDirectory(tempDirectory); + } + + @Test + public void readNotFoundReturnsNothing() throws IOException { + Block block = new Block(42, 43, 44); + + Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt + = aliasMap.read(block); + + assertFalse(actualProvidedStorageLocationOpt.isPresent()); + } + + @Test + public void readWrite() throws Exception { + Block block = new Block(42, 43, 44); + + Path path = new Path("eagle", "mouse"); + long offset = 47; + long length = 48; + int nonceSize = 4; + byte[] nonce = new byte[nonceSize]; + Arrays.fill(nonce, 0, (nonceSize - 1), Byte.parseByte("0011", 2)); + + ProvidedStorageLocation expectedProvidedStorageLocation = + new ProvidedStorageLocation(path, offset, length, nonce); + + aliasMap.write(block, expectedProvidedStorageLocation); + + Optional<ProvidedStorageLocation> actualProvidedStorageLocationOpt + = aliasMap.read(block); + + assertTrue(actualProvidedStorageLocationOpt.isPresent()); + assertEquals(expectedProvidedStorageLocation, + actualProvidedStorageLocationOpt.get()); + + } + + @Test + public void list() throws IOException { + Block block1 = new Block(42, 43, 44); + Block block2 = new Block(43, 44, 45); + Block block3 = new Block(44, 45, 46); + + Path path = new Path("eagle", "mouse"); + int nonceSize = 4; + byte[] nonce = new byte[nonceSize]; + Arrays.fill(nonce, 0, (nonceSize - 1), Byte.parseByte("0011", 2)); + ProvidedStorageLocation expectedProvidedStorageLocation1 = + new ProvidedStorageLocation(path, 47, 48, nonce); + 
ProvidedStorageLocation expectedProvidedStorageLocation2 = + new ProvidedStorageLocation(path, 48, 49, nonce); + ProvidedStorageLocation expectedProvidedStorageLocation3 = + new ProvidedStorageLocation(path, 49, 50, nonce); + + aliasMap.write(block1, expectedProvidedStorageLocation1); + aliasMap.write(block2, expectedProvidedStorageLocation2); + aliasMap.write(block3, expectedProvidedStorageLocation3); + + InMemoryAliasMap.IterationResult list = aliasMap.list(Optional.empty()); + // we should have 3 results + assertEquals(3, list.getFileRegions().size()); + // no more results expected + assertFalse(list.getNextBlock().isPresent()); + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/36957f0d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/TestInMemoryAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/TestInMemoryAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/TestInMemoryAliasMap.java new file mode 100644 index 0000000..f699055 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/aliasmap/TestInMemoryAliasMap.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.aliasmap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.junit.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * TestInMemoryAliasMap tests the initialization of an AliasMap. Most of the + * rest of the tests are in ITestInMemoryAliasMap since the tests are not + * thread safe (there is competition for the port). + */ +public class TestInMemoryAliasMap { + + @Test + public void testInit() { + String nonExistingDirectory = "non-existing-directory"; + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, + nonExistingDirectory); + + assertThatExceptionOfType(IOException.class) + .isThrownBy(() -> InMemoryAliasMap.init(conf)).withMessage( + InMemoryAliasMap.createPathErrorMessage(nonExistingDirectory)); + } +} \ No newline at end of file
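[Not part of the patch] A short sketch of the configuration wiring implied by the NameNode and hdfs-default.xml changes above. The InMemoryAliasMapConfigSketch class, the enableInMemoryAliasMap helper and the example leveldb directory are made up for illustration; the keys are the DFSConfigKeys constants referenced in NameNode#startAliasMapServerIfNecessary and the new property names added to hdfs-default.xml (in-memory alias map disabled by default, dnrpc-address defaulting to 0.0.0.0:50200, list() batch size defaulting to 500).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class InMemoryAliasMapConfigSketch {

  /**
   * Mirrors the guard in NameNode#startAliasMapServerIfNecessary(): the
   * InMemoryLevelDBAliasMapServer is only started when both provided
   * storage and the in-memory alias map are enabled.
   */
  static Configuration enableInMemoryAliasMap() {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
    // Directory holding the leveldb files (hdfs-default.xml ships /tmp);
    // the path below is only an example.
    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
        "/var/lib/hadoop-hdfs/aliasmap");
    // RPC endpoint the datanode-side client connects to, and the batch
    // size used when list() pages through the map.
    conf.set("dfs.provided.aliasmap.inmemory.dnrpc-address", "0.0.0.0:50200");
    conf.setInt("dfs.provided.aliasmap.inmemory.batch-size", 500);
    return conf;
  }
}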