Addendum to MAPREDUCE-6451

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b24fe064
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b24fe064
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b24fe064

Branch: refs/heads/HDFS-7240
Commit: b24fe0648348d325d14931f80cee8a170fb3358a
Parents: 2868ca0
Author: Kihwal Lee <kih...@apache.org>
Authored: Fri Oct 30 16:05:23 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Fri Oct 30 16:05:23 2015 -0500

----------------------------------------------------------------------
 .../mapred/lib/DynamicInputChunkContext.java    | 113 +++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b24fe064/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java
new file mode 100644
index 0000000..043ff1c
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunkContext.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred.lib;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.tools.DistCpConstants;
+
+import java.io.IOException;
+
+/**
+ * Class to hold the shared invariants used by DynamicInputChunk instances.
+ */
+class DynamicInputChunkContext<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(DynamicInputChunkContext.class);
+  private Configuration configuration;
+  private Path chunkRootPath = null;
+  private String chunkFilePrefix;
+  private FileSystem fs;
+  private int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
+
+  public DynamicInputChunkContext(Configuration config)
+      throws IOException {
+    this.configuration = config;
+    Path listingFilePath = new Path(getListingFilePath(configuration));
+    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
+    fs = chunkRootPath.getFileSystem(configuration);
+    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public Path getChunkRootPath() {
+    return chunkRootPath;
+  }
+
+  public String getChunkFilePrefix() {
+    return chunkFilePrefix;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  private static String getListingFilePath(Configuration configuration) {
+    final String listingFileString = configuration.get(
+        DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+    assert !listingFileString.equals("") : "Listing file not found.";
+    return listingFileString;
+  }
+
+  public int getNumChunksLeft() {
+    return numChunksLeft;
+  }
+
+  public DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+
+    String taskId
+        = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
+    Path acquiredFilePath = new Path(getChunkRootPath(), taskId);
+
+    if (fs.exists(acquiredFilePath)) {
+      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
+      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext, this);
+    }
+
+    for (FileStatus chunkFile : getListOfChunkFiles()) {
+      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
+        LOG.info(taskId + " acquired " + chunkFile.getPath());
+        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext,
+            this);
+      }
+    }
+    return null;
+  }
+
+  public DynamicInputChunk createChunkForWrite(String chunkId)
+      throws IOException {
+    return new DynamicInputChunk(chunkId, this);
+  }
+
+  public FileStatus[] getListOfChunkFiles() throws IOException {
+    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
+    FileStatus[] chunkFiles = fs.globStatus(chunkFilePattern);
+    numChunksLeft = chunkFiles.length;
+    return chunkFiles;
+  }
+}
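
----------------------------------------------------------------------

For context, a minimal sketch of how a task might use this class to pick
up work, assuming only the package-private API shown in the diff above.
The class name ChunkAcquisitionExample and its wiring are hypothetical
and are not part of this commit.

package org.apache.hadoop.tools.mapred.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class ChunkAcquisitionExample {

  // Returns the chunk this task should process, or null once every
  // chunk has been claimed by some task.
  DynamicInputChunk acquireWork(Configuration conf,
      TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    DynamicInputChunkContext<Object, Object> context =
        new DynamicInputChunkContext<>(conf);
    // acquire() first looks for a chunk pre-assigned to this task's id,
    // then falls back to claiming an unassigned chunk file. Because
    // FileSystem.rename() is atomic on HDFS, at most one task wins any
    // given chunk.
    return context.acquire(taskAttemptContext);
  }
}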
