HDFS-11902. [READ] Merge BlockFormatProvider and FileRegionProvider.
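This patch merges the NameNode-side BlockFormatProvider/BlockProvider and the DataNode-side FileRegionProvider/BlockFormat abstractions into a single BlockAliasMap. Both ProvidedStorageMap (NameNode) and ProvidedVolumeImpl (DataNode) now load the same implementation through dfs.provided.aliasmap.class, defaulting to TextFileRegionAliasMap, and the dfs.provided.textprovider.* keys are renamed to dfs.provided.aliasmap.text.*.

As a minimal, illustrative sketch -- not part of this patch -- of how the merged abstraction is consumed after this change (the class name AliasMapUsageSketch and the standalone setup are hypothetical; the APIs shown are the ones added below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.util.ReflectionUtils;

public class AliasMapUsageSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // One key is now consulted by both the NameNode and DataNode code paths.
    Class<? extends BlockAliasMap> clazz = conf.getClass(
        DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
        TextFileRegionAliasMap.class, BlockAliasMap.class);
    BlockAliasMap<FileRegion> aliasMap =
        (BlockAliasMap<FileRegion>) ReflectionUtils.newInstance(clazz, conf);
    // Passing null selects the implementation's default reader options
    // (for the text format, the file at dfs.provided.aliasmap.text.read.path).
    // The reader is both Iterable and Closeable, so try-with-resources applies.
    try (BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null)) {
      for (FileRegion region : reader) {
        System.out.println(region.getBlock() + " -> " + region.getPath());
      }
    }
  }
}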
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79f3d9a3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79f3d9a3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79f3d9a3 Branch: refs/heads/HDFS-9806 Commit: 79f3d9a388fae357ae1be53302ac6d5bcd61e2ee Parents: 33854a7 Author: Virajith Jalaparti <[email protected]> Authored: Fri Nov 3 13:45:56 2017 -0700 Committer: Virajith Jalaparti <[email protected]> Committed: Fri Nov 3 18:05:28 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 17 +- .../blockmanagement/BlockFormatProvider.java | 91 ---- .../server/blockmanagement/BlockProvider.java | 75 ---- .../blockmanagement/ProvidedStorageMap.java | 63 ++- .../hadoop/hdfs/server/common/BlockFormat.java | 82 ---- .../hdfs/server/common/FileRegionProvider.java | 37 -- .../server/common/TextFileRegionFormat.java | 442 ------------------ .../server/common/TextFileRegionProvider.java | 88 ---- .../common/blockaliasmap/BlockAliasMap.java | 88 ++++ .../impl/TextFileRegionAliasMap.java | 445 +++++++++++++++++++ .../common/blockaliasmap/package-info.java | 27 ++ .../fsdataset/impl/ProvidedVolumeImpl.java | 76 ++-- .../src/main/resources/hdfs-default.xml | 34 +- .../blockmanagement/TestProvidedStorageMap.java | 41 +- .../hdfs/server/common/TestTextBlockFormat.java | 160 ------- .../impl/TestTextBlockAliasMap.java | 161 +++++++ .../fsdataset/impl/TestProvidedImpl.java | 75 ++-- .../hdfs/server/namenode/FileSystemImage.java | 4 +- .../hdfs/server/namenode/ImageWriter.java | 25 +- .../hdfs/server/namenode/NullBlockAliasMap.java | 86 ++++ .../hdfs/server/namenode/NullBlockFormat.java | 87 ---- .../hadoop/hdfs/server/namenode/TreePath.java | 8 +- .../TestNameNodeProvidedImplementation.java | 25 +- 23 files changed, 994 insertions(+), 1243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 87cfa77..3a148aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -331,22 +331,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled"; public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false; - public static final String DFS_NAMENODE_BLOCK_PROVIDER_CLASS = "dfs.namenode.block.provider.class"; - - public static final String DFS_PROVIDER_CLASS = "dfs.provider.class"; public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class"; public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id"; public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED"; - public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = "dfs.provided.blockformat.class"; + public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class"; - public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = 
"dfs.provided.textprovider.delimiter"; - public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ","; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER = "dfs.provided.aliasmap.text.delimiter"; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT = ","; - public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = "dfs.provided.textprovider.read.path"; - public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = "file:///tmp/blocks.csv"; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH = "dfs.provided.aliasmap.text.read.path"; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT = "file:///tmp/blocks.csv"; - public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = "dfs.provided.textprovider.read.codec"; - public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH = "dfs.provided.textprovider.write.path"; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_CODEC = "dfs.provided.aliasmap.text.codec"; + public static final String DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH = "dfs.provided.aliasmap.text.write.path"; public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java deleted file mode 100644 index 930263d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockFormatProvider.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.BlockAlias; -import org.apache.hadoop.hdfs.server.common.BlockFormat; -import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat; -import org.apache.hadoop.util.ReflectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Loads provided blocks from a {@link BlockFormat}. - */ -public class BlockFormatProvider extends BlockProvider - implements Configurable { - - private Configuration conf; - private BlockFormat<? 
extends BlockAlias> blockFormat; - public static final Logger LOG = - LoggerFactory.getLogger(BlockFormatProvider.class); - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void setConf(Configuration conf) { - Class<? extends BlockFormat> c = conf.getClass( - DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS, - TextFileRegionFormat.class, BlockFormat.class); - blockFormat = ReflectionUtils.newInstance(c, conf); - LOG.info("Loaded BlockFormat class : " + c.getClass().getName()); - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public Iterator<Block> iterator() { - try { - final BlockFormat.Reader<? extends BlockAlias> reader = - blockFormat.getReader(null); - - return new Iterator<Block>() { - - private final Iterator<? extends BlockAlias> inner = reader.iterator(); - - @Override - public boolean hasNext() { - return inner.hasNext(); - } - - @Override - public Block next() { - return inner.next().getBlock(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } catch (IOException e) { - throw new RuntimeException("Failed to read provided blocks", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java deleted file mode 100644 index 2214868..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockProvider.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.io.IOException; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap.ProvidedBlockList; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.util.RwLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Used to load provided blocks in the {@link BlockManager}. 
- */ -public abstract class BlockProvider implements Iterable<Block> { - - private static final Logger LOG = - LoggerFactory.getLogger(ProvidedStorageMap.class); - - private RwLock lock; - private BlockManager bm; - private DatanodeStorageInfo storage; - private boolean hasDNs = false; - - /** - * @param lock the namesystem lock - * @param bm block manager - * @param storage storage for provided blocks - */ - void init(RwLock lock, BlockManager bm, DatanodeStorageInfo storage) { - this.bm = bm; - this.lock = lock; - this.storage = storage; - } - - /** - * start the processing of block report for provided blocks. - * @throws IOException - */ - void start(BlockReportContext context) throws IOException { - assert lock.hasWriteLock() : "Not holding write lock"; - if (hasDNs) { - return; - } - if (storage.getBlockReportCount() == 0) { - LOG.info("Calling process first blk report from storage: " + storage); - // first pass; periodic refresh should call bm.processReport - bm.processFirstBlockReport(storage, new ProvidedBlockList(iterator())); - } else { - bm.processReport(storage, new ProvidedBlockList(iterator()), context); - } - hasDNs = true; - } - - void stop() { - assert lock.hasWriteLock() : "Not holding write lock"; - hasDNs = false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 5717e0c..a848d50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -40,7 +40,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.common.BlockAlias; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.util.RwLock; @@ -61,7 +64,11 @@ public class ProvidedStorageMap { LoggerFactory.getLogger(ProvidedStorageMap.class); // limit to a single provider for now - private final BlockProvider blockProvider; + private RwLock lock; + private BlockManager bm; + private boolean hasDNs = false; + private BlockAliasMap aliasMap; + private final String storageId; private final ProvidedDescriptor providedDescriptor; private final DatanodeStorageInfo providedStorageInfo; @@ -79,7 +86,7 @@ public class ProvidedStorageMap { if (!providedEnabled) { // disable mapping - blockProvider = null; + aliasMap = null; providedDescriptor = null; providedStorageInfo = null; return; @@ -90,15 +97,17 @@ public class ProvidedStorageMap { providedDescriptor = new ProvidedDescriptor(); providedStorageInfo = providedDescriptor.createProvidedStorage(ds); + 
this.bm = bm; + this.lock = lock; + // load block reader into storage - Class<? extends BlockProvider> fmt = conf.getClass( - DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS, - BlockFormatProvider.class, BlockProvider.class); - - blockProvider = ReflectionUtils.newInstance(fmt, conf); - blockProvider.init(lock, bm, providedStorageInfo); - LOG.info("Loaded block provider class: " + - blockProvider.getClass() + " storage: " + providedStorageInfo); + Class<? extends BlockAliasMap> aliasMapClass = conf.getClass( + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, + TextFileRegionAliasMap.class, BlockAliasMap.class); + aliasMap = ReflectionUtils.newInstance(aliasMapClass, conf); + + LOG.info("Loaded alias map class: " + + aliasMap.getClass() + " storage: " + providedStorageInfo); } /** @@ -114,8 +123,7 @@ public class ProvidedStorageMap { BlockReportContext context) throws IOException { if (providedEnabled && storageId.equals(s.getStorageID())) { if (StorageType.PROVIDED.equals(s.getStorageType())) { - // poll service, initiate - blockProvider.start(context); + processProvidedStorageReport(context); dn.injectStorage(providedStorageInfo); return providedDescriptor.getProvidedStorage(dn, s); } @@ -124,6 +132,26 @@ public class ProvidedStorageMap { return dn.getStorageInfo(s.getStorageID()); } + private void processProvidedStorageReport(BlockReportContext context) + throws IOException { + assert lock.hasWriteLock() : "Not holding write lock"; + if (hasDNs) { + return; + } + if (providedStorageInfo.getBlockReportCount() == 0) { + LOG.info("Calling process first blk report from storage: " + + providedStorageInfo); + // first pass; periodic refresh should call bm.processReport + bm.processFirstBlockReport(providedStorageInfo, + new ProvidedBlockList(aliasMap.getReader(null).iterator())); + } else { + bm.processReport(providedStorageInfo, + new ProvidedBlockList(aliasMap.getReader(null).iterator()), + context); + } + hasDNs = true; + } + @VisibleForTesting public DatanodeStorageInfo getProvidedStorageInfo() { return providedStorageInfo; @@ -137,10 +165,11 @@ public class ProvidedStorageMap { } public void removeDatanode(DatanodeDescriptor dnToRemove) { - if (providedDescriptor != null) { + if (providedEnabled) { + assert lock.hasWriteLock() : "Not holding write lock"; int remainingDatanodes = providedDescriptor.remove(dnToRemove); if (remainingDatanodes == 0) { - blockProvider.stop(); + hasDNs = false; } } } @@ -443,9 +472,9 @@ public class ProvidedStorageMap { */ static class ProvidedBlockList extends BlockListAsLongs { - private final Iterator<Block> inner; + private final Iterator<BlockAlias> inner; - ProvidedBlockList(Iterator<Block> inner) { + ProvidedBlockList(Iterator<BlockAlias> inner) { this.inner = inner; } @@ -454,7 +483,7 @@ public class ProvidedStorageMap { return new Iterator<BlockReportReplica>() { @Override public BlockReportReplica next() { - return new BlockReportReplica(inner.next()); + return new BlockReportReplica(inner.next().getBlock()); } @Override public boolean hasNext() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java deleted file mode 100644 index 66e7fdf..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.common; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.hdfs.protocol.Block; - -/** - * An abstract class used to read and write block maps for provided blocks. - */ -public abstract class BlockFormat<T extends BlockAlias> { - - /** - * An abstract class that is used to read {@link BlockAlias}es - * for provided blocks. - */ - public static abstract class Reader<U extends BlockAlias> - implements Iterable<U>, Closeable { - - /** - * reader options. - */ - public interface Options { } - - public abstract U resolve(Block ident) throws IOException; - - } - - /** - * Returns the reader for the provided block map. - * @param opts reader options - * @return {@link Reader} to the block map. - * @throws IOException - */ - public abstract Reader<T> getReader(Reader.Options opts) throws IOException; - - /** - * An abstract class used as a writer for the provided block map. - */ - public static abstract class Writer<U extends BlockAlias> - implements Closeable { - /** - * writer options. - */ - public interface Options { } - - public abstract void store(U token) throws IOException; - - } - - /** - * Returns the writer for the provided block map. - * @param opts writer options. - * @return {@link Writer} to the block map. - * @throws IOException - */ - public abstract Writer<T> getWriter(Writer.Options opts) throws IOException; - - /** - * Refresh based on the underlying block map. - * @throws IOException - */ - public abstract void refresh() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java deleted file mode 100644 index 2e94239..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.common; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; - -/** - * This class is a stub for reading file regions from the block map. - */ -public class FileRegionProvider implements Iterable<FileRegion> { - @Override - public Iterator<FileRegion> iterator() { - return Collections.emptyListIterator(); - } - - public void refresh() throws IOException { - return; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java deleted file mode 100644 index eacd08f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java +++ /dev/null @@ -1,442 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdfs.server.common; - -import java.io.File; -import java.io.IOException; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Collections; -import java.util.IdentityHashMap; -import java.util.NoSuchElementException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This class is used for block maps stored as text files, - * with a specified delimiter. - */ -public class TextFileRegionFormat - extends BlockFormat<FileRegion> implements Configurable { - - private Configuration conf; - private ReaderOptions readerOpts = TextReader.defaults(); - private WriterOptions writerOpts = TextWriter.defaults(); - - public static final Logger LOG = - LoggerFactory.getLogger(TextFileRegionFormat.class); - @Override - public void setConf(Configuration conf) { - readerOpts.setConf(conf); - writerOpts.setConf(conf); - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public Reader<FileRegion> getReader(Reader.Options opts) - throws IOException { - if (null == opts) { - opts = readerOpts; - } - if (!(opts instanceof ReaderOptions)) { - throw new IllegalArgumentException("Invalid options " + opts.getClass()); - } - ReaderOptions o = (ReaderOptions) opts; - Configuration readerConf = (null == o.getConf()) - ? new Configuration() - : o.getConf(); - return createReader(o.file, o.delim, readerConf); - } - - @VisibleForTesting - TextReader createReader(Path file, String delim, Configuration cfg) - throws IOException { - FileSystem fs = file.getFileSystem(cfg); - if (fs instanceof LocalFileSystem) { - fs = ((LocalFileSystem)fs).getRaw(); - } - CompressionCodecFactory factory = new CompressionCodecFactory(cfg); - CompressionCodec codec = factory.getCodec(file); - return new TextReader(fs, file, codec, delim); - } - - @Override - public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { - if (null == opts) { - opts = writerOpts; - } - if (!(opts instanceof WriterOptions)) { - throw new IllegalArgumentException("Invalid options " + opts.getClass()); - } - WriterOptions o = (WriterOptions) opts; - Configuration cfg = (null == o.getConf()) - ? 
new Configuration() - : o.getConf(); - if (o.codec != null) { - CompressionCodecFactory factory = new CompressionCodecFactory(cfg); - CompressionCodec codec = factory.getCodecByName(o.codec); - String name = o.file.getName() + codec.getDefaultExtension(); - o.filename(new Path(o.file.getParent(), name)); - return createWriter(o.file, codec, o.delim, cfg); - } - return createWriter(o.file, null, o.delim, conf); - } - - @VisibleForTesting - TextWriter createWriter(Path file, CompressionCodec codec, String delim, - Configuration cfg) throws IOException { - FileSystem fs = file.getFileSystem(cfg); - if (fs instanceof LocalFileSystem) { - fs = ((LocalFileSystem)fs).getRaw(); - } - OutputStream tmp = fs.create(file); - java.io.Writer out = new BufferedWriter(new OutputStreamWriter( - (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8")); - return new TextWriter(out, delim); - } - - /** - * Class specifying reader options for the {@link TextFileRegionFormat}. - */ - public static class ReaderOptions - implements TextReader.Options, Configurable { - - private Configuration conf; - private String delim = - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT; - private Path file = new Path( - new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT) - .toURI().toString()); - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - String tmpfile = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH, - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); - file = new Path(tmpfile); - delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT); - LOG.info("TextFileRegionFormat: read path " + tmpfile.toString()); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public ReaderOptions filename(Path file) { - this.file = file; - return this; - } - - @Override - public ReaderOptions delimiter(String delim) { - this.delim = delim; - return this; - } - } - - /** - * Class specifying writer options for the {@link TextFileRegionFormat}. - */ - public static class WriterOptions - implements TextWriter.Options, Configurable { - - private Configuration conf; - private String codec = null; - private Path file = - new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); - private String delim = - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT; - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - String tmpfile = conf.get( - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString()); - file = new Path(tmpfile); - codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC); - delim = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER, - DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public WriterOptions filename(Path file) { - this.file = file; - return this; - } - - public String getCodec() { - return codec; - } - - public Path getFile() { - return file; - } - - @Override - public WriterOptions codec(String codec) { - this.codec = codec; - return this; - } - - @Override - public WriterOptions delimiter(String delim) { - this.delim = delim; - return this; - } - - } - - /** - * This class is used as a reader for block maps which - * are stored as delimited text files. - */ - public static class TextReader extends Reader<FileRegion> { - - /** - * Options for {@link TextReader}. 
- */ - public interface Options extends Reader.Options { - Options filename(Path file); - Options delimiter(String delim); - } - - static ReaderOptions defaults() { - return new ReaderOptions(); - } - - private final Path file; - private final String delim; - private final FileSystem fs; - private final CompressionCodec codec; - private final Map<FRIterator, BufferedReader> iterators; - - protected TextReader(FileSystem fs, Path file, CompressionCodec codec, - String delim) { - this(fs, file, codec, delim, - new IdentityHashMap<FRIterator, BufferedReader>()); - } - - TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim, - Map<FRIterator, BufferedReader> iterators) { - this.fs = fs; - this.file = file; - this.codec = codec; - this.delim = delim; - this.iterators = Collections.synchronizedMap(iterators); - } - - @Override - public FileRegion resolve(Block ident) throws IOException { - // consider layering index w/ composable format - Iterator<FileRegion> i = iterator(); - try { - while (i.hasNext()) { - FileRegion f = i.next(); - if (f.getBlock().equals(ident)) { - return f; - } - } - } finally { - BufferedReader r = iterators.remove(i); - if (r != null) { - // null on last element - r.close(); - } - } - return null; - } - - class FRIterator implements Iterator<FileRegion> { - - private FileRegion pending; - - @Override - public boolean hasNext() { - return pending != null; - } - - @Override - public FileRegion next() { - if (null == pending) { - throw new NoSuchElementException(); - } - FileRegion ret = pending; - try { - pending = nextInternal(this); - } catch (IOException e) { - throw new RuntimeException(e); - } - return ret; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException { - BufferedReader r = iterators.get(i); - if (null == r) { - throw new IllegalStateException(); - } - String line = r.readLine(); - if (null == line) { - iterators.remove(i); - return null; - } - String[] f = line.split(delim); - if (f.length != 6) { - throw new IOException("Invalid line: " + line); - } - return new FileRegion(Long.parseLong(f[0]), new Path(f[1]), - Long.parseLong(f[2]), Long.parseLong(f[3]), f[5], - Long.parseLong(f[4])); - } - - public InputStream createStream() throws IOException { - InputStream i = fs.open(file); - if (codec != null) { - i = codec.createInputStream(i); - } - return i; - } - - @Override - public Iterator<FileRegion> iterator() { - FRIterator i = new FRIterator(); - try { - BufferedReader r = - new BufferedReader(new InputStreamReader(createStream(), "UTF-8")); - iterators.put(i, r); - i.pending = nextInternal(i); - } catch (IOException e) { - iterators.remove(i); - throw new RuntimeException(e); - } - return i; - } - - @Override - public void close() throws IOException { - ArrayList<IOException> ex = new ArrayList<>(); - synchronized (iterators) { - for (Iterator<BufferedReader> i = iterators.values().iterator(); - i.hasNext();) { - try { - BufferedReader r = i.next(); - r.close(); - } catch (IOException e) { - ex.add(e); - } finally { - i.remove(); - } - } - iterators.clear(); - } - if (!ex.isEmpty()) { - throw MultipleIOException.createIOException(ex); - } - } - - } - - /** - * This class is used as a writer for block maps which - * are stored as delimited text files. - */ - public static class TextWriter extends Writer<FileRegion> { - - /** - * Interface for Writer options. 
- */ - public interface Options extends Writer.Options { - Options codec(String codec); - Options filename(Path file); - Options delimiter(String delim); - } - - public static WriterOptions defaults() { - return new WriterOptions(); - } - - private final String delim; - private final java.io.Writer out; - - public TextWriter(java.io.Writer out, String delim) { - this.out = out; - this.delim = delim; - } - - @Override - public void store(FileRegion token) throws IOException { - out.append(String.valueOf(token.getBlock().getBlockId())).append(delim); - out.append(token.getPath().toString()).append(delim); - out.append(Long.toString(token.getOffset())).append(delim); - out.append(Long.toString(token.getLength())).append(delim); - out.append(Long.toString(token.getGenerationStamp())).append(delim); - out.append(token.getBlockPoolId()).append("\n"); - } - - @Override - public void close() throws IOException { - out.close(); - } - - } - - @Override - public void refresh() throws IOException { - //nothing to do; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java deleted file mode 100644 index 0fa667e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.common; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * This class is used to read file regions from block maps - * specified using delimited text. - */ -public class TextFileRegionProvider - extends FileRegionProvider implements Configurable { - - private Configuration conf; - private BlockFormat<FileRegion> fmt; - - @SuppressWarnings("unchecked") - @Override - public void setConf(Configuration conf) { - fmt = ReflectionUtils.newInstance( - conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS, - TextFileRegionFormat.class, - BlockFormat.class), - conf); - ((Configurable)fmt).setConf(conf); //redundant? 
- this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public Iterator<FileRegion> iterator() { - try { - final BlockFormat.Reader<FileRegion> r = fmt.getReader(null); - return new Iterator<FileRegion>() { - - private final Iterator<FileRegion> inner = r.iterator(); - - @Override - public boolean hasNext() { - return inner.hasNext(); - } - - @Override - public FileRegion next() { - return inner.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } catch (IOException e) { - throw new RuntimeException("Failed to read provided blocks", e); - } - } - - @Override - public void refresh() throws IOException { - fmt.refresh(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java new file mode 100644 index 0000000..d276fb5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/BlockAliasMap.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.blockaliasmap; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.BlockAlias; + +/** + * An abstract class used to read and write block maps for provided blocks. + */ +public abstract class BlockAliasMap<T extends BlockAlias> { + + /** + * An abstract class that is used to read {@link BlockAlias}es + * for provided blocks. + */ + public static abstract class Reader<U extends BlockAlias> + implements Iterable<U>, Closeable { + + /** + * reader options. + */ + public interface Options { } + + /** + * @param ident block to resolve + * @return BlockAlias correspoding to the provided block. + * @throws IOException + */ + public abstract U resolve(Block ident) throws IOException; + + } + + /** + * Returns a reader to the alias map. + * @param opts reader options + * @return {@link Reader} to the alias map. + * @throws IOException + */ + public abstract Reader<T> getReader(Reader.Options opts) throws IOException; + + /** + * An abstract class used as a writer for the provided block map. + */ + public static abstract class Writer<U extends BlockAlias> + implements Closeable { + /** + * writer options. 
+ */ + public interface Options { } + + public abstract void store(U token) throws IOException; + + } + + /** + * Returns the writer for the alias map. + * @param opts writer options. + * @return {@link Writer} to the alias map. + * @throws IOException + */ + public abstract Writer<T> getWriter(Writer.Options opts) throws IOException; + + /** + * Refresh the alias map. + * @throws IOException + */ + public abstract void refresh() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java new file mode 100644 index 0000000..80f48c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TextFileRegionAliasMap.java @@ -0,0 +1,445 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; + +import java.io.File; +import java.io.IOException; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class is used for block maps stored as text files, + * with a specified delimiter. 
+ */ +public class TextFileRegionAliasMap + extends BlockAliasMap<FileRegion> implements Configurable { + + private Configuration conf; + private ReaderOptions readerOpts = TextReader.defaults(); + private WriterOptions writerOpts = TextWriter.defaults(); + + public static final Logger LOG = + LoggerFactory.getLogger(TextFileRegionAliasMap.class); + @Override + public void setConf(Configuration conf) { + readerOpts.setConf(conf); + writerOpts.setConf(conf); + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public Reader<FileRegion> getReader(Reader.Options opts) + throws IOException { + if (null == opts) { + opts = readerOpts; + } + if (!(opts instanceof ReaderOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + ReaderOptions o = (ReaderOptions) opts; + Configuration readerConf = (null == o.getConf()) + ? new Configuration() + : o.getConf(); + return createReader(o.file, o.delim, readerConf); + } + + @VisibleForTesting + TextReader createReader(Path file, String delim, Configuration cfg) + throws IOException { + FileSystem fs = file.getFileSystem(cfg); + if (fs instanceof LocalFileSystem) { + fs = ((LocalFileSystem)fs).getRaw(); + } + CompressionCodecFactory factory = new CompressionCodecFactory(cfg); + CompressionCodec codec = factory.getCodec(file); + return new TextReader(fs, file, codec, delim); + } + + @Override + public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException { + if (null == opts) { + opts = writerOpts; + } + if (!(opts instanceof WriterOptions)) { + throw new IllegalArgumentException("Invalid options " + opts.getClass()); + } + WriterOptions o = (WriterOptions) opts; + Configuration cfg = (null == o.getConf()) + ? new Configuration() + : o.getConf(); + if (o.codec != null) { + CompressionCodecFactory factory = new CompressionCodecFactory(cfg); + CompressionCodec codec = factory.getCodecByName(o.codec); + String name = o.file.getName() + codec.getDefaultExtension(); + o.filename(new Path(o.file.getParent(), name)); + return createWriter(o.file, codec, o.delim, cfg); + } + return createWriter(o.file, null, o.delim, conf); + } + + @VisibleForTesting + TextWriter createWriter(Path file, CompressionCodec codec, String delim, + Configuration cfg) throws IOException { + FileSystem fs = file.getFileSystem(cfg); + if (fs instanceof LocalFileSystem) { + fs = ((LocalFileSystem)fs).getRaw(); + } + OutputStream tmp = fs.create(file); + java.io.Writer out = new BufferedWriter(new OutputStreamWriter( + (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8")); + return new TextWriter(out, delim); + } + + /** + * Class specifying reader options for the {@link TextFileRegionAliasMap}. 
+ */ + public static class ReaderOptions + implements TextReader.Options, Configurable { + + private Configuration conf; + private String delim = + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT; + private Path file = new Path( + new File(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT).toURI() + .toString()); + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + String tmpfile = + conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_PATH, + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); + file = new Path(tmpfile); + delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT); + LOG.info("TextFileRegionAliasMap: read path " + tmpfile.toString()); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public ReaderOptions filename(Path file) { + this.file = file; + return this; + } + + @Override + public ReaderOptions delimiter(String delim) { + this.delim = delim; + return this; + } + } + + /** + * Class specifying writer options for the {@link TextFileRegionAliasMap}. + */ + public static class WriterOptions + implements TextWriter.Options, Configurable { + + private Configuration conf; + private String codec = null; + private Path file = + new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT);; + private String delim = + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + String tmpfile = conf.get( + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_PATH, file.toString()); + file = new Path(tmpfile); + codec = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_CODEC); + delim = conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, + DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER_DEFAULT); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public WriterOptions filename(Path file) { + this.file = file; + return this; + } + + public String getCodec() { + return codec; + } + + public Path getFile() { + return file; + } + + @Override + public WriterOptions codec(String codec) { + this.codec = codec; + return this; + } + + @Override + public WriterOptions delimiter(String delim) { + this.delim = delim; + return this; + } + + } + + /** + * This class is used as a reader for block maps which + * are stored as delimited text files. + */ + public static class TextReader extends Reader<FileRegion> { + + /** + * Options for {@link TextReader}. 
+ */ + public interface Options extends Reader.Options { + Options filename(Path file); + Options delimiter(String delim); + } + + static ReaderOptions defaults() { + return new ReaderOptions(); + } + + private final Path file; + private final String delim; + private final FileSystem fs; + private final CompressionCodec codec; + private final Map<FRIterator, BufferedReader> iterators; + + protected TextReader(FileSystem fs, Path file, CompressionCodec codec, + String delim) { + this(fs, file, codec, delim, + new IdentityHashMap<FRIterator, BufferedReader>()); + } + + TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim, + Map<FRIterator, BufferedReader> iterators) { + this.fs = fs; + this.file = file; + this.codec = codec; + this.delim = delim; + this.iterators = Collections.synchronizedMap(iterators); + } + + @Override + public FileRegion resolve(Block ident) throws IOException { + // consider layering index w/ composable format + Iterator<FileRegion> i = iterator(); + try { + while (i.hasNext()) { + FileRegion f = i.next(); + if (f.getBlock().equals(ident)) { + return f; + } + } + } finally { + BufferedReader r = iterators.remove(i); + if (r != null) { + // null on last element + r.close(); + } + } + return null; + } + + class FRIterator implements Iterator<FileRegion> { + + private FileRegion pending; + + @Override + public boolean hasNext() { + return pending != null; + } + + @Override + public FileRegion next() { + if (null == pending) { + throw new NoSuchElementException(); + } + FileRegion ret = pending; + try { + pending = nextInternal(this); + } catch (IOException e) { + throw new RuntimeException(e); + } + return ret; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private FileRegion nextInternal(Iterator<FileRegion> i) throws IOException { + BufferedReader r = iterators.get(i); + if (null == r) { + throw new IllegalStateException(); + } + String line = r.readLine(); + if (null == line) { + iterators.remove(i); + return null; + } + String[] f = line.split(delim); + if (f.length != 6) { + throw new IOException("Invalid line: " + line); + } + return new FileRegion(Long.parseLong(f[0]), new Path(f[1]), + Long.parseLong(f[2]), Long.parseLong(f[3]), f[5], + Long.parseLong(f[4])); + } + + public InputStream createStream() throws IOException { + InputStream i = fs.open(file); + if (codec != null) { + i = codec.createInputStream(i); + } + return i; + } + + @Override + public Iterator<FileRegion> iterator() { + FRIterator i = new FRIterator(); + try { + BufferedReader r = + new BufferedReader(new InputStreamReader(createStream(), "UTF-8")); + iterators.put(i, r); + i.pending = nextInternal(i); + } catch (IOException e) { + iterators.remove(i); + throw new RuntimeException(e); + } + return i; + } + + @Override + public void close() throws IOException { + ArrayList<IOException> ex = new ArrayList<>(); + synchronized (iterators) { + for (Iterator<BufferedReader> i = iterators.values().iterator(); + i.hasNext();) { + try { + BufferedReader r = i.next(); + r.close(); + } catch (IOException e) { + ex.add(e); + } finally { + i.remove(); + } + } + iterators.clear(); + } + if (!ex.isEmpty()) { + throw MultipleIOException.createIOException(ex); + } + } + + } + + /** + * This class is used as a writer for block maps which + * are stored as delimited text files. + */ + public static class TextWriter extends Writer<FileRegion> { + + /** + * Interface for Writer options. 
+ */ + public interface Options extends Writer.Options { + Options codec(String codec); + Options filename(Path file); + Options delimiter(String delim); + } + + public static WriterOptions defaults() { + return new WriterOptions(); + } + + private final String delim; + private final java.io.Writer out; + + public TextWriter(java.io.Writer out, String delim) { + this.out = out; + this.delim = delim; + } + + @Override + public void store(FileRegion token) throws IOException { + out.append(String.valueOf(token.getBlock().getBlockId())).append(delim); + out.append(token.getPath().toString()).append(delim); + out.append(Long.toString(token.getOffset())).append(delim); + out.append(Long.toString(token.getLength())).append(delim); + out.append(Long.toString(token.getGenerationStamp())).append(delim); + out.append(token.getBlockPoolId()).append("\n"); + } + + @Override + public void close() throws IOException { + out.close(); + } + + } + + @Override + public void refresh() throws IOException { + //nothing to do; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java new file mode 100644 index 0000000..b906791 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.hdfs.server.common.blockaliasmap; + +/** + * The AliasMap defines mapping of PROVIDED HDFS blocks to data in remote + * storage systems. 
+ */ +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java index d1a7015..092672d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java @@ -35,9 +35,9 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.FileRegion; -import org.apache.hadoop.hdfs.server.common.FileRegionProvider; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; @@ -68,7 +68,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { static class ProvidedBlockPoolSlice { private ProvidedVolumeImpl providedVolume; - private FileRegionProvider provider; + private BlockAliasMap<FileRegion> aliasMap; private Configuration conf; private String bpid; private ReplicaMap bpVolumeMap; @@ -77,29 +77,35 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { Configuration conf) { this.providedVolume = volume; bpVolumeMap = new ReplicaMap(new AutoCloseableLock()); - Class<? extends FileRegionProvider> fmt = - conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS, - TextFileRegionProvider.class, FileRegionProvider.class); - provider = ReflectionUtils.newInstance(fmt, conf); + Class<? 
extends BlockAliasMap> fmt = + conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, + TextFileRegionAliasMap.class, BlockAliasMap.class); + aliasMap = ReflectionUtils.newInstance(fmt, conf); this.conf = conf; this.bpid = bpid; bpVolumeMap.initBlockPool(bpid); - LOG.info("Created provider: " + provider.getClass()); + LOG.info("Created alias map using class: " + aliasMap.getClass()); } - FileRegionProvider getFileRegionProvider() { - return provider; + BlockAliasMap<FileRegion> getBlockAliasMap() { + return aliasMap; } @VisibleForTesting - void setFileRegionProvider(FileRegionProvider newProvider) { - this.provider = newProvider; + void setFileRegionProvider(BlockAliasMap<FileRegion> blockAliasMap) { + this.aliasMap = blockAliasMap; } public void getVolumeMap(ReplicaMap volumeMap, RamDiskReplicaTracker ramDiskReplicaMap, FileSystem remoteFS) throws IOException { - Iterator<FileRegion> iter = provider.iterator(); + BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); + if (reader == null) { + LOG.warn("Got null reader from BlockAliasMap " + aliasMap + + "; no blocks will be populated"); + return; + } + Iterator<FileRegion> iter = reader.iterator(); while (iter.hasNext()) { FileRegion region = iter.next(); if (region.getBlockPoolId() != null @@ -140,14 +146,20 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { public void compileReport(LinkedList<ScanInfo> report, ReportCompiler reportCompiler) throws IOException, InterruptedException { - /* refresh the provider and return the list of blocks found. + /* refresh the aliasMap and return the list of blocks found. * the assumption here is that the block ids in the external * block map, after the refresh, are consistent with those * from before the refresh, i.e., for blocks which did not change, * the ids remain the same. 
*/ - provider.refresh(); - Iterator<FileRegion> iter = provider.iterator(); + aliasMap.refresh(); + BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null); + if (reader == null) { + LOG.warn("Got null reader from BlockAliasMap " + aliasMap + + "; no blocks will be populated in scan report"); + return; + } + Iterator<FileRegion> iter = reader.iterator(); while(iter.hasNext()) { reportCompiler.throttle(); FileRegion region = iter.next(); @@ -284,15 +296,15 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { private String bpid; private String name; - private FileRegionProvider provider; + private BlockAliasMap<FileRegion> blockAliasMap; private Iterator<FileRegion> blockIterator; private ProvidedBlockIteratorState state; ProviderBlockIteratorImpl(String bpid, String name, - FileRegionProvider provider) { + BlockAliasMap<FileRegion> blockAliasMap) { this.bpid = bpid; this.name = name; - this.provider = provider; + this.blockAliasMap = blockAliasMap; rewind(); } @@ -330,7 +342,17 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { @Override public void rewind() { - blockIterator = provider.iterator(); + BlockAliasMap.Reader<FileRegion> reader = null; + try { + reader = blockAliasMap.getReader(null); + } catch (IOException e) { + LOG.warn("Exception in getting reader from provided alias map"); + } + if (reader != null) { + blockIterator = reader.iterator(); + } else { + blockIterator = null; + } state = new ProvidedBlockIteratorState(); } @@ -372,14 +394,14 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { @Override public BlockIterator newBlockIterator(String bpid, String name) { return new ProviderBlockIteratorImpl(bpid, name, - bpSlices.get(bpid).getFileRegionProvider()); + bpSlices.get(bpid).getBlockAliasMap()); } @Override public BlockIterator loadBlockIterator(String bpid, String name) throws IOException { ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name, - bpSlices.get(bpid).getFileRegionProvider()); + bpSlices.get(bpid).getBlockAliasMap()); iter.load(); return iter; } @@ -425,8 +447,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { } @VisibleForTesting - FileRegionProvider getFileRegionProvider(String bpid) throws IOException { - return getProvidedBlockPoolSlice(bpid).getFileRegionProvider(); + BlockAliasMap<FileRegion> getBlockFormat(String bpid) throws IOException { + return getProvidedBlockPoolSlice(bpid).getBlockAliasMap(); } @Override @@ -571,12 +593,12 @@ public class ProvidedVolumeImpl extends FsVolumeImpl { } @VisibleForTesting - void setFileRegionProvider(String bpid, FileRegionProvider provider) - throws IOException { + void setFileRegionProvider(String bpid, + BlockAliasMap<FileRegion> blockAliasMap) throws IOException { ProvidedBlockPoolSlice bp = bpSlices.get(bpid); if (bp == null) { throw new IOException("block pool " + bpid + " is not found"); } - bp.setFileRegionProvider(provider); + bp.setFileRegionProvider(blockAliasMap); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 83d5fbb..d62ba17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4609,26 +4609,6 @@ </property> <property> - 
<name>dfs.namenode.block.provider.class</name> - <value>org.apache.hadoop.hdfs.server.blockmanagement.BlockFormatProvider</value> - <description> - The class that is used to load provided blocks in the Namenode. - </description> - </property> - - <property> - <name>dfs.provider.class</name> - <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value> - <description> - The class that is used to load information about blocks stored in - provided storages. - org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider - is used as the default, which expects the blocks to be specified - using a delimited text file. - </description> - </property> - - <property> <name>dfs.provided.df.class</name> <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value> <description> @@ -4645,12 +4625,12 @@ </property> <property> - <name>dfs.provided.blockformat.class</name> - <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value> + <name>dfs.provided.aliasmap.class</name> + <value>org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap</value> <description> The class that is used to specify the input format of the blocks on provided storages. The default is - org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses + org.apache.hadoop.hdfs.server.common.TextFileRegionAliasMap which uses file regions to describe blocks. The file regions are specified as a delimited text file. Each file region is a 6-tuple containing the block id, remote file path, offset into file, length of block, the @@ -4660,7 +4640,7 @@ </property> <property> - <name>dfs.provided.textprovider.delimiter</name> + <name>dfs.provided.aliasmap.text.delimiter</name> <value>,</value> <description> The delimiter used when the provided block map is specified as @@ -4669,7 +4649,7 @@ </property> <property> - <name>dfs.provided.textprovider.read.path</name> + <name>dfs.provided.aliasmap.text.read.path</name> <value></value> <description> The path specifying the provided block map as a text file, specified as @@ -4678,7 +4658,7 @@ </property> <property> - <name>dfs.provided.textprovider.read.codec</name> + <name>dfs.provided.aliasmap.text.codec</name> <value></value> <description> The codec used to de-compress the provided block map. 
@@ -4686,7 +4666,7 @@ </property> <property> - <name>dfs.provided.textprovider.write.path</name> + <name>dfs.provided.aliasmap.text.write.path</name> <value></value> <description> The path to which the provided block map should be written as a text http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java index 2296c82..89741b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestProvidedStorageMap.java @@ -17,20 +17,19 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestProvidedImpl; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.util.RwLock; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.Iterator; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -47,37 +46,6 @@ public class TestProvidedStorageMap { private RwLock nameSystemLock; private String providedStorageID; - static class TestBlockProvider extends BlockProvider - implements Configurable { - - @Override - public void setConf(Configuration conf) { - } - - @Override - public Configuration getConf() { - return null; - } - - @Override - public Iterator<Block> iterator() { - return new Iterator<Block>() { - @Override - public boolean hasNext() { - return false; - } - @Override - public Block next() { - return null; - } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - } - @Before public void setup() { providedStorageID = DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT; @@ -85,8 +53,9 @@ public class TestProvidedStorageMap { conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, providedStorageID); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true); - conf.setClass(DFSConfigKeys.DFS_NAMENODE_BLOCK_PROVIDER_CLASS, - TestBlockProvider.class, BlockProvider.class); + conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, + TestProvidedImpl.TestFileRegionBlockAliasMap.class, + BlockAliasMap.class); bm = mock(BlockManager.class); nameSystemLock = mock(RwLock.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java deleted file mode 100644 index eaaac22..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.common; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStreamWriter; -import java.util.Iterator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.compress.CompressionCodec; - -import org.junit.Test; -import static org.junit.Assert.*; - -/** - * Test for the text based block format for provided block maps. - */ -public class TestTextBlockFormat { - - static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt"); - - void check(TextWriter.Options opts, final Path vp, - final Class<? 
extends CompressionCodec> vc) throws IOException { - TextFileRegionFormat mFmt = new TextFileRegionFormat() { - @Override - public TextWriter createWriter(Path file, CompressionCodec codec, - String delim, Configuration conf) throws IOException { - assertEquals(vp, file); - if (null == vc) { - assertNull(codec); - } else { - assertEquals(vc, codec.getClass()); - } - return null; // ignored - } - }; - mFmt.getWriter(opts); - } - - @Test - public void testWriterOptions() throws Exception { - TextWriter.Options opts = TextWriter.defaults(); - assertTrue(opts instanceof WriterOptions); - WriterOptions wopts = (WriterOptions) opts; - Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT); - assertEquals(def, wopts.getFile()); - assertNull(wopts.getCodec()); - - opts.filename(OUTFILE); - check(opts, OUTFILE, null); - - opts.filename(OUTFILE); - opts.codec("gzip"); - Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz"); - check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class); - - } - - @Test - public void testCSVReadWrite() throws Exception { - final DataOutputBuffer out = new DataOutputBuffer(); - FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); - FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); - FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); - try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) { - csv.store(r1); - csv.store(r2); - csv.store(r3); - } - Iterator<FileRegion> i3; - try (TextReader csv = new TextReader(null, null, null, ",") { - @Override - public InputStream createStream() { - DataInputBuffer in = new DataInputBuffer(); - in.reset(out.getData(), 0, out.getLength()); - return in; - }}) { - Iterator<FileRegion> i1 = csv.iterator(); - assertEquals(r1, i1.next()); - Iterator<FileRegion> i2 = csv.iterator(); - assertEquals(r1, i2.next()); - assertEquals(r2, i2.next()); - assertEquals(r3, i2.next()); - assertEquals(r2, i1.next()); - assertEquals(r3, i1.next()); - - assertFalse(i1.hasNext()); - assertFalse(i2.hasNext()); - i3 = csv.iterator(); - } - try { - i3.next(); - } catch (IllegalStateException e) { - return; - } - fail("Invalid iterator"); - } - - @Test - public void testCSVReadWriteTsv() throws Exception { - final DataOutputBuffer out = new DataOutputBuffer(); - FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); - FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); - FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); - try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) { - csv.store(r1); - csv.store(r2); - csv.store(r3); - } - Iterator<FileRegion> i3; - try (TextReader csv = new TextReader(null, null, null, "\t") { - @Override - public InputStream createStream() { - DataInputBuffer in = new DataInputBuffer(); - in.reset(out.getData(), 0, out.getLength()); - return in; - }}) { - Iterator<FileRegion> i1 = csv.iterator(); - assertEquals(r1, i1.next()); - Iterator<FileRegion> i2 = csv.iterator(); - assertEquals(r1, i2.next()); - assertEquals(r2, i2.next()); - assertEquals(r3, i2.next()); - assertEquals(r2, i1.next()); - assertEquals(r3, i1.next()); - - assertFalse(i1.hasNext()); - assertFalse(i2.hasNext()); - i3 = csv.iterator(); - } - try { - i3.next(); - } catch (IllegalStateException e) { - return; - } - fail("Invalid iterator"); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f3d9a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java new file mode 100644 index 0000000..79308a3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestTextBlockAliasMap.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common.blockaliasmap.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.*; +import org.apache.hadoop.hdfs.server.common.FileRegion; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.compress.CompressionCodec; + +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test for the text based block format for provided block maps. + */ +public class TestTextBlockAliasMap { + + static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt"); + + void check(TextWriter.Options opts, final Path vp, + final Class<? 
extends CompressionCodec> vc) throws IOException { + TextFileRegionAliasMap mFmt = new TextFileRegionAliasMap() { + @Override + public TextWriter createWriter(Path file, CompressionCodec codec, + String delim, Configuration conf) throws IOException { + assertEquals(vp, file); + if (null == vc) { + assertNull(codec); + } else { + assertEquals(vc, codec.getClass()); + } + return null; // ignored + } + }; + mFmt.getWriter(opts); + } + + @Test + public void testWriterOptions() throws Exception { + TextWriter.Options opts = TextWriter.defaults(); + assertTrue(opts instanceof WriterOptions); + WriterOptions wopts = (WriterOptions) opts; + Path def = new Path(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_PATH_DEFAULT); + assertEquals(def, wopts.getFile()); + assertNull(wopts.getCodec()); + + opts.filename(OUTFILE); + check(opts, OUTFILE, null); + + opts.filename(OUTFILE); + opts.codec("gzip"); + Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz"); + check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class); + + } + + @Test + public void testCSVReadWrite() throws Exception { + final DataOutputBuffer out = new DataOutputBuffer(); + FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); + FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); + FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); + try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) { + csv.store(r1); + csv.store(r2); + csv.store(r3); + } + Iterator<FileRegion> i3; + try (TextReader csv = new TextReader(null, null, null, ",") { + @Override + public InputStream createStream() { + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), 0, out.getLength()); + return in; + }}) { + Iterator<FileRegion> i1 = csv.iterator(); + assertEquals(r1, i1.next()); + Iterator<FileRegion> i2 = csv.iterator(); + assertEquals(r1, i2.next()); + assertEquals(r2, i2.next()); + assertEquals(r3, i2.next()); + assertEquals(r2, i1.next()); + assertEquals(r3, i1.next()); + + assertFalse(i1.hasNext()); + assertFalse(i2.hasNext()); + i3 = csv.iterator(); + } + try { + i3.next(); + } catch (IllegalStateException e) { + return; + } + fail("Invalid iterator"); + } + + @Test + public void testCSVReadWriteTsv() throws Exception { + final DataOutputBuffer out = new DataOutputBuffer(); + FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024); + FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024); + FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512); + try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) { + csv.store(r1); + csv.store(r2); + csv.store(r3); + } + Iterator<FileRegion> i3; + try (TextReader csv = new TextReader(null, null, null, "\t") { + @Override + public InputStream createStream() { + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), 0, out.getLength()); + return in; + }}) { + Iterator<FileRegion> i1 = csv.iterator(); + assertEquals(r1, i1.next()); + Iterator<FileRegion> i2 = csv.iterator(); + assertEquals(r1, i2.next()); + assertEquals(r2, i2.next()); + assertEquals(r3, i2.next()); + assertEquals(r2, i1.next()); + assertEquals(r3, i1.next()); + + assertFalse(i1.hasNext()); + assertFalse(i2.hasNext()); + i3 = csv.iterator(); + } + try { + i3.next(); + } catch (IllegalStateException e) { + return; + } + fail("Invalid iterator"); + } + +}
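Taken together, the datanode-side pattern this patch converges on is: refresh the alias map, ask it for a reader, and iterate FileRegions while guarding against a null reader, as ProvidedBlockPoolSlice.getVolumeMap and compileReport now do. A sketch of that flow, assuming only the signatures visible in this diff (BlockAliasMap.getReader, BlockAliasMap.refresh, Reader.iterator, FileRegion.getBlockPoolId); the per-region processing is left abstract and the helper class is hypothetical:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;

// Hypothetical helper showing the reader-based iteration that replaces the
// old FileRegionProvider.iterator() calls.
class AliasMapScanSketch {
  static void scan(BlockAliasMap<FileRegion> aliasMap, String bpid)
      throws IOException {
    aliasMap.refresh();                              // re-read the backing map
    BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null);
    if (reader == null) {                            // mirror the null guard above
      return;                                        // nothing to populate
    }
    Iterator<FileRegion> iter = reader.iterator();
    while (iter.hasNext()) {
      FileRegion region = iter.next();
      if (bpid.equals(region.getBlockPoolId())) {
        // process the region, e.g. build a replica or a scan-report entry
      }
    }
  }
}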

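On the configuration side, the single dfs.provided.aliasmap.class key now selects the alias map implementation for both the namenode and the datanode, and the text-provider keys move under the dfs.provided.aliasmap.text.* prefix. A sketch of wiring this up programmatically (the key names and the DFS_PROVIDED_ALIASMAP_CLASS constant come from this patch; the read path is a made-up location and the helper class is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;

// Hypothetical helper: one alias-map class key replaces the removed
// dfs.namenode.block.provider.class, dfs.provider.class, and
// dfs.provided.blockformat.class keys.
public class AliasMapConfSketch {
  public static Configuration providedConf() {
    Configuration conf = new Configuration();
    conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
        TextFileRegionAliasMap.class, BlockAliasMap.class);
    // Renamed text keys, per the hdfs-default.xml changes above:
    conf.set("dfs.provided.aliasmap.text.read.path", "file:///tmp/blocks.csv");
    conf.set("dfs.provided.aliasmap.text.delimiter", ",");
    return conf;
  }
}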