Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 e24c14c3e -> 333a70733
Rename HDS to HDHT. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4e47d236 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4e47d236 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4e47d236 Branch: refs/heads/devel-3 Commit: 4e47d236cbd8a1ca3c6dd96235cd588a7eb1d2e6 Parents: 217f8db Author: thomas <[email protected]> Authored: Mon Dec 8 15:22:57 2014 -0800 Committer: Chandni Singh <[email protected]> Committed: Mon Nov 23 21:37:01 2015 -0800 ---------------------------------------------------------------------- AbstractSinglePortHDSWriter.java | 194 ++++++++++++++++++++++++++++++++++ HDHTFileAccess.java | 122 +++++++++++++++++++++ HDHTFileAccessFSImpl.java | 125 ++++++++++++++++++++++ tfile/DTFileReader.java | 110 +++++++++++++++++++ tfile/TFileImpl.java | 176 ++++++++++++++++++++++++++++++ tfile/TFileReader.java | 110 +++++++++++++++++++ tfile/TFileWriter.java | 55 ++++++++++ 7 files changed, 892 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/AbstractSinglePortHDSWriter.java ---------------------------------------------------------------------- diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java new file mode 100644 index 0000000..04fa602 --- /dev/null +++ b/AbstractSinglePortHDSWriter.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.common.util.Slice; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Lists; + +/** + * Operator that receives data on port and writes it to the data store. + * Implements partitioning, maps partition key to the store bucket. + * The derived class supplies the codec for partitioning and key-value serialization. 
+ * @param <EVENT> + */ +public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>> +{ + public interface HDSCodec<EVENT> extends StreamCodec<EVENT> + { + byte[] getKeyBytes(EVENT event); + byte[] getValueBytes(EVENT event); + EVENT fromKeyValue(Slice key, byte[] value); + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class); + + protected int partitionMask; + + protected Set<Integer> partitions; + + protected transient HDSCodec<EVENT> codec; + + @Min(1) + private int partitionCount = 1; + + public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>() + { + @Override + public void process(EVENT event) + { + try { + processEvent(event); + } catch (IOException e) { + throw new RuntimeException("Error processing " + event, e); + } + } + + @Override + public StreamCodec<EVENT> getStreamCodec() + { + return getCodec(); + } + }; + + public void setPartitionCount(int partitionCount) + { + this.partitionCount = partitionCount; + } + + public int getPartitionCount() + { + return partitionCount; + } + + /** + * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is + * identified by the partition id. + * + * @param event + * @return The bucket key. + */ + protected long getBucketKey(EVENT event) + { + return (codec.getPartition(event) & partitionMask); + } + + protected void processEvent(EVENT event) throws IOException + { + byte[] key = codec.getKeyBytes(event); + byte[] value = codec.getValueBytes(event); + super.put(getBucketKey(event), new Slice(key), value); + } + + abstract protected HDSCodec<EVENT> getCodec(); + + @Override + public void setup(OperatorContext arg0) + { + LOG.debug("Store {} with partitions {} {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions)); + super.setup(arg0); + try { + this.codec = getCodec(); + // inject the operator reference, if such field exists + // TODO: replace with broader solution + Class<?> cls = this.codec.getClass(); + while (cls != null) { + for (Field field : cls.getDeclaredFields()) { + if (field.getType().isAssignableFrom(this.getClass())) { + field.setAccessible(true); + field.set(this.codec, this); + } + } + cls = cls.getSuperclass(); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create codec", e); + } + } + + @Override + public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity) + { + boolean isInitialPartition = partitions.iterator().next().getStats() == null; + + if (!isInitialPartition) { + // support for dynamic partitioning requires lineage tracking + LOG.warn("Dynamic partitioning not implemented"); + return partitions; + } + + int totalCount; + + //Get the size of the partition for parallel partitioning + if(incrementalCapacity != 0) { + totalCount = incrementalCapacity; + } + //Do normal partitioning + else { + totalCount = partitionCount; + } + + Kryo lKryo = new Kryo(); + Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); + for (int i = 0; i < totalCount; i++) { + // Kryo.copy fails as it attempts to clone transient fields (input port) + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Output output = new Output(bos); + lKryo.writeObject(output, this); + output.close(); + Input lInput = new 
Input(bos.toByteArray()); + @SuppressWarnings("unchecked") + AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass()); + newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper)); + } + + // assign the partition keys + DefaultPartition.assignPartitionKeys(newPartitions, input); + + for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) { + PartitionKeys pks = p.getPartitionKeys().get(input); + p.getPartitionedInstance().partitionMask = pks.mask; + p.getPartitionedInstance().partitions = pks.partitions; + } + + return newPartitions; + } + + @Override + public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0) + { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccess.java ---------------------------------------------------------------------- diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java new file mode 100644 index 0000000..fc3d56f --- /dev/null +++ b/HDHTFileAccess.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.TreeMap; + +import com.datatorrent.common.util.Slice; + +/** + * Abstraction for file system and format interaction. + */ +public interface HDHTFileAccess extends Closeable +{ + void init(); + + DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException; + DataInputStream getInputStream(long bucketKey, String fileName) throws IOException; + + /** + * Atomic file rename. + * @param bucketKey + * @param oldName + * @param newName + * @throws IOException + */ + void rename(long bucketKey, String oldName, String newName) throws IOException; + void delete(long bucketKey, String fileName) throws IOException; + + long getFileSize(long bucketKey, String s) throws IOException; + + /** + * HDHT Data File Format Reader + */ + interface HDSFileReader extends Closeable + { + /** + * Read the entire contents of the underlying file into a TreeMap structure + * @param data + * @throws IOException + */ + //Move to + // void readFully(TreeMap<Slice, Slice> data) throws IOException; + void readFully(TreeMap<Slice, byte[]> data) throws IOException; + + /** + * Repositions the pointer to the beginning of the underlying file. + * @throws IOException + */ + void reset() throws IOException; + + /** + * Searches for a matching key, and positions the pointer before the start of the key. + * @param key Byte array representing the key + * @throws IOException + * @return true if a given key is found + */ + boolean seek(Slice key) throws IOException; + + /** + * Reads next available key/value pair starting from the current pointer position + * into Slice objects and advances pointer to next key. 
If pointer is at the end + * of the file, false is returned, and Slice objects remains unmodified. + * + * @param key Empty slice object + * @param value Empty slice object + * @return True if key/value were successfully read, false otherwise + * @throws IOException + */ + boolean next(Slice key, Slice value) throws IOException; + + } + + /** + * HDHT Data File Format Writer + */ + interface HDSFileWriter extends Closeable { + /** + * Appends key/value pair to the underlying file. + * @param key + * @param value + * @throws IOException + */ + void append(byte[] key, byte[] value) throws IOException; + + /** + * Returns number of bytes written to the underlying stream. + * @return The bytes written. + * @throws IOException + */ + long getBytesWritten() throws IOException; + } + + /** + * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs. + * work just based on InputStream), construction of the reader is part of the file system abstraction itself. + */ + public HDSFileReader getReader(long bucketKey, String fileName) throws IOException; + + /** + * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs. + * work just based on OutputStream), construction of the writer is part of the file system abstraction itself. + */ + public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccessFSImpl.java ---------------------------------------------------------------------- diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java new file mode 100644 index 0000000..ad9aa05 --- /dev/null +++ b/HDHTFileAccessFSImpl.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht; + +import java.io.IOException; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.common.util.DTThrowable; + +/** + * Hadoop file system backed store. 
+ */ +abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess +{ + @NotNull + private String basePath; + protected transient FileSystem fs; + + public HDHTFileAccessFSImpl() + { + } + + public String getBasePath() + { + return basePath; + } + + public void setBasePath(String path) + { + this.basePath = path; + } + + protected Path getFilePath(long bucketKey, String fileName) { + return new Path(getBucketPath(bucketKey), fileName); + } + + protected Path getBucketPath(long bucketKey) + { + return new Path(basePath, Long.toString(bucketKey)); + } + + @Override + public long getFileSize(long bucketKey, String fileName) throws IOException { + return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen(); + } + + @Override + public void close() throws IOException + { + fs.close(); + } + + @Override + public void init() + { + if (fs == null) { + Path dataFilePath = new Path(basePath); + try { + fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration()); + } catch (IOException e) { + DTThrowable.rethrow(e); + } + } + } + + @Override + public void delete(long bucketKey, String fileName) throws IOException + { + fs.delete(getFilePath(bucketKey, fileName), true); + } + + @Override + public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException + { + Path path = getFilePath(bucketKey, fileName); + return fs.create(path, true); + } + + @Override + public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException + { + return fs.open(getFilePath(bucketKey, fileName)); + } + + @Override + public void rename(long bucketKey, String fromName, String toName) throws IOException + { + FileContext fc = FileContext.getFileContext(fs.getUri()); + Path bucketPath = getBucketPath(bucketKey); + // file context requires absolute path + if (!bucketPath.isAbsolute()) { + bucketPath = new Path(fs.getWorkingDirectory(), bucketPath); + } + fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE); + } + + @Override + public String toString() + { + return this.getClass().getSimpleName() + "[basePath=" + basePath + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/DTFileReader.java ---------------------------------------------------------------------- diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java new file mode 100644 index 0000000..fefadaf --- /dev/null +++ b/tfile/DTFileReader.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.datatorrent.contrib.hdht.tfile; + +import java.io.IOException; +import java.util.Arrays; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.file.tfile.DTFile; +import org.apache.hadoop.io.file.tfile.DTFile.Reader; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner; +import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry; +import org.apache.hadoop.io.file.tfile.TFile; + +import com.datatorrent.common.util.Slice; +import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader; + +/** + * {@link DTFile} wrapper for HDSFileReader + * <br> + * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation + * <br> + * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter" + * + * + */ +public class DTFileReader implements HDSFileReader +{ + private final Reader reader; + private final Scanner scanner; + private final FSDataInputStream fsdis; + + public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException + { + this.fsdis = fsdis; + reader = new Reader(fsdis, fileLength, conf); + scanner = reader.createScanner(); + } + + /** + * Unlike the TFile.Reader.close method this will close the wrapped InputStream. + * @see java.io.Closeable#close() + */ + @Override + public void close() throws IOException + { + scanner.close(); + reader.close(); + fsdis.close(); + } + + @Override + public void readFully(TreeMap<Slice, byte[]> data) throws IOException + { + scanner.rewind(); + for (; !scanner.atEnd(); scanner.advance()) { + Entry en = scanner.entry(); + Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength()); + byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength()); + data.put(key, value); + } + + } + + @Override + public void reset() throws IOException + { + scanner.rewind(); + } + + @Override + public boolean seek(Slice key) throws IOException + { + return scanner.seekTo(key.buffer, key.offset, key.length); + } + + @Override + public boolean next(Slice key, Slice value) throws IOException + { + if (scanner.atEnd()) return false; + Entry en = scanner.entry(); + + key.buffer = en.getBlockBuffer(); + key.offset = en.getKeyOffset(); + key.length = en.getKeyLength(); + + value.buffer = en.getBlockBuffer(); + value.offset = en.getValueOffset(); + value.length = en.getValueLength(); + + scanner.advance(); + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileImpl.java ---------------------------------------------------------------------- diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java new file mode 100644 index 0000000..714a5b1 --- /dev/null +++ b/tfile/TFileImpl.java @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht.tfile; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.file.tfile.DTFile; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.io.file.tfile.TFile.Reader; +import org.apache.hadoop.io.file.tfile.TFile.Writer; + +import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl; + +/** + * A TFile wrapper with HDHTFileAccess API + * <ul> + * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> + * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> + * </ul> + * + */ +public abstract class TFileImpl extends HDHTFileAccessFSImpl +{ + private int minBlockSize = 64 * 1024; + + private String compressName = TFile.COMPRESSION_NONE; + + private String comparator = "memcmp"; + + private int chunkSize = 1024 * 1024; + + private int inputBufferSize = 256 * 1024; + + private int outputBufferSize = 256 * 1024; + + + private void setupConfig(Configuration conf) + { + conf.set("tfile.io.chunk.size", String.valueOf(chunkSize)); + conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize)); + conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize)); + } + + + @Override + public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException + { + FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName); + setupConfig(fs.getConf()); + return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf()); + } + + public int getMinBlockSize() + { + return minBlockSize; + } + + + public void setMinBlockSize(int minBlockSize) + { + this.minBlockSize = minBlockSize; + } + + + public String getCompressName() + { + return compressName; + } + + + public void setCompressName(String compressName) + { + this.compressName = compressName; + } + + + public String getComparator() + { + return comparator; + } + + + public void setComparator(String comparator) + { + this.comparator = comparator; + } + + + public int getChunkSize() + { + return chunkSize; + } + + + public void setChunkSize(int chunkSize) + { + this.chunkSize = chunkSize; + } + + + public int getInputBufferSize() + { + return inputBufferSize; + } + + + public void setInputBufferSize(int inputBufferSize) + { + this.inputBufferSize = inputBufferSize; + } + + + public int getOutputBufferSize() + { + return outputBufferSize; + } + + + public void setOutputBufferSize(int outputBufferSize) + { + this.outputBufferSize = outputBufferSize; + } + + /** + * Return {@link TFile} {@link Reader} + * + */ + public static class DefaultTFileImpl extends TFileImpl{ + + @Override + public HDSFileReader getReader(long bucketKey, String fileName) throws IOException + { + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + long fileLength = getFileSize(bucketKey, fileName); + super.setupConfig(fs.getConf()); + return new TFileReader(fsdis, fileLength, fs.getConf()); + } + + } + + + /** + * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} + * + */ + public static class DTFileImpl extends TFileImpl { + + @Override + public HDSFileReader getReader(long bucketKey, String 
fileName) throws IOException + { + FSDataInputStream fsdis = getInputStream(bucketKey, fileName); + long fileLength = getFileSize(bucketKey, fileName); + super.setupConfig(fs.getConf()); + return new DTFileReader(fsdis, fileLength, fs.getConf()); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileReader.java ---------------------------------------------------------------------- diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java new file mode 100644 index 0000000..d20408c --- /dev/null +++ b/tfile/TFileReader.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht.tfile; + +import java.io.IOException; +import java.util.TreeMap; + +import com.datatorrent.common.util.Slice; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.file.tfile.TFile.Reader; +import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner; +import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry; + +import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader; + +public class TFileReader implements HDSFileReader +{ + + private final Reader reader; + private final Scanner scanner; + private final FSDataInputStream fsdis; + + public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException + { + this.fsdis = fsdis; + reader = new Reader(fsdis, fileLength, conf); + scanner = reader.createScanner(); + } + + /** + * Unlike the TFile.Reader.close method this will close the wrapped InputStream. 
+ * @see java.io.Closeable#close() + */ + @Override + public void close() throws IOException + { + scanner.close(); + reader.close(); + fsdis.close(); + } + + @Override + public void readFully(TreeMap<Slice, byte[]> data) throws IOException + { + scanner.rewind(); + for (; !scanner.atEnd(); scanner.advance()) { + Entry en = scanner.entry(); + int klen = en.getKeyLength(); + int vlen = en.getValueLength(); + byte[] key = new byte[klen]; + byte[] value = new byte[vlen]; + en.getKey(key); + en.getValue(value); + data.put(new Slice(key, 0, key.length), value); + } + + } + + @Override + public void reset() throws IOException + { + scanner.rewind(); + } + + @Override + public boolean seek(Slice key) throws IOException + { + return scanner.seekTo(key.buffer, key.offset, key.length); + } + + @Override + public boolean next(Slice key, Slice value) throws IOException + { + if (scanner.atEnd()) return false; + Entry en = scanner.entry(); + byte[] rkey = new byte[en.getKeyLength()]; + byte[] rval = new byte[en.getValueLength()]; + en.getKey(rkey); + en.getValue(rval); + + key.buffer = rkey; + key.offset = 0; + key.length = en.getKeyLength(); + + value.buffer = rval; + value.offset = 0; + value.length = en.getValueLength(); + + scanner.advance(); + return true; + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileWriter.java ---------------------------------------------------------------------- diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java new file mode 100644 index 0000000..b6fd90d --- /dev/null +++ b/tfile/TFileWriter.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.contrib.hdht.tfile; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.file.tfile.TFile.Writer; + +import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter; + +public final class TFileWriter implements HDSFileWriter +{ + private Writer writer; + + private FSDataOutputStream fsdos; + + public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException + { + this.fsdos = stream; + writer = new Writer(stream, minBlockSize, compressName, comparator, conf); + + } + + @Override + public void close() throws IOException + { + writer.close(); + fsdos.close(); + } + + @Override + public void append(byte[] key, byte[] value) throws IOException + { + writer.append(key, value); + } + + @Override + public long getBytesWritten() throws IOException{ return fsdos.getPos(); } + +}
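----------------------------------------------------------------------

For reference, a minimal usage sketch of the file access abstraction added in this commit (not part of the commit itself): it writes one key/value pair through the DTFile-backed implementation and reads it back. Only methods shown in the diff above are used; the base path, bucket key, file name and class name are illustrative assumptions, not values taken from the change.

import java.io.IOException;

import com.datatorrent.common.util.Slice;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
import com.datatorrent.contrib.hdht.tfile.TFileImpl;

public class HDHTFileAccessSketch
{
  public static void main(String[] args) throws IOException
  {
    // TFile-backed store that hands back the faster DTFile reader; base path is illustrative
    TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
    fileAccess.setBasePath("target/hdht-sketch");
    fileAccess.init(); // lazily creates the Hadoop FileSystem for the base path

    long bucketKey = 0;
    String fileName = "0-0";

    // append a single key/value pair to the bucket data file
    HDSFileWriter writer = fileAccess.getWriter(bucketKey, fileName);
    writer.append("key-1".getBytes(), "value-1".getBytes());
    writer.close();

    // seek to the key and read the entry back through the DTFile reader
    HDSFileReader reader = fileAccess.getReader(bucketKey, fileName);
    Slice key = new Slice(new byte[0], 0, 0);
    Slice value = new Slice(new byte[0], 0, 0);
    if (reader.seek(new Slice("key-1".getBytes())) && reader.next(key, value)) {
      System.out.println("read " + key.length + " key bytes, " + value.length + " value bytes");
    }
    reader.close();
    fileAccess.close();
  }
}

Swapping TFileImpl.DTFileImpl for TFileImpl.DefaultTFileImpl in the sketch would return the plain TFileReader instead; per the DTFileReader javadoc, both read files produced by TFileWriter.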
