Repository: spark Updated Branches: refs/heads/master 2b9b72682 -> 9bd9334f5
Config updates for the new shuffle transport. Author: Reynold Xin <[email protected]> Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bd9334f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bd9334f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bd9334f Branch: refs/heads/master Commit: 9bd9334f588dbb44d01554f9f4ca68a153a48993 Parents: 2b9b726 Author: Reynold Xin <[email protected]> Authored: Tue Dec 9 19:29:09 2014 -0800 Committer: Aaron Davidson <[email protected]> Committed: Tue Dec 9 19:29:09 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/spark/network/util/TransportConf.java | 8 ++++---- .../org/apache/spark/network/sasl/SaslClientBootstrap.java | 2 +- .../apache/spark/network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f605739..13b37f9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public class TransportConf { return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { return conf.getInt("spark.shuffle.io.connectionTimeout", 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. **/ public int numConnectionsPerPeer() { - return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 2); + return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public class TransportConf { public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt("spark.shuffle.sasl.timeout", 30000); } + public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public class TransportConf { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries > 0. */ - public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e3..33aa134 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public class SaslClientBootstrap implements TransportClientBootstrap { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); - byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); + byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index f8a1a26..4bb0498 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -106,7 +106,7 @@ public class RetryingBlockFetcher { this.fetchStarter = fetchStarter; this.listener = listener; this.maxRetries = conf.maxIORetries(); - this.retryWaitTime = conf.ioRetryWaitTime(); + this.retryWaitTime = conf.ioRetryWaitTimeMs(); this.outstandingBlocksIds = Sets.newLinkedHashSet(); Collections.addAll(outstandingBlocksIds, blockIds); this.currentListener = new RetryingBlockFetchListener(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
