Author: cutting
Date: Fri Jan 26 14:11:42 2007
New Revision: 500378

URL: http://svn.apache.org/viewvc?view=rev&rev=500378
Log:
HADOOP-732. Add support to SequenceFile for arbitrary metadata, as a set of attribute value pairs. Contributed by Runping.
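
For readers skimming the patch below, a minimal usage sketch of what this change adds: a createWriter() overload that accepts a Metadata argument, and a Reader.getMetadata() accessor that returns it. The path, attribute names, and wrapper class here are illustrative only, and the sketch assumes same-package access to SequenceFile.Metadata (which this patch leaves package-private), as TestSequenceFile has.

    // Illustrative sketch, not part of the patch.  Placed in
    // org.apache.hadoop.io so that the package-private SequenceFile.Metadata
    // class is visible; the path and attribute names are made up.
    package org.apache.hadoop.io;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SeqFileMetadataExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path file = new Path("/tmp/example.seq");          // illustrative path

        // Attach attribute/value pairs to the file header at create time.
        SequenceFile.Metadata metadata = new SequenceFile.Metadata();
        metadata.set(new Text("created.by"), new Text("example"));
        metadata.set(new Text("schema.version"), new Text("1"));

        SequenceFile.Writer writer =
          SequenceFile.createWriter(fs, conf, file, Text.class, Text.class,
                                    SequenceFile.CompressionType.NONE,
                                    null, null, metadata);
        writer.append(new Text("key"), new Text("value"));
        writer.close();

        // The metadata is read back from the file header on open.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        System.out.println(reader.getMetadata());
        reader.close();
      }
    }

Existing callers are unaffected: the pre-existing createWriter() overloads now delegate to the metadata-taking variants with an empty Metadata.
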
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500378&r1=500377&r2=500378 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 14:11:42 2007 @@ -67,6 +67,9 @@ in HDFS, try another replica of the data, if any. (Wendy Chien via cutting) +21. HADOOP-732. Add support to SequenceFile for arbitrary metadata, + as a set of attribute value pairs. (Runping Qi via cutting) + Release 0.10.1 - 2007-01-10 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 26 14:11:42 2007 @@ -50,8 +50,9 @@ private static final byte BLOCK_COMPRESS_VERSION = (byte)4; private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; + private static final byte VERSION_WITH_METADATA = (byte)6; private static byte[] VERSION = new byte[] { - (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION + (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA }; private static final int SYNC_ESCAPE = -1; // "length" of sync entries @@ -93,7 +94,7 @@ CompressionType val) { job.set("io.seqfile.compression.type", val.toString()); } - + /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. @@ -130,7 +131,7 @@ Writer writer = null; if (compressionType == CompressionType.NONE) { - writer = new Writer(fs, conf, name, keyClass, valClass); + writer = new Writer(fs, conf, name, keyClass, valClass, null, new Metadata()); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, new DefaultCodec()); @@ -161,13 +162,13 @@ Writer writer = null; if (compressionType == CompressionType.NONE) { - writer = new Writer(fs, conf, name, keyClass, valClass, progress); + writer = new Writer(fs, conf, name, keyClass, valClass, progress, new Metadata()); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, - keyClass, valClass, new DefaultCodec(), progress); + keyClass, valClass, new DefaultCodec(), progress, new Metadata()); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, - keyClass, valClass, new DefaultCodec(), progress); + keyClass, valClass, new DefaultCodec(), progress, new Metadata()); } return writer; @@ -222,6 +223,7 @@ * @param compressionType The compression type. * @param codec The compression codec. * @param progress The Progressable object to track progress. + * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. 
* @throws IOException */ @@ -229,7 +231,7 @@ createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, CompressionCodec codec, - Progressable progress) throws IOException { + Progressable progress, Metadata metadata) throws IOException { if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded()) { @@ -240,17 +242,40 @@ Writer writer = null; if (compressionType == CompressionType.NONE) { - writer = new Writer(fs, conf, name, keyClass, valClass, progress); + writer = new Writer(fs, conf, name, keyClass, valClass, progress, metadata); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, - keyClass, valClass, codec, progress); + keyClass, valClass, codec, progress, metadata); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, - keyClass, valClass, codec, progress); + keyClass, valClass, codec, progress, metadata); } return writer; } + + /** + * Construct the preferred type of SequenceFile Writer. + * @param fs The configured filesystem. + * @param conf The configuration. + * @param name The name of the file. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @param progress The Progressable object to track progress. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + */ + public static Writer + createWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, + CompressionType compressionType, CompressionCodec codec, + Progressable progress) throws IOException { + Writer writer = createWriter(fs, conf, name, keyClass, valClass, + compressionType, codec, progress, new Metadata()); + return writer; + } /** * Construct the preferred type of 'raw' SequenceFile Writer. @@ -259,13 +284,14 @@ * @param valClass The 'value' type. * @param compress Compress data? * @param blockCompress Compress blocks? + * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ private static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, boolean blockCompress, - CompressionCodec codec) + CompressionCodec codec, Metadata metadata) throws IOException { if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && @@ -277,11 +303,11 @@ Writer writer = null; if (!compress) { - writer = new Writer(conf, out, keyClass, valClass); + writer = new Writer(conf, out, keyClass, valClass, metadata); } else if (compress && !blockCompress) { - writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec); + writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata); } else { - writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec); + writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata); } return writer; @@ -289,19 +315,41 @@ /** * Construct the preferred type of 'raw' SequenceFile Writer. + * @param out The stream on top which the writer is to be constructed. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compress Compress data? + * @param blockCompress Compress blocks? + * @return Returns the handle to the constructed SequenceFile Writer. 
+ * @throws IOException + */ + private static Writer + createWriter(Configuration conf, FSDataOutputStream out, + Class keyClass, Class valClass, boolean compress, boolean blockCompress, + CompressionCodec codec) + throws IOException { + Writer writer = createWriter(conf, out, keyClass, valClass, compress, + blockCompress, codec, new Metadata()); + return writer; + } + + + /** + * Construct the preferred type of 'raw' SequenceFile Writer. * @param conf The configuration. * @param out The stream on top which the writer is to be constructed. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compressionType The compression type. * @param codec The compression codec. + * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) + CompressionCodec codec, Metadata metadata) throws IOException { if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && @@ -313,15 +361,37 @@ Writer writer = null; if (compressionType == CompressionType.NONE) { - writer = new Writer(conf, out, keyClass, valClass); + writer = new Writer(conf, out, keyClass, valClass, metadata); } else if (compressionType == CompressionType.RECORD) { - writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec); + writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata); } else if (compressionType == CompressionType.BLOCK){ - writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec); + writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata); } return writer; } + + /** + * Construct the preferred type of 'raw' SequenceFile Writer. + * @param conf The configuration. + * @param out The stream on top which the writer is to be constructed. + * @param keyClass The 'key' type. + * @param valClass The 'value' type. + * @param compressionType The compression type. + * @param codec The compression codec. + * @return Returns the handle to the constructed SequenceFile Writer. + * @throws IOException + */ + public static Writer + createWriter(Configuration conf, FSDataOutputStream out, + Class keyClass, Class valClass, CompressionType compressionType, + CompressionCodec codec) + throws IOException { + Writer writer = createWriter(conf, out, keyClass, valClass, compressionType, + codec, new Metadata()); + return writer; + } + /** The interface to 'raw' values of SequenceFiles. */ public static interface ValueBytes { @@ -424,6 +494,99 @@ } // CompressedBytes + /** + * The class encapsulating with the metadata of a file. + * The metadata of a file is a list of attribute name/value + * pairs of Text type. 
+ * + */ + static class Metadata implements Writable { + + private TreeMap<Text, Text> theMetadata; + + public Metadata() { + this(new TreeMap<Text, Text>()); + } + + public Metadata(TreeMap<Text, Text> arg) { + if (arg == null) { + this.theMetadata = new TreeMap<Text, Text>(); + } else { + this.theMetadata = arg; + } + } + + public Text get(Text name) { + return this.theMetadata.get(name); + } + + public void set(Text name, Text value) { + this.theMetadata.put(name, value); + } + + public TreeMap<Text, Text> getMetadata() { + return new TreeMap<Text, Text>(this.theMetadata); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(this.theMetadata.size()); + Iterator iter = this.theMetadata.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next(); + en.getKey().write(out); + en.getValue().write(out); + } + } + + public void readFields(DataInput in) throws IOException { + int sz = in.readInt(); + if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); + this.theMetadata = new TreeMap<Text, Text>(); + for (int i = 0; i < sz; i++) { + Text key = new Text(); + Text val = new Text(); + key.readFields(in); + val.readFields(in); + this.theMetadata.put(key, val); + } + } + + public boolean equals(Metadata other) { + if (other == null) return false; + if (this.theMetadata.size() != other.theMetadata.size()) { + return false; + } + Iterator iter1 = this.theMetadata.entrySet().iterator(); + Iterator iter2 = other.theMetadata.entrySet().iterator(); + while (iter1.hasNext() && iter2.hasNext()) { + Map.Entry<Text, Text> en1 = (Map.Entry<Text, Text>)iter1.next(); + Map.Entry<Text, Text> en2 = (Map.Entry<Text, Text>)iter2.next(); + if (!en1.getKey().equals(en2.getKey())) { + return false; + } + if (!en1.getValue().equals(en2.getValue())) { + return false; + } + } + if (iter1.hasNext() || iter2.hasNext()) { + return false; + } + return true; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("size: ").append(this.theMetadata.size()).append("\n"); + Iterator iter = this.theMetadata.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next(); + sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); + sb.append("\n"); + } + return sb.toString(); + } + } + /** Write key/value pairs to a sequence-format file. */ public static class Writer { Configuration conf; @@ -438,6 +601,7 @@ CompressionCodec codec = null; CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; + Metadata metadata = null; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record @@ -462,24 +626,24 @@ public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException { - this(fs, conf, name, keyClass, valClass, null); + this(fs, conf, name, keyClass, valClass, null, new Metadata()); } /** Create the named file with write-progress reporter. 
*/ public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, Progressable progress) + Class keyClass, Class valClass, Progressable progress, Metadata metadata) throws IOException { - init(name, conf, fs.create(name, progress), keyClass, valClass, false, null); + init(name, conf, fs.create(name, progress), keyClass, valClass, false, null, metadata); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } - + /** Write to an arbitrary stream using a specified buffer size. */ private Writer(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass) + Class keyClass, Class valClass, Metadata metadata) throws IOException { - init(null, conf, out, keyClass, valClass, false, null); + init(null, conf, out, keyClass, valClass, false, null, metadata); initializeFileHeader(); writeFileHeader(); @@ -514,12 +678,13 @@ if(this.isCompressed()) { Text.writeString(out, (codec.getClass()).getName()); } + this.metadata.write(out); } - + /** Initialize. */ void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, - boolean compress, CompressionCodec codec) + boolean compress, CompressionCodec codec, Metadata metadata) throws IOException { this.target = name; this.conf = conf; @@ -528,6 +693,7 @@ this.valClass = valClass; this.compress = compress; this.codec = codec; + this.metadata = metadata; if(this.codec != null) { ReflectionUtils.setConf(this.codec, this.conf); this.deflateFilter = this.codec.createOutputStream(buffer); @@ -644,7 +810,7 @@ public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { - super.init(name, conf, fs.create(name), keyClass, valClass, true, codec); + super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata()); initializeFileHeader(); writeFileHeader(); @@ -654,28 +820,36 @@ /** Create the named file with write-progress reporter. */ public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress) + Progressable progress, Metadata metadata) throws IOException { super.init(name, conf, fs.create(name, progress), - keyClass, valClass, true, codec); + keyClass, valClass, true, codec, metadata); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } + /** Create the named file with write-progress reporter. */ + public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, CompressionCodec codec, + Progressable progress) + throws IOException { + this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata()); + } + /** Write to an arbitrary stream using a specified buffer size. 
*/ private RecordCompressWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionCodec codec) + Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { - super.init(null, conf, out, keyClass, valClass, true, codec); + super.init(null, conf, out, keyClass, valClass, true, codec, metadata); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } - + boolean isCompressed() { return true; } boolean isBlockCompressed() { return false; } @@ -752,7 +926,7 @@ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { - super.init(name, conf, fs.create(name), keyClass, valClass, true, codec); + super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata()); init(conf.getInt("io.seqfile.compress.blocksize", 1000000)); initializeFileHeader(); @@ -763,10 +937,10 @@ /** Create the named file with write-progress reporter. */ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress) + Progressable progress, Metadata metadata) throws IOException { super.init(name, conf, fs.create(name, progress), keyClass, valClass, - true, codec); + true, codec, metadata); init(conf.getInt("io.seqfile.compress.blocksize", 1000000)); initializeFileHeader(); @@ -774,18 +948,26 @@ finalizeFileHeader(); } + /** Create the named file with write-progress reporter. */ + public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass, CompressionCodec codec, + Progressable progress) + throws IOException { + this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata()); + } + /** Write to an arbitrary stream using a specified buffer size. */ private BlockCompressWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionCodec codec) + Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) throws IOException { - super.init(null, conf, out, keyClass, valClass, true, codec); + super.init(null, conf, out, keyClass, valClass, true, codec, metadata); init(1000000); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } - + boolean isCompressed() { return true; } boolean isBlockCompressed() { return true; } @@ -928,6 +1110,7 @@ private Class valClass; private CompressionCodec codec = null; + private Metadata metadata = null; private byte[] sync = new byte[SYNC_HASH_SIZE]; private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; @@ -1046,6 +1229,11 @@ } } + this.metadata = new Metadata(); + if (version >= VERSION_WITH_METADATA) { // if version >= 6 + this.metadata.readFields(in); + } + if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes } @@ -1095,6 +1283,11 @@ /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } + /** Returns the metadata object of the file */ + public Metadata getMetadata() { + return this.metadata; + } + /** Returns the configuration used for this file. 
*/ Configuration getConf() { return conf; } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Fri Jan 26 14:11:42 2007 @@ -317,7 +317,91 @@ return sorter; } + /** Unit tests for SequenceFile metadata. */ + public void testSequenceFileMetadata() throws Exception { + LOG.info("Testing SequenceFile with metadata"); + int count = 1024 * 10; + int megabytes = 1; + int factor = 5; + CompressionCodec codec = new DefaultCodec(); + Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata"); + Path recordCompressedFile = + new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata"); + Path blockCompressedFile = + new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata"); + + FileSystem fs = FileSystem.getLocal(conf); + SequenceFile.Metadata theMetadata = new SequenceFile.Metadata(); + theMetadata.set(new Text("name_1"), new Text("value_1")); + theMetadata.set(new Text("name_2"), new Text("value_2")); + theMetadata.set(new Text("name_3"), new Text("value_3")); + theMetadata.set(new Text("name_4"), new Text("value_4")); + + int seed = new Random().nextInt(); + + try { + // SequenceFile.Writer + writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata); + SequenceFile.Metadata aMetadata = readMetadata(fs, file); + if (!theMetadata.equals(aMetadata)) { + LOG.info("The original metadata:\n" + theMetadata.toString()); + LOG.info("The retrieved metadata:\n" + aMetadata.toString()); + throw new RuntimeException("metadata not match: " + 1); + } + // SequenceFile.RecordCompressWriter + writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, + codec, theMetadata); + aMetadata = readMetadata(fs, recordCompressedFile); + if (!theMetadata.equals(aMetadata)) { + LOG.info("The original metadata:\n" + theMetadata.toString()); + LOG.info("The retrieved metadata:\n" + aMetadata.toString()); + throw new RuntimeException("metadata not match: " + 2); + } + // SequenceFile.BlockCompressWriter + writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, + codec, theMetadata); + aMetadata =readMetadata(fs, blockCompressedFile); + if (!theMetadata.equals(aMetadata)) { + LOG.info("The original metadata:\n" + theMetadata.toString()); + LOG.info("The retrieved metadata:\n" + aMetadata.toString()); + throw new RuntimeException("metadata not match: " + 3); + } + } finally { + fs.close(); + } + LOG.info("Successfully tested SequenceFile with metadata"); + } + + + private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file) + throws IOException { + LOG.info("reading file: " + file.toString() + "\n"); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf); + SequenceFile.Metadata meta = reader.getMetadata(); + reader.close(); + return meta; + } + private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file, + CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata) + throws IOException { + fs.delete(file); + LOG.info("creating " + count + " records with metadata and with" + compressionType + + " 
compression"); + SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, file, + RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata); + RandomDatum.Generator generator = new RandomDatum.Generator(seed); + for (int i = 0; i < count; i++) { + generator.next(); + RandomDatum key = generator.getKey(); + RandomDatum value = generator.getValue(); + + writer.append(key, value); + } + writer.close(); + } + /** For debugging and testing. */ public static void main(String[] args) throws Exception { int count = 1024 * 1024;