Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 431f48f65 -> 7b783911c


MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when 
strategy is dynamic. Contributed by Kuhu Shukla.
(cherry picked from commit 2868ca0328d908056745223fb38d9a90fd2811ba)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b783911
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b783911
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b783911

Branch: refs/heads/branch-2.7
Commit: 7b783911c4062dcc957a1d577ffec56471510d23
Parents: 431f48f
Author: Kihwal Lee <kih...@apache.org>
Authored: Fri Oct 30 14:58:54 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Fri Oct 30 14:58:54 2015 -0500

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../tools/mapred/lib/DynamicInputChunk.java     | 137 +++----------------
 .../tools/mapred/lib/DynamicInputFormat.java    |  31 +++--
 .../tools/mapred/lib/DynamicRecordReader.java   |  13 +-
 .../org/apache/hadoop/tools/StubContext.java    |   4 +
 .../mapred/lib/TestDynamicInputFormat.java      |  33 ++++-
 6 files changed, 83 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9aac41a..7f61e4d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -55,6 +55,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6528. Memory leak for HistoryFileManager.getJobSummary()
     (Junping Du via jlowe)
 
+    MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
+    strategy is dynamic (Kuhu Shukla via kihwal)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
index 8482e7d..9bf8e47 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
@@ -20,14 +20,10 @@ package org.apache.hadoop.tools.mapred.lib;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.tools.CopyListingFileStatus;
-import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -47,72 +43,28 @@ import java.io.IOException;
  */
 class DynamicInputChunk<K, V> {
   private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
-
-  private static Configuration configuration;
-  private static Path chunkRootPath;
-  private static String chunkFilePrefix;
-  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
-  private static FileSystem fs;
-
   private Path chunkFilePath;
   private SequenceFileRecordReader<K, V> reader;
   private SequenceFile.Writer writer;
+  private DynamicInputChunkContext chunkContext;
 
-  private static void initializeChunkInvariants(Configuration config)
-                                                  throws IOException {
-    configuration = config;
-    Path listingFilePath = new Path(getListingFilePath(configuration));
-    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
-    fs = chunkRootPath.getFileSystem(configuration);
-    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
-  }
-
-  private static String getListingFilePath(Configuration configuration) {
-    final String listingFileString = configuration.get(
-            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
-    assert !listingFileString.equals("") : "Listing file not found.";
-    return listingFileString;
-  }
-
-  private static boolean areInvariantsInitialized() {
-    return chunkRootPath != null;
-  }
-
-  private DynamicInputChunk(String chunkId, Configuration configuration)
+  DynamicInputChunk(String chunkId, DynamicInputChunkContext chunkContext)
                                                       throws IOException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(configuration);
-
-    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+    this.chunkContext = chunkContext;
+    chunkFilePath = new Path(chunkContext.getChunkRootPath(),
+        chunkContext.getChunkFilePrefix() + chunkId);
     openForWrite();
   }
 
-
   private void openForWrite() throws IOException {
     writer = SequenceFile.createWriter(
-            chunkFilePath.getFileSystem(configuration), configuration,
+            chunkContext.getFs(), chunkContext.getConfiguration(),
             chunkFilePath, Text.class, CopyListingFileStatus.class,
             SequenceFile.CompressionType.NONE);
 
   }
 
   /**
-   * Factory method to create chunk-files for writing to.
-   * (For instance, when the DynamicInputFormat splits the input-file into
-   * chunks.)
-   * @param chunkId String to identify the chunk.
-   * @param configuration Configuration, describing the location of the listing-
-   * file, file-system for the map-job, etc.
-   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
-   * incorporating the chunk-id.
-   * @throws IOException Exception on failure to create the chunk.
-   */
-  public static DynamicInputChunk createChunkForWrite(String chunkId,
-                          Configuration configuration) throws IOException {
-    return new DynamicInputChunk(chunkId, configuration);
-  }
-
-  /**
    * Method to write records into a chunk.
    * @param key Key from the listing file.
    * @param value Corresponding value from the listing file.
@@ -135,19 +87,19 @@ class DynamicInputChunk<K, V> {
    * @throws IOException Exception on failure to reassign.
    */
   public void assignTo(TaskID taskId) throws IOException {
-    Path newPath = new Path(chunkRootPath, taskId.toString());
-    if (!fs.rename(chunkFilePath, newPath)) {
+    Path newPath = new Path(chunkContext.getChunkRootPath(), taskId.toString());
+    if (!chunkContext.getFs().rename(chunkFilePath, newPath)) {
       LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
     }
   }
 
-  private DynamicInputChunk(Path chunkFilePath,
-                            TaskAttemptContext taskAttemptContext)
-                                   throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
+  public DynamicInputChunk(Path chunkFilePath,
+      TaskAttemptContext taskAttemptContext,
+      DynamicInputChunkContext chunkContext) throws IOException,
+      InterruptedException {
 
     this.chunkFilePath = chunkFilePath;
+    this.chunkContext = chunkContext;
     openForRead(taskAttemptContext);
   }
 
@@ -155,45 +107,8 @@ class DynamicInputChunk<K, V> {
           throws IOException, InterruptedException {
     reader = new SequenceFileRecordReader<K, V>();
     reader.initialize(new FileSplit(chunkFilePath, 0,
-            DistCpUtils.getFileSize(chunkFilePath, configuration), null),
-            taskAttemptContext);
-  }
-
-  /**
-   * Factory method that
-   * 1. acquires a chunk for the specified map-task attempt
-   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
-   * @param taskAttemptContext The attempt-context for the map task that's
-   * trying to acquire a chunk.
-   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
-   * attempt-id (from the attempt-context.)
-   * @throws IOException Exception on failure.
-   * @throws InterruptedException Exception on failure.
-   */
-  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
-                                      throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-        initializeChunkInvariants(taskAttemptContext.getConfiguration());
-
-    String taskId
-            = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
-    Path acquiredFilePath = new Path(chunkRootPath, taskId);
-
-    if (fs.exists(acquiredFilePath)) {
-      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
-      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-    }
-
-    for (FileStatus chunkFile : getListOfChunkFiles()) {
-      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
-        LOG.info(taskId + " acquired " + chunkFile.getPath());
-        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-      }
-      else
-        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
-    }
-
-    return null;
+            DistCpUtils.getFileSize(chunkFilePath,
+                chunkContext.getConfiguration()), null), taskAttemptContext);
   }
 
   /**
@@ -204,19 +119,13 @@ class DynamicInputChunk<K, V> {
    */
   public void release() throws IOException {
     close();
-    if (!fs.delete(chunkFilePath, false)) {
+    if (!chunkContext.getFs().delete(chunkFilePath, false)) {
       LOG.error("Unable to release chunk at path: " + chunkFilePath);
-      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " +
+          chunkFilePath);
     }
   }
 
-  static FileStatus [] getListOfChunkFiles() throws IOException {
-    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
-    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
-    numChunksLeft = chunkFiles.length;
-    return chunkFiles;
-  }
-
   /**
    * Getter for the chunk-file's path, on HDFS.
    * @return The qualified path to the chunk-file.
@@ -234,14 +143,4 @@ class DynamicInputChunk<K, V> {
     return reader;
   }
 
-  /**
-   * Getter for the number of chunk-files left in the chunk-file directory.
-   * Useful to determine how many chunks (and hence, records) are left to be
-   * processed.
-   * @return Before the first scan of the directory, the number returned is -1.
-   * Otherwise, the number of chunk-files seen from the last scan is returned.
-   */
-  public static int getNumChunksLeft() {
-    return numChunksLeft;
-  }
 }
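
Note: the new DynamicInputChunkContext.java that this change introduces is not
included in this message. Below is a minimal sketch of its likely shape,
reconstructed from the accessors the diff calls (getChunkRootPath,
getChunkFilePrefix, getFs, getConfiguration, createChunkForWrite) and from the
static initialization code removed above; field names and method bodies here
are assumptions, not the committed source:

package org.apache.hadoop.tools.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpConstants;

/**
 * Per-job holder for the state that used to live in static fields of
 * DynamicInputChunk: the chunk directory, chunk-file prefix, FileSystem,
 * and the count of chunks left from the last directory scan.
 */
class DynamicInputChunkContext<K, V> {

  private Configuration configuration;
  private Path chunkRootPath;
  private String chunkFilePrefix;
  private FileSystem fs;
  private int numChunksLeft = -1; // Un-initialized before 1st dir-scan.

  public DynamicInputChunkContext(Configuration config) throws IOException {
    this.configuration = config;
    Path listingFilePath = new Path(configuration.get(
        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""));
    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
    fs = chunkRootPath.getFileSystem(configuration);
    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
  }

  public Configuration getConfiguration() { return configuration; }
  public Path getChunkRootPath() { return chunkRootPath; }
  public String getChunkFilePrefix() { return chunkFilePrefix; }
  public FileSystem getFs() { return fs; }

  public DynamicInputChunk createChunkForWrite(String chunkId)
      throws IOException {
    return new DynamicInputChunk<K, V>(chunkId, this);
  }

  public FileStatus[] getListOfChunkFiles() throws IOException {
    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
    FileStatus[] chunkFiles = fs.globStatus(chunkFilePattern);
    numChunksLeft = chunkFiles.length;
    return chunkFiles;
  }

  public int getNumChunksLeft() {
    return numChunksLeft;
  }
}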

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
index 38269c7..fe8604a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
@@ -57,7 +57,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
           = "mapred.num.splits";
   private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
           = "mapred.num.entries.per.chunk";
-  
+  private DynamicInputChunkContext<K, V> chunkContext = null;
+
   /**
    * Implementation of InputFormat::getSplits(). This method splits up the
    * copy-listing file into chunks, and assigns the first batch to different
@@ -72,6 +73,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       throws IOException, InterruptedException {
     LOG.info("DynamicInputFormat: Getting splits for job:"
              + jobContext.getJobID());
+    chunkContext = getChunkContext(jobContext.getConfiguration());
     return createSplits(jobContext,
                         splitCopyListingIntoChunksWithShuffle(jobContext));
   }
@@ -101,6 +103,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
 
   private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
 
+  public  DynamicInputChunkContext<K, V> getChunkContext(
+      Configuration configuration) throws IOException {
+    if(chunkContext == null) {
+      chunkContext = new DynamicInputChunkContext<K, V>(configuration);
+    }
+    return chunkContext;
+  }
   private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
                                     (JobContext context) throws IOException {
 
@@ -146,8 +155,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
           closeAll(openChunks);
           chunksFinal.addAll(openChunks);
 
-          openChunks = createChunks(
-                  configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+          openChunks = createChunks(chunkCount, nChunksTotal,
+              nChunksOpenAtOnce);
 
           chunkCount += openChunks.size();
 
@@ -183,9 +192,9 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       chunk.close();
   }
 
-  private static List<DynamicInputChunk> createChunks(Configuration config,
-                      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
-                                          throws IOException {
+  private List<DynamicInputChunk> createChunks(int chunkCount,
+      int nChunksTotal, int nChunksOpenAtOnce)
+      throws IOException {
     List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
     int chunkIdUpperBound
             = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
@@ -197,14 +206,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
       chunkIdUpperBound = nChunksTotal;
 
     for (int i=chunkCount; i < chunkIdUpperBound; ++i)
-      chunks.add(createChunk(i, config));
+      chunks.add(createChunk(i));
     return chunks;
   }
 
-  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+  private DynamicInputChunk createChunk(int chunkId)
                                               throws IOException {
-    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
-                                              config);
+    return chunkContext.createChunkForWrite(String.format("%05d", chunkId));
   }
 
 
@@ -351,6 +359,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
           InputSplit inputSplit,
           TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
-    return new DynamicRecordReader<K, V>();
+    chunkContext = getChunkContext(taskAttemptContext.getConfiguration());
+    return new DynamicRecordReader<K, V>(chunkContext);
   }
 }
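
Why this fixes MAPREDUCE-6451: the chunk bookkeeping is now owned by a
per-instance DynamicInputChunkContext, created lazily by getChunkContext(),
instead of by JVM-wide statics shared across jobs. A minimal illustration of
the intended scoping follows (a sketch mirroring the new test further below,
not code from this commit; conf is assumed to carry
DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, and getChunkContext() can throw
IOException):

DynamicInputFormat<Text, CopyListingFileStatus> jobOneFormat =
    new DynamicInputFormat<Text, CopyListingFileStatus>();
DynamicInputFormat<Text, CopyListingFileStatus> jobTwoFormat =
    new DynamicInputFormat<Text, CopyListingFileStatus>();

// Repeated calls on one instance reuse the same lazily-created context...
assert jobOneFormat.getChunkContext(conf) == jobOneFormat.getChunkContext(conf);
// ...while a second InputFormat gets its own, so two DistCp jobs in one
// JVM no longer share chunkRootPath/chunkFilePrefix state.
assert jobOneFormat.getChunkContext(conf) != jobTwoFormat.getChunkContext(conf);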

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
index 00b3c69..87b8f08 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
@@ -49,9 +49,14 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
   private int numRecordsProcessedByThisMap = 0;
   private long timeOfLastChunkDirScan = 0;
   private boolean isChunkDirAlreadyScanned = false;
+  private DynamicInputChunkContext<K, V> chunkContext;
 
  private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
 
+  DynamicRecordReader(DynamicInputChunkContext<K, V> chunkContext) {
+    this.chunkContext = chunkContext;
+  }
+
   /**
    * Implementation for RecordReader::initialize(). Initializes the internal
    * RecordReader to read from chunks.
@@ -69,7 +74,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
     this.taskAttemptContext = taskAttemptContext;
     configuration = taskAttemptContext.getConfiguration();
     taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
-    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    chunk = chunkContext.acquire(this.taskAttemptContext);
     timeOfLastChunkDirScan = System.currentTimeMillis();
     isChunkDirAlreadyScanned = false;
 
@@ -114,7 +119,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
     timeOfLastChunkDirScan = System.currentTimeMillis();
     isChunkDirAlreadyScanned = false;
     
-    chunk = DynamicInputChunk.acquire(taskAttemptContext);
+    chunk = chunkContext.acquire(taskAttemptContext);
 
     if (chunk == null) return false;
 
@@ -182,12 +187,12 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
             || (!isChunkDirAlreadyScanned &&
                     numRecordsProcessedByThisMap%numRecordsPerChunk
                               > numRecordsPerChunk/2)) {
-      DynamicInputChunk.getListOfChunkFiles();
+      chunkContext.getListOfChunkFiles();
       isChunkDirAlreadyScanned = true;
       timeOfLastChunkDirScan = now;
     }
 
-    return DynamicInputChunk.getNumChunksLeft();
+    return chunkContext.getNumChunksLeft();
   }
   /**
    * Implementation of RecordReader::close().
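
The chunkContext.acquire() calls above presumably carry over the logic of the
static DynamicInputChunk.acquire() removed earlier, re-targeted at the
context's per-instance fs and chunkRootPath and at the new three-argument
DynamicInputChunk constructor. A sketch of that instance method on
DynamicInputChunkContext, under that assumption (LOG is assumed to be a
commons-logging Log on the context class):

public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  String taskId
      = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
  Path acquiredFilePath = new Path(chunkRootPath, taskId);

  // A chunk may already have been pre-assigned to this task by getSplits().
  if (fs.exists(acquiredFilePath)) {
    LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
    return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
  }

  // Otherwise, race the other map tasks for an unclaimed chunk via rename().
  for (FileStatus chunkFile : getListOfChunkFiles()) {
    if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
      LOG.info(taskId + " acquired " + chunkFile.getPath());
      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
    } else {
      LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
    }
  }
  return null; // No chunks left to process.
}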

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
index 1a2227c..4bb6c98 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
@@ -64,6 +64,10 @@ public class StubContext {
     return reader;
   }
 
+  public void setReader(RecordReader<Text, CopyListingFileStatus> reader) {
+    this.reader = reader;
+  }
+
   public StubInMemoryWriter getWriter() {
     return writer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
index 8cc8317..bb2dd9d 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
@@ -40,6 +40,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -126,13 +127,14 @@ public class TestDynamicInputFormat {
     int taskId = 0;
 
     for (InputSplit split : splits) {
-      RecordReader<Text, CopyListingFileStatus> recordReader =
-           inputFormat.createRecordReader(split, null);
       StubContext stubContext = new StubContext(jobContext.getConfiguration(),
-                                                recordReader, taskId);
+                                                null, taskId);
       final TaskAttemptContext taskAttemptContext
          = stubContext.getContext();
-      
+
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+          inputFormat.createRecordReader(split, taskAttemptContext);
+      stubContext.setReader(recordReader);
       recordReader.initialize(splits.get(0), taskAttemptContext);
       float previousProgressValue = 0f;
       while (recordReader.nextKeyValue()) {
@@ -182,4 +184,27 @@ public class TestDynamicInputFormat {
     conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
     Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
   }
+
+  @Test
+  public void testDynamicInputChunkContext() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
+        "/tmp/test/file1.seq");
+    DynamicInputFormat firstInputFormat = new DynamicInputFormat();
+    DynamicInputFormat secondInputFormat = new DynamicInputFormat();
+    DynamicInputChunkContext firstContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext secondContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext thirdContext =
+        secondInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext fourthContext =
+        secondInputFormat.getChunkContext(configuration);
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",firstContext.equals(secondContext));
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",thirdContext.equals(fourthContext));
+    Assert.assertTrue("Contexts from different DynamicInputFormat " +
+        "objects should be different.",!firstContext.equals(thirdContext));
+  }
 }
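
A note on the test reordering above: createRecordReader() now dereferences the
TaskAttemptContext to build the chunk context from its configuration, so
passing null (as the old test did) would fail. The StubContext is therefore
constructed first, the record reader is created with a real
TaskAttemptContext, and then attached via the new StubContext.setReader().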
