This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 40cbc60 Inline inner Rfile classes and interfaces (#487) 40cbc60 is described below commit 40cbc60ed7fcb527b1fa57d935baa25d8d47eae4 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri May 18 17:14:32 2018 -0400 Inline inner Rfile classes and interfaces (#487) * Eliminated CachableBlockFile.Writer by inlining BCFile.Writer * Made BCFile close its OutputStream * Eliminated PositionedOuputs wrapping and PositionedOuput interface * Made RateLimitedOutputStream extend DataOutputStream and take FSDataOutputStream --- .../file/blockfile/impl/CachableBlockFile.java | 63 --------------- .../accumulo/core/file/rfile/MultiLevelIndex.java | 4 +- .../org/apache/accumulo/core/file/rfile/RFile.java | 20 ++--- .../accumulo/core/file/rfile/RFileOperations.java | 7 +- .../accumulo/core/file/rfile/SplitLarge.java | 5 +- .../accumulo/core/file/rfile/bcfile/BCFile.java | 93 ++++++++-------------- .../file/streams/PositionedDataOutputStream.java | 37 --------- .../core/file/streams/PositionedOutput.java | 27 ------- .../core/file/streams/PositionedOutputs.java | 73 ----------------- .../core/file/streams/RateLimitedOutputStream.java | 14 ++-- .../core/file/rfile/CreateCompatTestFile.java | 6 +- .../core/file/rfile/MultiLevelIndexTest.java | 5 +- .../core/file/rfile/MultiThreadedRFileTest.java | 5 +- .../apache/accumulo/core/file/rfile/RFileTest.java | 5 +- .../file/streams/RateLimitedOutputStreamTest.java | 13 +-- 15 files changed, 70 insertions(+), 307 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 43e9730..de3d88d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.UncheckedIOException; import java.util.Collections; import java.util.Map; @@ -35,9 +34,7 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable; import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; -import org.apache.accumulo.core.file.streams.PositionedOutput; import org.apache.accumulo.core.file.streams.RateLimitedInputStream; -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -58,57 +55,6 @@ public class CachableBlockFile { private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class); - public static class Writer implements Closeable { - private BCFile.Writer _bc; - private BCFile.Writer.BlockAppender _bw; - private final PositionedOutput fsout; - private long length = 0; - - public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter, - Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { - this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf, - accumuloConfiguration); - } - - public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout, - String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) - throws IOException { - this.fsout = fsout; - init(fsout, compressAlgor, conf, accumuloConfiguration); - } - - private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout, - String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) - throws IOException { - _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration); - } - - public BCFile.Writer.BlockAppender prepareMetaBlock(String name) throws IOException { - _bw = _bc.prepareMetaBlock(name); - return _bw; - } - - public BCFile.Writer.BlockAppender prepareDataBlock() throws IOException { - _bw = _bc.prepareDataBlock(); - return _bw; - } - - @Override - public void close() throws IOException { - - _bw.close(); - _bc.close(); - - length = this.fsout.position(); - ((OutputStream) this.fsout).close(); - } - - public long getLength() throws IOException { - return length; - } - - } - private static interface IoeSupplier<T> { T get() throws IOException; } @@ -482,15 +428,6 @@ public class CachableBlockFile { indexable = true; } - /** - * It is intended that the caller of this method will close the stream we also only intend that - * this be called once per BlockRead. This method is provide for methods up stream that expect - * to receive a DataInputStream object. - */ - public DataInputStream getStream() { - return this; - } - public void seek(int position) { seekableInput.seek(position); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java index 992a3ec..a71c79e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java @@ -507,9 +507,9 @@ public class MultiLevelIndex { private boolean addedLast = false; - private CachableBlockFile.Writer blockFileWriter; + private BCFile.Writer blockFileWriter; - Writer(CachableBlockFile.Writer blockFileWriter, int maxBlockSize) { + Writer(BCFile.Writer blockFileWriter, int maxBlockSize) { this.blockFileWriter = blockFileWriter; this.threshold = maxBlockSize; levels = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 7247b60..0c85066 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry; import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry; import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator; import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender; import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist; import org.apache.accumulo.core.iterators.IterationInterruptedException; @@ -163,8 +164,7 @@ public class RFile { this.version = version; } - public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, - CachableBlockFile.Writer bfw) { + public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BCFile.Writer bfw) { isDefaultLG = true; columnFamilies = new HashMap<>(); previousColumnFamilies = pcf; @@ -174,7 +174,7 @@ public class RFile { } public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize, - CachableBlockFile.Writer bfw) { + BCFile.Writer bfw) { this.name = name; isDefaultLG = false; columnFamilies = new HashMap<>(); @@ -422,7 +422,7 @@ public class RFile { private static class LocalityGroupWriter { - private CachableBlockFile.Writer fileWriter; + private BCFile.Writer fileWriter; private BlockAppender blockWriter; private final long blockSize; @@ -441,7 +441,7 @@ public class RFile { private RollingStats keyLenStats = new RollingStats(2017); private double averageKeySize = 0; - LocalityGroupWriter(CachableBlockFile.Writer fileWriter, long blockSize, long maxBlockSize, + LocalityGroupWriter(BCFile.Writer fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) { this.fileWriter = fileWriter; this.blockSize = blockSize; @@ -552,7 +552,7 @@ public class RFile { public static final int MAX_CF_IN_DLG = 1000; private static final double MAX_BLOCK_MULTIPLIER = 1.1; - private CachableBlockFile.Writer fileWriter; + private BCFile.Writer fileWriter; private final long blockSize; private final long maxBlockSize; @@ -575,12 +575,12 @@ public class RFile { private SamplerConfigurationImpl samplerConfig; private Sampler sampler; - public Writer(CachableBlockFile.Writer bfw, int blockSize) throws IOException { + public Writer(BCFile.Writer bfw, int blockSize) throws IOException { this(bfw, blockSize, (int) DefaultConfiguration.getInstance() .getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null); } - public Writer(CachableBlockFile.Writer bfw, int blockSize, int indexBlockSize, + public Writer(BCFile.Writer bfw, int blockSize, int indexBlockSize, SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException { this.blockSize = blockSize; this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER); @@ -662,7 +662,7 @@ public class RFile { public DataOutputStream createMetaStore(String name) throws IOException { closeData(); - return (DataOutputStream) fileWriter.prepareMetaBlock(name); + return fileWriter.prepareMetaBlock(name); } private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) @@ -1340,7 +1340,7 @@ public class RFile { @Override public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException { try { - return this.reader.getMetaBlock(name).getStream(); + return this.reader.getMetaBlock(name); } catch (MetaBlockDoesNotExist e) { throw new NoSuchMetaStoreException("name = " + name, e); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index 0054db2..5d8705e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -30,7 +30,7 @@ import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.hadoop.conf.Configuration; @@ -128,9 +128,8 @@ public class RFileOperations extends FileOperations { outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block); } - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer( - new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf, - acuconf); + BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression, + conf, acuconf); return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java index c3663cd..48f2873 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.rfile.RFile.Writer; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -71,10 +72,10 @@ public class SplitLarge { int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE); try ( Writer small = new RFile.Writer( - new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf), + new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, aconf), blockSize); Writer large = new RFile.Writer( - new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf), + new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, aconf), blockSize)) { small.startDefaultLocalityGroup(); large.startDefaultLocalityGroup(); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index f367ed0..9abbef6 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -42,16 +42,17 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version; import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream; -import org.apache.accumulo.core.file.streams.PositionedDataOutputStream; -import org.apache.accumulo.core.file.streams.PositionedOutput; +import org.apache.accumulo.core.file.streams.RateLimitedOutputStream; import org.apache.accumulo.core.file.streams.SeekableDataInputStream; import org.apache.accumulo.core.security.crypto.CryptoModule; import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy; +import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.compress.Compressor; @@ -91,7 +92,7 @@ public final class BCFile { * BCFile writer, the entry point for creating a new BCFile. */ static public class Writer implements Closeable { - private final PositionedDataOutputStream out; + private final RateLimitedOutputStream out; private final Configuration conf; private final CryptoModule cryptoModule; private BCFileCryptoModuleParameters cryptoParams; @@ -106,23 +107,10 @@ public final class BCFile { long errorCount = 0; // reusable buffers. private BytesWritable fsOutputBuffer; + private long length = 0; - /** - * Call-back interface to register a block after a block is closed. - */ - private interface BlockRegister { - /** - * Register a block that is fully closed. - * - * @param raw - * The size of block in terms of uncompressed bytes. - * @param offsetStart - * The start offset of the block. - * @param offsetEnd - * One byte after the end of the block. Compressed block size is offsetEnd - - * offsetStart. - */ - void register(long raw, long offsetStart, long offsetEnd); + public long getLength() { + return this.length; } /** @@ -132,7 +120,7 @@ public final class BCFile { private final Algorithm compressAlgo; private Compressor compressor; // !null only if using native // Hadoop compression - private final PositionedDataOutputStream fsOut; + private final RateLimitedOutputStream fsOut; private final OutputStream cipherOut; private final long posStart; private final SimpleBufferedOutputStream fsBufferedOutput; @@ -144,7 +132,7 @@ public final class BCFile { * @param cryptoModule * the module to use to obtain cryptographic streams */ - public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut, + public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule, CryptoModuleParameters cryptoParams) throws IOException { this.compressAlgo = compressionAlgo; @@ -269,21 +257,27 @@ public final class BCFile { * */ public class BlockAppender extends DataOutputStream { - private final BlockRegister blockRegister; + private final MetaBlockRegister metaBlockRegister; private final WBlockState wBlkState; private boolean closed = false; /** * Constructor * - * @param register + * @param metaBlockRegister * the block register, which is called when the block is closed. * @param wbs * The writable compression block state. */ - BlockAppender(BlockRegister register, WBlockState wbs) { + BlockAppender(MetaBlockRegister metaBlockRegister, WBlockState wbs) { super(wbs.getOutputStream()); - this.blockRegister = register; + this.metaBlockRegister = metaBlockRegister; + this.wBlkState = wbs; + } + + BlockAppender(WBlockState wbs) { + super(wbs.getOutputStream()); + this.metaBlockRegister = null; this.wBlkState = wbs; } @@ -334,7 +328,9 @@ public final class BCFile { try { ++errorCount; wBlkState.finish(); - blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos()); + if (metaBlockRegister != null) + metaBlockRegister.register(getRawSize(), wBlkState.getStartPos(), + wBlkState.getCurrentPos()); --errorCount; } finally { closed = true; @@ -352,16 +348,15 @@ public final class BCFile { * Name of the compression algorithm, which will be used for all data blocks. * @see Compression#getSupportedAlgorithms */ - public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout, - String compressionName, Configuration conf, boolean trackDataBlocks, - AccumuloConfiguration accumuloConfiguration) throws IOException { - if (fout.position() != 0) { + public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName, + Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { + if (fout.getPos() != 0) { throw new IOException("Output file not at zero offset."); } - this.out = new PositionedDataOutputStream(fout); + this.out = new RateLimitedOutputStream(fout, writeLimiter); this.conf = conf; - dataIndex = new DataIndex(compressionName, trackDataBlocks); + dataIndex = new DataIndex(compressionName); metaIndex = new MetaIndex(); fsOutputBuffer = new BytesWritable(); Magic.write(this.out); @@ -424,6 +419,8 @@ public final class BCFile { Magic.write(out); out.flush(); + length = out.position(); + out.close(); } } finally { closed = true; @@ -485,11 +482,9 @@ public final class BCFile { throw new IllegalStateException("Cannot create Data Block after Meta Blocks."); } - DataBlockRegister dbr = new DataBlockRegister(); - WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf, cryptoModule, cryptoParams); - BlockAppender ba = new BlockAppender(dbr, wbs); + BlockAppender ba = new BlockAppender(wbs); blkInProgress = true; return ba; } @@ -497,7 +492,7 @@ public final class BCFile { /** * Callback to make sure a meta block is added to the internal list when its stream is closed. */ - private class MetaBlockRegister implements BlockRegister { + private class MetaBlockRegister { private final String name; private final Algorithm compressAlgo; @@ -506,27 +501,11 @@ public final class BCFile { this.compressAlgo = compressAlgo; } - @Override public void register(long raw, long begin, long end) { metaIndex.addEntry( new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw))); } } - - /** - * Callback to make sure a data block is added to the internal list when it's being closed. - * - */ - private class DataBlockRegister implements BlockRegister { - DataBlockRegister() { - // do nothing - } - - @Override - public void register(long raw, long begin, long end) { - dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw)); - } - } } // sha256 of some random data @@ -1083,8 +1062,6 @@ public final class BCFile { // and raw size private final ArrayList<BlockRegion> listRegions; - private boolean trackBlocks; - // for read, deserialized from a file public DataIndex(DataInput in) throws IOException { defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in)); @@ -1099,8 +1076,7 @@ public final class BCFile { } // for write - public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) { - this.trackBlocks = trackBlocks; + public DataIndex(String defaultCompressionAlgorithmName) { this.defaultCompressionAlgorithm = Compression .getCompressionAlgorithmByName(defaultCompressionAlgorithmName); listRegions = new ArrayList<>(); @@ -1114,11 +1090,6 @@ public final class BCFile { return listRegions; } - public void addBlockRegion(BlockRegion region) { - if (trackBlocks) - listRegions.add(region); - } - public void write(DataOutput out) throws IOException { Utils.writeString(out, defaultCompressionAlgorithm.getName()); diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java deleted file mode 100644 index 419e6b3..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.streams; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -/** - * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput} - * {@code DataOutputStream} - */ -public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput { - public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream( - StreamType type) { - super(type); - } - - @Override - public long position() throws IOException { - return ((PositionedOutput) out).position(); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java deleted file mode 100644 index aa3122d..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.streams; - -import java.io.IOException; - -/** - * For any byte sink (but especially OutputStream), the ability to report how many bytes have been - * sunk. - */ -public interface PositionedOutput { - long position() throws IOException; -} diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java deleted file mode 100644 index 403c1a2..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.streams; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Objects; - -import org.apache.hadoop.fs.FSDataOutputStream; - -/** - * Utility functions for {@link PositionedOutput}. - */ -public class PositionedOutputs { - private PositionedOutputs() {} - - /** - * Convert an {@code OutputStream} into an {@code OutputStream} implementing - * {@link PositionedOutput}. - */ - public static PositionedOutputStream wrap(final OutputStream fout) { - Objects.requireNonNull(fout); - if (fout instanceof FSDataOutputStream) { - return new PositionedOutputStream(fout) { - @Override - public long position() throws IOException { - return ((FSDataOutputStream) fout).getPos(); - } - }; - } else if (fout instanceof PositionedOutput) { - return new PositionedOutputStream(fout) { - @Override - public long position() throws IOException { - return ((PositionedOutput) fout).position(); - } - }; - } else { - return new PositionedOutputStream(fout) { - @Override - public long position() throws IOException { - throw new UnsupportedOperationException("Underlying stream does not support position()"); - } - }; - } - } - - private static abstract class PositionedOutputStream extends FilterOutputStream - implements PositionedOutput { - public PositionedOutputStream(OutputStream stream) { - super(stream); - } - - @Override - public void write(byte[] data, int off, int len) throws IOException { - out.write(data, off, len); - } - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java index 417b89c..b83b898 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java @@ -16,21 +16,22 @@ */ package org.apache.accumulo.core.file.streams; -import java.io.FilterOutputStream; +import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import org.apache.accumulo.core.util.ratelimit.NullRateLimiter; import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.hadoop.fs.FSDataOutputStream; /** * A decorator for {@code OutputStream} which limits the rate at which data may be written. + * Underlying OutputStream is a FSDataOutputStream. */ -public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput { +public class RateLimitedOutputStream extends DataOutputStream { private final RateLimiter writeLimiter; - public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) { - super(PositionedOutputs.wrap(wrappedStream)); + public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, RateLimiter writeLimiter) { + super(fsDataOutputStream); this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter; } @@ -51,8 +52,7 @@ public class RateLimitedOutputStream extends FilterOutputStream implements Posit out.close(); } - @Override public long position() throws IOException { - return ((PositionedOutput) out).position(); + return ((FSDataOutputStream) out).getPos(); } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java index 8e09355..086e8b9 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java @@ -24,7 +24,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,8 +56,8 @@ public class CreateCompatTestFile { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null, - conf, DefaultConfiguration.getInstance()); + BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf, + DefaultConfiguration.getInstance()); RFile.Writer writer = new RFile.Writer(_cbw, 1000); writer.startNewLocalityGroup("lg1", diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java index 656e7da..dd316a4 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java @@ -31,7 +31,6 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator; import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer; import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream; import org.apache.accumulo.core.file.rfile.bcfile.BCFile; -import org.apache.accumulo.core.file.streams.PositionedOutputs; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -56,8 +55,8 @@ public class MultiLevelIndexTest extends TestCase { AccumuloConfiguration aconf = DefaultConfiguration.getInstance(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", - CachedConfiguration.getInstance(), aconf); + BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", CachedConfiguration.getInstance(), + aconf); BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index ee7b02b..ca19a1f 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -46,7 +46,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; -import org.apache.accumulo.core.file.streams.PositionedOutputs; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -147,8 +147,7 @@ public class MultiThreadedRFileTest { FileSystem fs = FileSystem.newInstance(conf); Path path = new Path("file://" + rfile); dos = fs.create(path, true); - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), - "gz", conf, accumuloConfiguration); + BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration); SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl .newSamplerConfig(accumuloConfiguration); Sampler sampler = null; diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index f968b21..e23fbd7 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -66,7 +66,7 @@ import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; -import org.apache.accumulo.core.file.streams.PositionedOutputs; +import org.apache.accumulo.core.file.rfile.bcfile.BCFile; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.metadata.MetadataTable; @@ -230,8 +230,7 @@ public class RFileTest { public void openWriter(boolean startDLG, int blockSize) throws IOException { baos = new ByteArrayOutputStream(); dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a")); - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), - "gz", conf, accumuloConfiguration); + BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration); SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl .newSamplerConfig(accumuloConfiguration); diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java index 18d229b..fbcc164 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java @@ -16,10 +16,10 @@ */ package org.apache.accumulo.core.file.streams; -import java.io.FilterOutputStream; import java.io.IOException; import java.util.Random; +import org.apache.hadoop.fs.FSDataOutputStream; import org.junit.Assert; import org.junit.Test; @@ -45,14 +45,9 @@ public class RateLimitedOutputStreamTest { Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired()); } - public static class NullOutputStream extends FilterOutputStream implements PositionedOutput { - public NullOutputStream() { - super(new CountingOutputStream(ByteStreams.nullOutputStream())); - } - - @Override - public long position() throws IOException { - return ((CountingOutputStream) out).getCount(); + public static class NullOutputStream extends FSDataOutputStream { + public NullOutputStream() throws IOException { + super(new CountingOutputStream(ByteStreams.nullOutputStream()), null); } } -- To stop receiving notification emails like this one, please contact mmil...@apache.org.