HADOOP-1381. The distance between sync blocks in SequenceFiles should be 
configurable rather than hard coded to 2000 bytes. Contributed by Harsh J.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07825f2b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07825f2b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07825f2b

Branch: refs/heads/HADOOP-13345
Commit: 07825f2b49384dbec92bfae87ea661cef9ffab49
Parents: ee3d437
Author: Harsh J <ha...@cloudera.com>
Authored: Wed Oct 26 20:04:33 2016 +0530
Committer: Harsh J <ha...@cloudera.com>
Committed: Fri Nov 25 22:22:23 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/io/SequenceFile.java |  74 ++++++++----
 .../apache/hadoop/io/TestSequenceFileSync.java  | 113 +++++++++++++++----
 2 files changed, 146 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/07825f2b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 2ac1389..c510ff7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -24,6 +24,7 @@ import java.util.*;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.util.Options;
 import org.apache.hadoop.fs.*;
@@ -146,7 +147,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSU
  *   </ul>
  * </li>
  * <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every few <code>100</code> kilobytes or so.
  * </li>
  * </ul>
  *
@@ -165,7 +166,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSU
  *   </ul>
  * </li>
  * <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every few <code>100</code> kilobytes or so.
  * </li>
  * </ul>
  * 
@@ -217,8 +218,11 @@ public class SequenceFile {
   private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
 
-  /** The number of bytes between sync points.*/
-  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
+  /**
+   * The number of bytes between sync points. 100 KB, default.
+   * Computed as 5 KB * 20 = 100 KB
+   */
+  public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)
 
   /** 
    * The compression type used to compress key/value pairs in the 
@@ -856,6 +860,9 @@ public class SequenceFile {
     // starts and ends by scanning for this value.
     long lastSyncPos;                     // position of last sync
     byte[] sync;                          // 16 random bytes
+    @VisibleForTesting
+    int syncInterval;
+
     {
       try {                                       
         MessageDigest digester = MessageDigest.getInstance("MD5");
@@ -987,7 +994,16 @@ public class SequenceFile {
     private static Option filesystem(FileSystem fs) {
       return new SequenceFile.Writer.FileSystemOption(fs);
     }
-    
+
+    private static class SyncIntervalOption extends Options.IntegerOption
+        implements Option {
+      SyncIntervalOption(int val) {
+        // If a negative sync interval is provided,
+        // fall back to the default sync interval.
+        super(val < 0 ? SYNC_INTERVAL : val);
+      }
+    }
+
     public static Option bufferSize(int value) {
       return new BufferSizeOption(value);
     }
@@ -1032,11 +1048,15 @@ public class SequenceFile {
         CompressionCodec codec) {
       return new CompressionOption(value, codec);
     }
-    
+
+    public static Option syncInterval(int value) {
+      return new SyncIntervalOption(value);
+    }
+
     /**
      * Construct a uncompressed writer from a set of options.
      * @param conf the configuration to use
-     * @param options the options used when creating the writer
+     * @param opts the options used when creating the writer
      * @throws IOException if it fails
      */
     Writer(Configuration conf, 
@@ -1062,6 +1082,8 @@ public class SequenceFile {
         Options.getOption(MetadataOption.class, opts);
       CompressionOption compressionTypeOption =
         Options.getOption(CompressionOption.class, opts);
+      SyncIntervalOption syncIntervalOption =
+          Options.getOption(SyncIntervalOption.class, opts);
       // check consistency of options
       if ((fileOption == null) == (streamOption == null)) {
         throw new IllegalArgumentException("file or stream must be specified");
@@ -1163,7 +1185,12 @@ public class SequenceFile {
                                            "GzipCodec without native-hadoop " +
                                            "code!");
       }
-      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
+      this.syncInterval = (syncIntervalOption == null) ?
+          SYNC_INTERVAL :
+          syncIntervalOption.getValue();
+      init(
+          conf, out, ownStream, keyClass, valueClass,
+          codec, metadata, syncInterval);
     }
 
     /** Create the named file.
@@ -1176,7 +1203,7 @@ public class SequenceFile {
                   Class keyClass, Class valClass) throws IOException {
       this.compress = CompressionType.NONE;
       init(conf, fs.create(name), true, keyClass, valClass, null, 
-           new Metadata());
+           new Metadata(), SYNC_INTERVAL);
     }
     
     /** Create the named file with write-progress reporter.
@@ -1190,7 +1217,7 @@ public class SequenceFile {
                  Progressable progress, Metadata metadata) throws IOException {
       this.compress = CompressionType.NONE;
       init(conf, fs.create(name, progress), true, keyClass, valClass,
-           null, metadata);
+           null, metadata, SYNC_INTERVAL);
     }
     
     /** Create the named file with write-progress reporter. 
@@ -1206,7 +1233,7 @@ public class SequenceFile {
       this.compress = CompressionType.NONE;
       init(conf,
            fs.create(name, true, bufferSize, replication, blockSize, progress),
-           true, keyClass, valClass, null, metadata);
+           true, keyClass, valClass, null, metadata, SYNC_INTERVAL);
     }
 
     boolean isCompressed() { return compress != CompressionType.NONE; }
@@ -1234,18 +1261,21 @@ public class SequenceFile {
 
     /** Initialize. */
     @SuppressWarnings("unchecked")
-    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
-              Class keyClass, Class valClass,
-              CompressionCodec codec, Metadata metadata) 
+    void init(Configuration config, FSDataOutputStream outStream,
+              boolean ownStream, Class key, Class val,
+              CompressionCodec compCodec, Metadata meta,
+              int syncIntervalVal)
       throws IOException {
-      this.conf = conf;
-      this.out = out;
+      this.conf = config;
+      this.out = outStream;
       this.ownOutputStream = ownStream;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.codec = codec;
-      this.metadata = metadata;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyClass = key;
+      this.valClass = val;
+      this.codec = compCodec;
+      this.metadata = meta;
+      this.syncInterval = syncIntervalVal;
+      SerializationFactory serializationFactory =
+          new SerializationFactory(config);
       this.keySerializer = serializationFactory.getSerializer(keyClass);
       if (this.keySerializer == null) {
         throw new IOException(
@@ -1366,7 +1396,7 @@ public class SequenceFile {
 
     synchronized void checkAndWriteSync() throws IOException {
       if (sync != null &&
-          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+          out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync
         sync();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07825f2b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java
index bceb8af..363177b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java
@@ -27,13 +27,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
+/** Tests sync based seek reads/write intervals inside SequenceFiles. */
 public class TestSequenceFileSync {
   private static final int NUMRECORDS = 2000;
   private static final int RECORDSIZE = 80;
-  private static final Random rand = new Random();
+  private static final Random RAND = new Random();
 
   private final static String REC_FMT = "%d RECORDID %d : ";
 
@@ -46,37 +48,110 @@ public class TestSequenceFileSync {
     reader.next(key, val);
     assertEquals(key.get(), expectedRecord);
     final String test = String.format(REC_FMT, expectedRecord, expectedRecord);
-    assertEquals("Invalid value " + val, 0, val.find(test, 0));
+    assertEquals(
+        "Invalid value in iter " + iter + ": " + val,
+        0,
+        val.find(test, 0));
+  }
+
+  @Test
+  public void testDefaultSyncInterval() throws IOException {
+    // Uses the default sync interval of 100 KB
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path path = new Path(GenericTestUtils.getTempPath(
+            "sequencefile.sync.test"));
+    final IntWritable input = new IntWritable();
+    final Text val = new Text();
+    SequenceFile.Writer writer = new SequenceFile.Writer(
+        conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.compression(CompressionType.NONE),
+        SequenceFile.Writer.keyClass(IntWritable.class),
+        SequenceFile.Writer.valueClass(Text.class)
+    );
+    try {
+      writeSequenceFile(writer, NUMRECORDS*4);
+      for (int i = 0; i < 5; i++) {
+        final SequenceFile.Reader reader;
+
+        //try different SequenceFile.Reader constructors
+        if (i % 2 == 0) {
+          final int buffersize = conf.getInt("io.file.buffer.size", 4096);
+          reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.file(path),
+              SequenceFile.Reader.bufferSize(buffersize));
+        } else {
+          final FSDataInputStream in = fs.open(path);
+          final long length = fs.getFileStatus(path).getLen();
+          reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.stream(in),
+              SequenceFile.Reader.start(0L),
+              SequenceFile.Reader.length(length));
+        }
+
+        try {
+          forOffset(reader, input, val, i, 0, 0);
+          forOffset(reader, input, val, i, 65, 0);
+          // There would be over 1000 records within
+          // this sync interval
+          forOffset(reader, input, val, i, 2000, 1101);
+          forOffset(reader, input, val, i, 0, 0);
+        } finally {
+          reader.close();
+        }
+      }
+    } finally {
+      fs.delete(path, false);
+    }
   }
 
   @Test
   public void testLowSyncpoint() throws IOException {
+    // Uses a smaller sync interval of 2000 bytes
     final Configuration conf = new Configuration();
     final FileSystem fs = FileSystem.getLocal(conf);
     final Path path = new Path(GenericTestUtils.getTempPath(
         "sequencefile.sync.test"));
     final IntWritable input = new IntWritable();
     final Text val = new Text();
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-        IntWritable.class, Text.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(
+        conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.compression(CompressionType.NONE),
+        SequenceFile.Writer.keyClass(IntWritable.class),
+        SequenceFile.Writer.valueClass(Text.class),
+        SequenceFile.Writer.syncInterval(20*100)
+    );
+    // Ensure the custom sync interval value is set
+    assertEquals(writer.syncInterval, 20*100);
     try {
       writeSequenceFile(writer, NUMRECORDS);
-      for (int i = 0; i < 5 ; i++) {
-       final SequenceFile.Reader reader;
+      for (int i = 0; i < 5; i++) {
+        final SequenceFile.Reader reader;
        
-       //try different SequenceFile.Reader constructors
-       if (i % 2 == 0) {
-         reader = new SequenceFile.Reader(fs, path, conf);
-       } else {
-         final FSDataInputStream in = fs.open(path);
-         final long length = fs.getFileStatus(path).getLen();
-         final int buffersize = conf.getInt("io.file.buffer.size", 4096);
-         reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
-       }
+        //try different SequenceFile.Reader constructors
+        if (i % 2 == 0) {
+          final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+          reader = new SequenceFile.Reader(
+              conf,
+              SequenceFile.Reader.file(path),
+              SequenceFile.Reader.bufferSize(bufferSize));
+        } else {
+          final FSDataInputStream in = fs.open(path);
+          final long length = fs.getFileStatus(path).getLen();
+          reader = new SequenceFile.Reader(
+              conf,
+              SequenceFile.Reader.stream(in),
+              SequenceFile.Reader.start(0L),
+              SequenceFile.Reader.length(length));
+        }
 
-       try {
+        try {
           forOffset(reader, input, val, i, 0, 0);
           forOffset(reader, input, val, i, 65, 0);
+          // There would be only a few records within
+          // this sync interval
           forOffset(reader, input, val, i, 2000, 21);
           forOffset(reader, input, val, i, 0, 0);
         } finally {
@@ -88,7 +163,7 @@ public class TestSequenceFileSync {
     }
   }
 
-  public static void writeSequenceFile(SequenceFile.Writer writer,
+  private static void writeSequenceFile(SequenceFile.Writer writer,
       int numRecords) throws IOException {
     final IntWritable key = new IntWritable();
     final Text val = new Text();
@@ -100,13 +175,13 @@ public class TestSequenceFileSync {
     writer.close();
   }
 
-  static void randomText(Text val, int id, int recordSize) {
+  private static void randomText(Text val, int id, int recordSize) {
     val.clear();
     final StringBuilder ret = new StringBuilder(recordSize);
     ret.append(String.format(REC_FMT, id, id));
     recordSize -= ret.length();
     for (int i = 0; i < recordSize; ++i) {
-      ret.append(rand.nextInt(9));
+      ret.append(RAND.nextInt(9));
     }
     val.set(ret.toString());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to