Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1c52b6551 -> 4aae0fe97


HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable.

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4aae0fe9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4aae0fe9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4aae0fe9

Branch: refs/heads/branch-2
Commit: 4aae0fe976306aa3a734ee7dfb52ff7125c529fa
Parents: 1c52b65
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Fri Oct 17 18:27:42 2014 -0700
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Fri Oct 17 18:35:14 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt       |  2 ++
 .../java/org/apache/hadoop/hdfs/DFSClient.java    | 18 +++++++++++++-----
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java     |  2 ++
 .../org/apache/hadoop/hdfs/DFSOutputStream.java   |  3 +--
 4 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aae0fe9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d869015..4b7358b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -619,6 +619,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support
     FSIMAGE_NAME_OPTIMIZATION.  (szetszwo)
 
+    HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
+
     BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
   
       HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aae0fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 8a7c8eb..5142b9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
-    .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
@@ -59,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -89,10 +88,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
@@ -107,7 +106,9 @@ import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CacheFlag;
@@ -135,8 +136,8 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -217,6 +218,10 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -294,6 +299,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     final int ioBufferSize;
     final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
+    final int writeMaxPackets;
     final int socketTimeout;
     final int socketCacheCapacity;
     final long socketCacheExpiry;
@@ -364,6 +370,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       /** dfs.write.packet.size is an internal config variable */
       writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
           DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+      writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
+          DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
       defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
           DFS_BLOCK_SIZE_DEFAULT);
       defaultReplication = (short) conf.getInt(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aae0fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b5c7fed..e5e8e90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -50,6 +50,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
  public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
  public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
+  public static final String  DFS_CLIENT_WRITE_MAX_PACKETS_KEY = "dfs.client.write.max-packets";
+  public static final int     DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT = 80;
  public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
  public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4aae0fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a37e4d6..b41e608 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -130,7 +130,6 @@ import com.google.common.cache.RemovalNotification;
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
   private final long dfsclientSlowLogThresholdMs;
-  private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   /**
    * Number of times to retry creating a file when there are transient 
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -1783,7 +1782,7 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
       // If queue is full, then wait till we have enough space
-      while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
+      while (!closed && dataQueue.size() + ackQueue.size()  > dfsClient.getConf().writeMaxPackets) {
         try {
           dataQueue.wait();
         } catch (InterruptedException e) {

Reply via email to