MLHR-1916 #resolve #comment Added back the FileAccess API and its implementations
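For context (not part of the commit): a minimal usage sketch of the restored API. The base path, bucket key, and file name below are made up for illustration; the classes and methods are the ones added back under com.datatorrent.lib.fileaccess in the diff that follows.

    import com.datatorrent.lib.fileaccess.FileAccess;
    import com.datatorrent.lib.fileaccess.TFileImpl;
    import com.datatorrent.netlet.util.Slice;

    public class FileAccessDemo
    {
      public static void main(String[] args) throws Exception
      {
        // DTFileImpl writes the standard TFile format but reads it back
        // through the faster DTFile reader (see DTFileReader below).
        TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
        fileAccess.setBasePath("/tmp/fileaccess-demo");  // hypothetical location
        fileAccess.init();                               // creates the Hadoop FileSystem on first use

        long bucketKey = 1L;  // each bucket maps to a subdirectory of basePath

        // TFile's default "memcmp" comparator expects keys appended in sorted byte order.
        try (FileAccess.FileWriter writer = fileAccess.getWriter(bucketKey, "demo.tfile")) {
          writer.append("k1".getBytes(), "v1".getBytes());
          writer.append("k2".getBytes(), "v2".getBytes());
        }

        // next(key, value) points the Slices at the current entry and advances;
        // it returns false once the scanner reaches the end of the file.
        try (FileAccess.FileReader reader = fileAccess.getReader(bucketKey, "demo.tfile")) {
          Slice key = new Slice(new byte[0], 0, 0);
          Slice value = new Slice(new byte[0], 0, 0);
          while (reader.next(key, value)) {
            System.out.println(new String(key.buffer, key.offset, key.length)
                + " => " + new String(value.buffer, value.offset, value.length));
          }
        }

        fileAccess.close();  // closes the underlying FileSystem
      }
    }

Since DTFile has the exact same on-disk format as TFile, a file written this way can be read back through either TFileImpl.DefaultTFileImpl or TFileImpl.DTFileImpl; as the javadoc below notes, there is no separate "DTFileWriter".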
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7d2f4749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7d2f4749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7d2f4749

Branch: refs/heads/devel-3
Commit: 7d2f47491498c6b1c550f70e626dd76ba1db393e
Parents: c787461
Author: MalharJenkins <jenkins@datatorrent.com>
Authored: Mon Nov 23 21:14:41 2015 -0800
Committer: Chandni Singh <csingh@apache.org>
Committed: Mon Nov 23 22:11:18 2015 -0800

----------------------------------------------------------------------
 HDHTFileAccess.java                             | 124 -------------
 HDHTFileAccessFSImpl.java                       | 127 -------------
 .../lib/fileaccess/DTFileReader.java            | 112 ++++++++++++
 .../datatorrent/lib/fileaccess/FileAccess.java  | 129 ++++++++++++++
 .../lib/fileaccess/FileAccessFSImpl.java        | 130 ++++++++++++++
 .../datatorrent/lib/fileaccess/TFileImpl.java   | 178 +++++++++++++++++++
 .../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
 .../datatorrent/lib/fileaccess/TFileWriter.java |  61 +++++++
 pom.xml                                         |   2 +-
 tfile/DTFileReader.java                         | 111 ------------
 tfile/TFileImpl.java                            | 177 ------------------
 tfile/TFileReader.java                          | 125 -------------
 tfile/TFileWriter.java                          |  60 -------
 13 files changed, 736 insertions(+), 725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
deleted file mode 100644
index 266ba75..0000000
--- a/HDHTFileAccess.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Abstraction for file system and format interaction.
- *
- * @since 2.0.0
- */
-public interface HDHTFileAccess extends Closeable
-{
-  void init();
-
-  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
-  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * Atomic file rename.
- * @param bucketKey - * @param oldName - * @param newName - * @throws IOException - */ - void rename(long bucketKey, String oldName, String newName) throws IOException; - void delete(long bucketKey, String fileName) throws IOException; - - long getFileSize(long bucketKey, String s) throws IOException; - - /** - * HDHT Data File Format Reader - */ - interface HDSFileReader extends Closeable - { - /** - * Read the entire contents of the underlying file into a TreeMap structure - * @param data - * @throws IOException - */ - //Move to - // void readFully(TreeMap<Slice, Slice> data) throws IOException; - void readFully(TreeMap<Slice, byte[]> data) throws IOException; - - /** - * Repositions the pointer to the beginning of the underlying file. - * @throws IOException - */ - void reset() throws IOException; - - /** - * Searches for a matching key, and positions the pointer before the start of the key. - * @param key Byte array representing the key - * @throws IOException - * @return true if a given key is found - */ - boolean seek(Slice key) throws IOException; - - /** - * Reads next available key/value pair starting from the current pointer position - * into Slice objects and advances pointer to next key. If pointer is at the end - * of the file, false is returned, and Slice objects remains unmodified. - * - * @param key Empty slice object - * @param value Empty slice object - * @return True if key/value were successfully read, false otherwise - * @throws IOException - */ - boolean next(Slice key, Slice value) throws IOException; - - } - - /** - * HDHT Data File Format Writer - */ - interface HDSFileWriter extends Closeable { - /** - * Appends key/value pair to the underlying file. - * @param key - * @param value - * @throws IOException - */ - void append(byte[] key, byte[] value) throws IOException; - - /** - * Returns number of bytes written to the underlying stream. - * @return The bytes written. - * @throws IOException - */ - long getBytesWritten() throws IOException; - } - - /** - * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs. - * work just based on InputStream), construction of the reader is part of the file system abstraction itself. - */ - public HDSFileReader getReader(long bucketKey, String fileName) throws IOException; - - /** - * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs. - * work just based on OutputStream), construction of the writer is part of the file system abstraction itself. - */ - public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccessFSImpl.java ---------------------------------------------------------------------- diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java deleted file mode 100644 index 13dd0ad..0000000 --- a/HDHTFileAccessFSImpl.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datatorrent.contrib.hdht; - -import java.io.IOException; - -import javax.validation.constraints.NotNull; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.Path; - -import com.datatorrent.netlet.util.DTThrowable; - -/** - * Hadoop file system backed store. - * - * @since 2.0.0 - */ -abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess -{ - @NotNull - private String basePath; - protected transient FileSystem fs; - - public HDHTFileAccessFSImpl() - { - } - - public String getBasePath() - { - return basePath; - } - - public void setBasePath(String path) - { - this.basePath = path; - } - - protected Path getFilePath(long bucketKey, String fileName) { - return new Path(getBucketPath(bucketKey), fileName); - } - - protected Path getBucketPath(long bucketKey) - { - return new Path(basePath, Long.toString(bucketKey)); - } - - @Override - public long getFileSize(long bucketKey, String fileName) throws IOException { - return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen(); - } - - @Override - public void close() throws IOException - { - fs.close(); - } - - @Override - public void init() - { - if (fs == null) { - Path dataFilePath = new Path(basePath); - try { - fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration()); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - } - } - - @Override - public void delete(long bucketKey, String fileName) throws IOException - { - fs.delete(getFilePath(bucketKey, fileName), true); - } - - @Override - public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException - { - Path path = getFilePath(bucketKey, fileName); - return fs.create(path, true); - } - - @Override - public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException - { - return fs.open(getFilePath(bucketKey, fileName)); - } - - @Override - public void rename(long bucketKey, String fromName, String toName) throws IOException - { - FileContext fc = FileContext.getFileContext(fs.getUri()); - Path bucketPath = getBucketPath(bucketKey); - // file context requires absolute path - if (!bucketPath.isAbsolute()) { - bucketPath = new Path(fs.getWorkingDirectory(), bucketPath); - } - fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE); - } - - @Override - public String toString() - { - return this.getClass().getSimpleName() + "[basePath=" + basePath + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java new file mode 100644 index 0000000..cb97520 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.fileaccess; + +import java.io.IOException; +import java.util.Arrays; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.file.tfile.DTFile; +import org.apache.hadoop.io.file.tfile.DTFile.Reader; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry; +import org.apache.hadoop.io.file.tfile.TFile; + +import com.datatorrent.netlet.util.Slice; + +/** + * {@link DTFile} wrapper for HDSFileReader + * <br> + * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation + * <br> + * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter" + * + * + * @since 2.0.0 + */ +public class DTFileReader implements FileAccess.FileReader +{ + private final Reader reader; + private final Scanner scanner; + private final FSDataInputStream fsdis; + + public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException + { + this.fsdis = fsdis; + reader = new Reader(fsdis, fileLength, conf); + scanner = reader.createScanner(); + } + + /** + * Unlike the TFile.Reader.close method this will close the wrapped InputStream. 
+ * @see java.io.Closeable#close() + */ + @Override + public void close() throws IOException + { + scanner.close(); + reader.close(); + fsdis.close(); + } + + @Override + public void readFully(TreeMap<Slice, byte[]> data) throws IOException + { + scanner.rewind(); + for (; !scanner.atEnd(); scanner.advance()) { + Entry en = scanner.entry(); + Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength()); + byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength()); + data.put(key, value); + } + + } + + @Override + public void reset() throws IOException + { + scanner.rewind(); + } + + @Override + public boolean seek(Slice key) throws IOException + { + return scanner.seekTo(key.buffer, key.offset, key.length); + } + + @Override + public boolean next(Slice key, Slice value) throws IOException + { + if (scanner.atEnd()) return false; + Entry en = scanner.entry(); + + key.buffer = en.getBlockBuffer(); + key.offset = en.getKeyOffset(); + key.length = en.getKeyLength(); + + value.buffer = en.getBlockBuffer(); + value.offset = en.getValueOffset(); + value.length = en.getValueLength(); + + scanner.advance(); + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java new file mode 100644 index 0000000..4b7f6e5 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.fileaccess; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.TreeMap; + +import com.datatorrent.netlet.util.Slice; + +/** + * Abstraction for file system and format interaction. + * + * @since 2.0.0 + */ +public interface FileAccess extends Closeable +{ + void init(); + + DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException; + + DataInputStream getInputStream(long bucketKey, String fileName) throws IOException; + + /** + * Atomic file rename. 
+ * @param bucketKey + * @param oldName + * @param newName + * @throws IOException + */ + void rename(long bucketKey, String oldName, String newName) throws IOException; + void delete(long bucketKey, String fileName) throws IOException; + + long getFileSize(long bucketKey, String s) throws IOException; + + /** + * Data File Format Reader + */ + interface FileReader extends Closeable + { + /** + * Read the entire contents of the underlying file into a TreeMap structure + * @param data + * @throws IOException + */ + //Move to + // void readFully(TreeMap<Slice, Slice> data) throws IOException; + void readFully(TreeMap<Slice, byte[]> data) throws IOException; + + /** + * Repositions the pointer to the beginning of the underlying file. + * @throws IOException + */ + void reset() throws IOException; + + /** + * Searches for a matching key, and positions the pointer before the start of the key. + * @param key Byte array representing the key + * @throws IOException + * @return true if a given key is found + */ + boolean seek(Slice key) throws IOException; + + /** + * Reads next available key/value pair starting from the current pointer position + * into Slice objects and advances pointer to next key. If pointer is at the end + * of the file, false is returned, and Slice objects remains unmodified. + * + * @param key Empty slice object + * @param value Empty slice object + * @return True if key/value were successfully read, false otherwise + * @throws IOException + */ + boolean next(Slice key, Slice value) throws IOException; + + } + + /** + * Data File Format Writer + */ + interface FileWriter extends Closeable + { + /** + * Appends key/value pair to the underlying file. + * @param key + * @param value + * @throws IOException + */ + void append(byte[] key, byte[] value) throws IOException; + + /** + * Returns number of bytes written to the underlying stream. + * @return The bytes written. + * @throws IOException + */ + long getBytesWritten() throws IOException; + } + + /** + * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs. + * work just based on InputStream), construction of the reader is part of the file system abstraction itself. + */ + public FileReader getReader(long bucketKey, String fileName) throws IOException; + + /** + * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs. + * work just based on OutputStream), construction of the writer is part of the file system abstraction itself. + */ + public FileWriter getWriter(long bucketKey, String fileName) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java new file mode 100644 index 0000000..80a201a --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.fileaccess; + +import java.io.IOException; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Hadoop file system backed store. + * + * @since 2.0.0 + */ +public abstract class FileAccessFSImpl implements FileAccess +{ + @NotNull + private String basePath; + protected transient FileSystem fs; + + public FileAccessFSImpl() + { + } + + public String getBasePath() + { + return basePath; + } + + public void setBasePath(String path) + { + this.basePath = path; + } + + protected Path getFilePath(long bucketKey, String fileName) { + return new Path(getBucketPath(bucketKey), fileName); + } + + protected Path getBucketPath(long bucketKey) + { + return new Path(basePath, Long.toString(bucketKey)); + } + + @Override + public long getFileSize(long bucketKey, String fileName) throws IOException { + return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen(); + } + + @Override + public void close() throws IOException + { + fs.close(); + } + + @Override + public void init() + { + if (fs == null) { + Path dataFilePath = new Path(basePath); + try { + fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void delete(long bucketKey, String fileName) throws IOException + { + fs.delete(getFilePath(bucketKey, fileName), true); + } + + @Override + public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException + { + Path path = getFilePath(bucketKey, fileName); + return fs.create(path, true); + } + + @Override + public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException + { + return fs.open(getFilePath(bucketKey, fileName)); + } + + @Override + public void rename(long bucketKey, String fromName, String toName) throws IOException + { + FileContext fc = FileContext.getFileContext(fs.getUri()); + Path bucketPath = getBucketPath(bucketKey); + // file context requires absolute path + if (!bucketPath.isAbsolute()) { + bucketPath = new Path(fs.getWorkingDirectory(), bucketPath); + } + fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE); + } + + @Override + public String toString() + { + return this.getClass().getSimpleName() + "[basePath=" + basePath + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java new file mode 100644 index 0000000..5526832 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.fileaccess; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.file.tfile.DTFile; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.io.file.tfile.TFile.Reader; +import org.apache.hadoop.io.file.tfile.TFile.Writer; + +/** + * A TFile wrapper with FileAccess API + * <ul> + * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> + * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> + * </ul> + * + * @since 2.0.0 + */ +public abstract class TFileImpl extends FileAccessFSImpl +{ + private int minBlockSize = 64 * 1024; + + private String compressName = TFile.COMPRESSION_NONE; + + private String comparator = "memcmp"; + + private int chunkSize = 1024 * 1024; + + private int inputBufferSize = 256 * 1024; + + private int outputBufferSize = 256 * 1024; + + + private void setupConfig(Configuration conf) + { + conf.set("tfile.io.chunk.size", String.valueOf(chunkSize)); + conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize)); + conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize)); + } + + + @Override + public FileWriter getWriter(long bucketKey, String fileName) throws IOException + { + FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName); + setupConfig(fs.getConf()); + return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf()); + } + + public int getMinBlockSize() + { + return minBlockSize; + } + + + public void setMinBlockSize(int minBlockSize) + { + this.minBlockSize = minBlockSize; + } + + + public String getCompressName() + { + return compressName; + } + + + public void setCompressName(String compressName) + { + this.compressName = compressName; + } + + + public String getComparator() + { + return comparator; + } + + + public void setComparator(String comparator) + { + this.comparator = comparator; + } + + + public int getChunkSize() + { + return chunkSize; + } + + + public void setChunkSize(int chunkSize) + { + this.chunkSize = chunkSize; + } + + + public int getInputBufferSize() + { + return inputBufferSize; + } + + + public void 
setInputBufferSize(int inputBufferSize) + { + this.inputBufferSize = inputBufferSize; + } + + + public int getOutputBufferSize() + { + return outputBufferSize; + } + + + public void setOutputBufferSize(int outputBufferSize) + { + this.outputBufferSize = outputBufferSize; + } + + /** + * Return {@link TFile} {@link Reader} + * + */ + public static class DefaultTFileImpl extends TFileImpl{ + + @Override + public FileReader getReader(long bucketKey, String fileName) throws IOException + { + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + long fileLength = getFileSize(bucketKey, fileName); + super.setupConfig(fs.getConf()); + return new TFileReader(fsdis, fileLength, fs.getConf()); + } + + } + + + /** + * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} + * + */ + public static class DTFileImpl extends TFileImpl { + + @Override + public FileReader getReader(long bucketKey, String fileName) throws IOException + { + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + long fileLength = getFileSize(bucketKey, fileName); + super.setupConfig(fs.getConf()); + return new DTFileReader(fsdis, fileLength, fs.getConf()); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java new file mode 100644 index 0000000..8426c3f --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.fileaccess; + +import java.io.IOException; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.file.tfile.TFile.Reader; +import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; +import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry; + +import com.datatorrent.netlet.util.Slice; + +/** + * TFileReader + * + * @since 2.0.0 + */ +public class TFileReader implements FileAccess.FileReader +{ + + private final Reader reader; + private final Scanner scanner; + private final FSDataInputStream fsdis; + private boolean closed = false; + + public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException + { + this.fsdis = fsdis; + reader = new Reader(fsdis, fileLength, conf); + scanner = reader.createScanner(); + } + + /** + * Unlike the TFile.Reader.close method this will close the wrapped InputStream. 
+ * @see java.io.Closeable#close() + */ + @Override + public void close() throws IOException + { + closed = true; + scanner.close(); + reader.close(); + fsdis.close(); + } + + @Override + public void readFully(TreeMap<Slice, byte[]> data) throws IOException + { + scanner.rewind(); + for (; !scanner.atEnd(); scanner.advance()) { + Entry en = scanner.entry(); + int klen = en.getKeyLength(); + int vlen = en.getValueLength(); + byte[] key = new byte[klen]; + byte[] value = new byte[vlen]; + en.getKey(key); + en.getValue(value); + data.put(new Slice(key, 0, key.length), value); + } + + } + + @Override + public void reset() throws IOException + { + scanner.rewind(); + } + + @Override + public boolean seek(Slice key) throws IOException + { + try { + return scanner.seekTo(key.buffer, key.offset, key.length); + } catch (NullPointerException ex) { + if (closed) + throw new IOException("Stream was closed"); + else + throw ex; + } + } + + @Override + public boolean next(Slice key, Slice value) throws IOException + { + if (scanner.atEnd()) return false; + Entry en = scanner.entry(); + byte[] rkey = new byte[en.getKeyLength()]; + byte[] rval = new byte[en.getValueLength()]; + en.getKey(rkey); + en.getValue(rval); + + key.buffer = rkey; + key.offset = 0; + key.length = en.getKeyLength(); + + value.buffer = rval; + value.offset = 0; + value.length = en.getValueLength(); + + scanner.advance(); + return true; + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java new file mode 100644 index 0000000..b362987 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.fileaccess; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.file.tfile.TFile.Writer; + +/** + * TFileWriter + * + * @since 2.0.0 + */ +public final class TFileWriter implements FileAccess.FileWriter +{ + private Writer writer; + + private FSDataOutputStream fsdos; + + public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException + { + this.fsdos = stream; + writer = new Writer(stream, minBlockSize, compressName, comparator, conf); + + } + + @Override + public void close() throws IOException + { + writer.close(); + fsdos.close(); + } + + @Override + public void append(byte[] key, byte[] value) throws IOException + { + writer.append(key, value); + } + + @Override + public long getBytesWritten() throws IOException{ return fsdos.getPos(); } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 92466ab..678540d 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>8768</maxAllowedViolations> + <maxAllowedViolations>8789</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/DTFileReader.java ---------------------------------------------------------------------- diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java deleted file mode 100644 index e61d475..0000000 --- a/tfile/DTFileReader.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.datatorrent.contrib.hdht.tfile; - -import java.io.IOException; -import java.util.Arrays; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.file.tfile.DTFile; -import org.apache.hadoop.io.file.tfile.DTFile.Reader; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; -import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry; -import org.apache.hadoop.io.file.tfile.TFile; - -import com.datatorrent.netlet.util.Slice; -import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader; - -/** - * {@link DTFile} wrapper for HDSFileReader - * <br> - * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation - * <br> - * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. 
So there is no corresponding "DTFileWriter" - * - * - * @since 2.0.0 - */ -public class DTFileReader implements HDSFileReader -{ - private final Reader reader; - private final Scanner scanner; - private final FSDataInputStream fsdis; - - public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException - { - this.fsdis = fsdis; - reader = new Reader(fsdis, fileLength, conf); - scanner = reader.createScanner(); - } - - /** - * Unlike the TFile.Reader.close method this will close the wrapped InputStream. - * @see java.io.Closeable#close() - */ - @Override - public void close() throws IOException - { - scanner.close(); - reader.close(); - fsdis.close(); - } - - @Override - public void readFully(TreeMap<Slice, byte[]> data) throws IOException - { - scanner.rewind(); - for (; !scanner.atEnd(); scanner.advance()) { - Entry en = scanner.entry(); - Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength()); - byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength()); - data.put(key, value); - } - - } - - @Override - public void reset() throws IOException - { - scanner.rewind(); - } - - @Override - public boolean seek(Slice key) throws IOException - { - return scanner.seekTo(key.buffer, key.offset, key.length); - } - - @Override - public boolean next(Slice key, Slice value) throws IOException - { - if (scanner.atEnd()) return false; - Entry en = scanner.entry(); - - key.buffer = en.getBlockBuffer(); - key.offset = en.getKeyOffset(); - key.length = en.getKeyLength(); - - value.buffer = en.getBlockBuffer(); - value.offset = en.getValueOffset(); - value.length = en.getValueLength(); - - scanner.advance(); - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileImpl.java ---------------------------------------------------------------------- diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java deleted file mode 100644 index 5dc9464..0000000 --- a/tfile/TFileImpl.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.datatorrent.contrib.hdht.tfile; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.io.file.tfile.DTFile; -import org.apache.hadoop.io.file.tfile.TFile; -import org.apache.hadoop.io.file.tfile.TFile.Reader; -import org.apache.hadoop.io.file.tfile.TFile.Writer; - -import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl; - -/** - * A TFile wrapper with HDHTFileAccess API - * <ul> - * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> - * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> - * </ul> - * - * @since 2.0.0 - */ -public abstract class TFileImpl extends HDHTFileAccessFSImpl -{ - private int minBlockSize = 64 * 1024; - - private String compressName = TFile.COMPRESSION_NONE; - - private String comparator = "memcmp"; - - private int chunkSize = 1024 * 1024; - - private int inputBufferSize = 256 * 1024; - - private int outputBufferSize = 256 * 1024; - - - private void setupConfig(Configuration conf) - { - conf.set("tfile.io.chunk.size", String.valueOf(chunkSize)); - conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize)); - conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize)); - } - - - @Override - public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException - { - FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName); - setupConfig(fs.getConf()); - return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf()); - } - - public int getMinBlockSize() - { - return minBlockSize; - } - - - public void setMinBlockSize(int minBlockSize) - { - this.minBlockSize = minBlockSize; - } - - - public String getCompressName() - { - return compressName; - } - - - public void setCompressName(String compressName) - { - this.compressName = compressName; - } - - - public String getComparator() - { - return comparator; - } - - - public void setComparator(String comparator) - { - this.comparator = comparator; - } - - - public int getChunkSize() - { - return chunkSize; - } - - - public void setChunkSize(int chunkSize) - { - this.chunkSize = chunkSize; - } - - - public int getInputBufferSize() - { - return inputBufferSize; - } - - - public void setInputBufferSize(int inputBufferSize) - { - this.inputBufferSize = inputBufferSize; - } - - - public int getOutputBufferSize() - { - return outputBufferSize; - } - - - public void setOutputBufferSize(int outputBufferSize) - { - this.outputBufferSize = outputBufferSize; - } - - /** - * Return {@link TFile} {@link Reader} - * - */ - public static class DefaultTFileImpl extends TFileImpl{ - - @Override - public HDSFileReader getReader(long bucketKey, String fileName) throws IOException - { - FSDataInputStream fsdis = getInputStream(bucketKey, fileName); - long fileLength = getFileSize(bucketKey, fileName); - super.setupConfig(fs.getConf()); - return new TFileReader(fsdis, fileLength, fs.getConf()); - } - - } - - - /** - * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} - * - */ - public static class DTFileImpl extends TFileImpl { - - @Override - public HDSFileReader getReader(long bucketKey, String fileName) throws IOException - { - FSDataInputStream fsdis = getInputStream(bucketKey, 
fileName); - long fileLength = getFileSize(bucketKey, fileName); - super.setupConfig(fs.getConf()); - return new DTFileReader(fsdis, fileLength, fs.getConf()); - } - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileReader.java ---------------------------------------------------------------------- diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java deleted file mode 100644 index 0994666..0000000 --- a/tfile/TFileReader.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datatorrent.contrib.hdht.tfile; - -import java.io.IOException; -import java.util.TreeMap; - -import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.netlet.util.Slice; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.file.tfile.TFile.Reader; -import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; -import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry; - -import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader; - -/** - * TFileReader - * - * @since 2.0.0 - */ -public class TFileReader implements HDSFileReader -{ - - private final Reader reader; - private final Scanner scanner; - private final FSDataInputStream fsdis; - private boolean closed = false; - - public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException - { - this.fsdis = fsdis; - reader = new Reader(fsdis, fileLength, conf); - scanner = reader.createScanner(); - } - - /** - * Unlike the TFile.Reader.close method this will close the wrapped InputStream. 
- * @see java.io.Closeable#close() - */ - @Override - public void close() throws IOException - { - closed = true; - scanner.close(); - reader.close(); - fsdis.close(); - } - - @Override - public void readFully(TreeMap<Slice, byte[]> data) throws IOException - { - scanner.rewind(); - for (; !scanner.atEnd(); scanner.advance()) { - Entry en = scanner.entry(); - int klen = en.getKeyLength(); - int vlen = en.getValueLength(); - byte[] key = new byte[klen]; - byte[] value = new byte[vlen]; - en.getKey(key); - en.getValue(value); - data.put(new Slice(key, 0, key.length), value); - } - - } - - @Override - public void reset() throws IOException - { - scanner.rewind(); - } - - @Override - public boolean seek(Slice key) throws IOException - { - try { - return scanner.seekTo(key.buffer, key.offset, key.length); - } catch (NullPointerException ex) { - if (closed) - throw new IOException("Stream was closed"); - else - throw ex; - } - } - - @Override - public boolean next(Slice key, Slice value) throws IOException - { - if (scanner.atEnd()) return false; - Entry en = scanner.entry(); - byte[] rkey = new byte[en.getKeyLength()]; - byte[] rval = new byte[en.getValueLength()]; - en.getKey(rkey); - en.getValue(rval); - - key.buffer = rkey; - key.offset = 0; - key.length = en.getKeyLength(); - - value.buffer = rval; - value.offset = 0; - value.length = en.getValueLength(); - - scanner.advance(); - return true; - } - -} - - http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileWriter.java ---------------------------------------------------------------------- diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java deleted file mode 100644 index 549e1b8..0000000 --- a/tfile/TFileWriter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datatorrent.contrib.hdht.tfile; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.io.file.tfile.TFile.Writer; - -import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter; - -/** - * TFileWriter - * - * @since 2.0.0 - */ -public final class TFileWriter implements HDSFileWriter -{ - private Writer writer; - - private FSDataOutputStream fsdos; - - public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException - { - this.fsdos = stream; - writer = new Writer(stream, minBlockSize, compressName, comparator, conf); - - } - - @Override - public void close() throws IOException - { - writer.close(); - fsdos.close(); - } - - @Override - public void append(byte[] key, byte[] value) throws IOException - { - writer.append(key, value); - } - - @Override - public long getBytesWritten() throws IOException{ return fsdos.getPos(); } - -}
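A closing note on the tuning knobs: the setters on TFileImpl map directly onto the tfile.* Configuration keys set in setupConfig() above. A sketch of wiring them up, with illustrative values that mirror the field defaults in this commit (not recommendations; the HDFS path is hypothetical):

    import org.apache.hadoop.io.file.tfile.TFile;

    import com.datatorrent.lib.fileaccess.TFileImpl;

    // DefaultTFileImpl returns the stock TFile.Reader; DTFileImpl returns the
    // faster DTFile.Reader. Both write identical files.
    TFileImpl store = new TFileImpl.DefaultTFileImpl();
    store.setBasePath("hdfs:///tmp/store");        // hypothetical base directory
    store.setMinBlockSize(64 * 1024);              // TFile minimum compression block size
    store.setCompressName(TFile.COMPRESSION_NONE); // or a codec such as TFile.COMPRESSION_GZ
    store.setComparator("memcmp");                 // raw byte-wise key ordering
    store.setChunkSize(1024 * 1024);               // -> tfile.io.chunk.size
    store.setInputBufferSize(256 * 1024);          // -> tfile.fs.input.buffer.size
    store.setOutputBufferSize(256 * 1024);         // -> tfile.fs.output.buffer.size
    store.init();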