STORM-166: Leveraging NimbusInfo.parse
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a92a1e9c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a92a1e9c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a92a1e9c Branch: refs/heads/nimbus-ha-branch Commit: a92a1e9c7887383286295d90d53899dbdbdde000 Parents: 2954eae Author: Parth Brahmbhatt <[email protected]> Authored: Fri Dec 19 13:56:25 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Fri Dec 19 13:56:25 2014 -0800 ---------------------------------------------------------------------- .../LocalFileSystemCodeDistributor.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a92a1e9c/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java index 96422e2..02d5e2d 100644 --- a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java +++ b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java @@ -1,5 +1,6 @@ package backtype.storm.codedistributor; +import backtype.storm.nimbus.NimbusInfo; import backtype.storm.utils.ZookeeperAuthInfo; import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; @@ -62,26 +63,26 @@ public class LocalFileSystemCodeDistributor implements ICodeDistributor { List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid); File destDir = metafile.getParentFile(); List<File> downloadedFiles = Lists.newArrayList(); - for (String absoluteFilePath : FileUtils.readLines(metafile)) { + for (String absolutePathOnRemote : FileUtils.readLines(metafile)) { - File localFile = new File(destDir, new File(absoluteFilePath).getName()); + File localFile = new File(destDir, new File(absolutePathOnRemote).getName()); boolean isSuccess = false; for (String hostAndPort : hostInfos) { - String host = hostAndPort.split(":")[0]; - int port = Integer.parseInt(hostAndPort.split(":")[1]); + NimbusInfo nimbusInfo = NimbusInfo.parse(hostAndPort); try { - downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port); + LOG.info("Attempting to download meta file {} from remote {}", absolutePathOnRemote, nimbusInfo.toHostPortString()); + downloadFromHost(conf, absolutePathOnRemote, localFile.getAbsolutePath(), nimbusInfo.getHost(), nimbusInfo.getPort()); downloadedFiles.add(localFile); isSuccess = true; break; } catch (Exception e) { - LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e); + LOG.error("download failed from {}:{}, will try another endpoint ", nimbusInfo.getHost(), nimbusInfo.getPort(), e); } } if(!isSuccess) { - throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint"); + throw new RuntimeException("File " + absolutePathOnRemote +" could not be downloaded from any endpoint"); } }
