http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index 96ad939..a20cde0 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -91,9 +91,9 @@ public class ServerUtils { private static ServerUtils _instance = new ServerUtils(); /** - * Provide an instance of this class for delegates to use. To mock out - * delegated methods, provide an instance of a subclass that overrides the - * implementation of the delegated method. + * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * overrides the implementation of the delegated method. + * * @param u a ServerUtils instance * @return the previously set instance */ @@ -109,8 +109,8 @@ public class ServerUtils { List<List<T>> rest = new ArrayList<List<T>>(); for (List<T> node : nodeList) { if (node != null && node.size() > 0) { - first.add(node.get(0)); - rest.add(node.subList(1, node.size())); + first.add(node.get(0)); + rest.add(node.subList(1, node.size())); } } List<T> interleaveRest = interleaveAll(rest); @@ -120,14 +120,14 @@ public class ServerUtils { return first; } return null; - } + } public static BlobStore getNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo) { return getNimbusBlobStore(conf, null, nimbusInfo); } public static BlobStore getNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo) { - String type = (String)conf.get(DaemonConfig.NIMBUS_BLOBSTORE); + String type = (String) conf.get(DaemonConfig.NIMBUS_BLOBSTORE); if (type == null) { type = LocalFsBlobStore.class.getName(); } @@ -136,7 +136,7 @@ public class ServerUtils { // only enable cleanup of blobstore on nimbus nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE); - if(store != null) { + if (store != null) { // store can be null during testing when mocking utils. store.prepare(nconf, baseDir, nimbusInfo); } @@ -149,12 +149,13 @@ public class ServerUtils { /** * Returns the combined string, escaped for posix shell. + * * @param command the list of strings to be combined * @return the resulting command string */ - public static String shellCmd (List<String> command) { + public static String shellCmd(List<String> command) { List<String> changedCommands = new ArrayList<>(command.size()); - for (String str: command) { + for (String str : command) { if (str == null) { continue; } @@ -164,29 +165,29 @@ public class ServerUtils { } /** - * Takes an input dir or file and returns the disk usage on that local directory. - * Very basic implementation. + * Takes an input dir or file and returns the disk usage on that local directory. Very basic implementation. * * @param dir The input dir to get the disk space of this local dir * @return The total disk space of the input local directory */ public static long getDU(File dir) { long size = 0; - if (!dir.exists()) + if (!dir.exists()) { return 0; + } if (!dir.isDirectory()) { return dir.length(); } else { File[] allFiles = dir.listFiles(); - if(allFiles != null) { + if (allFiles != null) { for (int i = 0; i < allFiles.length; i++) { boolean isSymLink; try { isSymLink = org.apache.commons.io.FileUtils.isSymlink(allFiles[i]); - } catch(IOException ioe) { + } catch (IOException ioe) { isSymLink = true; } - if(!isSymLink) { + if (!isSymLink) { size += getDU(allFiles[i]); } } @@ -209,6 +210,7 @@ public class ServerUtils { /** * Meant to be called only by the supervisor for stormjar/stormconf/stormcode files. + * * @param key * @param localFile * @param cb @@ -225,7 +227,7 @@ public class ServerUtils { * Extract dir from the jar to destdir * * @param jarpath Path to the jar file - * @param dir Directory in the jar to pull out + * @param dir Directory in the jar to pull out * @param destdir Path to the directory where the extracted directory will be put */ public static void extractDirFromJar(String jarpath, String dir, File destdir) { @@ -233,8 +235,8 @@ public class ServerUtils { } /** - * Returns the value of java.class.path System property. Kept separate for - * testing. + * Returns the value of java.class.path System property. Kept separate for testing. + * * @return the classpath */ public static String currentClasspath() { @@ -245,16 +247,16 @@ public class ServerUtils { * Determines if a zip archive contains a particular directory. * * @param zipfile path to the zipped file - * @param target directory being looked for in the zip. + * @param target directory being looked for in the zip. * @return boolean whether or not the directory exists in the zip. */ public static boolean zipDoesContainDir(String zipfile, String target) throws IOException { List<ZipEntry> entries = (List<ZipEntry>) Collections.list(new ZipFile(zipfile).entries()); String targetDir = target + "/"; - for(ZipEntry entry : entries) { + for (ZipEntry entry : entries) { String name = entry.getName(); - if(name.startsWith(targetDir)) { + if (name.startsWith(targetDir)) { return true; } } @@ -266,23 +268,24 @@ public class ServerUtils { return Files.getOwner(FileSystems.getDefault().getPath(path)).getName(); } - public static String containerFilePath (String dir) { + public static String containerFilePath(String dir) { return dir + File.separator + "launch_container.sh"; } - public static String scriptFilePath (String dir) { + public static String scriptFilePath(String dir) { return dir + File.separator + "storm-worker-script.sh"; } /** * Writes a posix shell script file to be executed in its own process. - * @param dir the directory under which the script is to be written - * @param command the command the script is to execute + * + * @param dir the directory under which the script is to be written + * @param command the command the script is to execute * @param environment optional environment variables to set before running the script's command. May be null. * @return the path to the script that has been written */ public static String writeScript(String dir, List<String> command, - Map<String,String> environment) throws IOException { + Map<String, String> environment) throws IOException { String path = scriptFilePath(dir); try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) { out.write("#!/bin/bash"); @@ -294,14 +297,14 @@ public class ServerUtils { v = ""; } out.write(shellCmd( - Arrays.asList( - "export",k+"="+v))); + Arrays.asList( + "export", k + "=" + v))); out.write(";"); out.newLine(); } } out.newLine(); - out.write("exec "+ shellCmd(command)+";"); + out.write("exec " + shellCmd(command) + ";"); } return path; } @@ -336,11 +339,11 @@ public class ServerUtils { } } - public static void killProcessWithSigTerm (String pid) throws IOException { + public static void killProcessWithSigTerm(String pid) throws IOException { sendSignalToProcess(Long.parseLong(pid), SIGTERM); } - public static void forceKillProcess (String pid) throws IOException { + public static void forceKillProcess(String pid) throws IOException { sendSignalToProcess(Long.parseLong(pid), SIGKILL); } @@ -370,14 +373,13 @@ public class ServerUtils { } /** - * Unpack matching files from a jar. Entries inside the jar that do - * not match the given pattern will be skipped. + * Unpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped. * * @param jarFile the .jar file to unpack - * @param toDir the destination directory into which to unpack the jar + * @param toDir the destination directory into which to unpack the jar */ public static void unJar(File jarFile, File toDir) - throws IOException { + throws IOException { JarFile jar = new JarFile(jarFile); try { Enumeration<JarEntry> entries = jar.entries(); @@ -407,13 +409,13 @@ public class ServerUtils { /** * Copies from one stream to another. * - * @param in InputStream to read from - * @param out OutputStream to write to + * @param in InputStream to read from + * @param out OutputStream to write to * @param buffSize the size of the buffer */ public static void copyBytes(InputStream in, OutputStream out, int buffSize) - throws IOException { - PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; + throws IOException { + PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); while (bytesRead >= 0) { @@ -433,13 +435,12 @@ public class ServerUtils { private static void ensureDirectory(File dir) throws IOException { if (!dir.mkdirs() && !dir.isDirectory()) { throw new IOException("Mkdirs failed to create " + - dir.toString()); + dir.toString()); } } /** - * Given a Tar File as input it will untar the file in a the untar directory - * passed as the second parameter + * Given a Tar File as input it will untar the file in a the untar directory passed as the second parameter * <p/> * This utility will untar ".tar" files and ".tar.gz","tgz" files. * @@ -484,13 +485,13 @@ public class ServerUtils { } else { untarCommand.append(inFile.toString()); } - String[] shellCmd = {"bash", "-c", untarCommand.toString()}; + String[] shellCmd = { "bash", "-c", untarCommand.toString() }; ShellUtils.ShellCommandExecutor shexec = new ShellUtils.ShellCommandExecutor(shellCmd); shexec.execute(); int exitcode = shexec.getExitCode(); if (exitcode != 0) { throw new IOException("Error untarring file " + inFile + - ". Tar process exited with exit code " + exitcode); + ". Tar process exited with exit code " + exitcode); } } @@ -500,7 +501,7 @@ public class ServerUtils { try { if (gzipped) { inputStream = new BufferedInputStream(new GZIPInputStream( - new FileInputStream(inFile))); + new FileInputStream(inFile))); } else { inputStream = new BufferedInputStream(new FileInputStream(inFile)); } @@ -511,7 +512,7 @@ public class ServerUtils { } } } finally { - if(inputStream != null) { + if (inputStream != null) { inputStream.close(); } } @@ -523,7 +524,7 @@ public class ServerUtils { File subDir = new File(outputDir, entry.getName()); if (!subDir.mkdirs() && !subDir.isDirectory()) { throw new IOException("Mkdirs failed to create tar internal dir " - + outputDir); + + outputDir); } for (TarArchiveEntry e : entry.getDirectoryEntries()) { unpackEntries(tis, e, subDir); @@ -540,7 +541,7 @@ public class ServerUtils { int count; byte data[] = new byte[2048]; BufferedOutputStream outputStream = new BufferedOutputStream( - new FileOutputStream(outputFile)); + new FileOutputStream(outputFile)); while ((count = tis.read(data)) != -1) { outputStream.write(data, 0, count); @@ -556,14 +557,14 @@ public class ServerUtils { } else if (lowerDst.endsWith(".zip")) { unZip(localrsrc, dst); } else if (lowerDst.endsWith(".tar.gz") || - lowerDst.endsWith(".tgz") || - lowerDst.endsWith(".tar")) { + lowerDst.endsWith(".tgz") || + lowerDst.endsWith(".tar")) { unTar(localrsrc, dst); } else { LOG.warn("Cannot unpack " + localrsrc); if (!localrsrc.renameTo(dst)) { throw new IOException("Unable to rename file: [" + localrsrc - + "] to [" + dst + "]"); + + "] to [" + dst + "]"); } } if (localrsrc.isFile()) { @@ -572,9 +573,9 @@ public class ServerUtils { } /** - * Given a File input it will unzip the file in a the unzip directory - * passed as the second parameter - * @param inFile The zip file as input + * Given a File input it will unzip the file in a the unzip directory passed as the second parameter + * + * @param inFile The zip file as input * @param unzipDir The unzip directory where to unzip the zip file. * @throws IOException */ @@ -617,14 +618,15 @@ public class ServerUtils { } /** - * Given a zip File input it will return its size - * Only works for zip files whose uncompressed size is less than 4 GB, - * otherwise returns the size module 2^32, per gzip specifications + * Given a zip File input it will return its size Only works for zip files whose uncompressed size is less than 4 GB, otherwise returns + * the size module 2^32, per gzip specifications + * * @param myFile The zip file as input - * @throws IOException * @return zip file size as a long + * + * @throws IOException */ - public static long zipFileSize(File myFile) throws IOException{ + public static long zipFileSize(File myFile) throws IOException { RandomAccessFile raf = new RandomAccessFile(myFile, "r"); raf.seek(raf.length() - 4); long b4 = raf.read(); @@ -636,42 +638,6 @@ public class ServerUtils { return val; } - // Non-static impl methods exist for mocking purposes. - public String currentClasspathImpl() { - return System.getProperty("java.class.path"); - } - - public void extractDirFromJarImpl(String jarpath, String dir, File destdir) { - try (JarFile jarFile = new JarFile(jarpath)) { - Enumeration<JarEntry> jarEnums = jarFile.entries(); - while (jarEnums.hasMoreElements()) { - JarEntry entry = jarEnums.nextElement(); - if (!entry.isDirectory() && entry.getName().startsWith(dir)) { - File aFile = new File(destdir, entry.getName()); - aFile.getParentFile().mkdirs(); - try (FileOutputStream out = new FileOutputStream(aFile); - InputStream in = jarFile.getInputStream(entry)) { - IOUtils.copy(in, out); - } - } - } - } catch (IOException e) { - LOG.info("Could not extract {} from {}", dir, jarpath); - } - } - - public void downloadResourcesAsSupervisorImpl(String key, String localFile, - ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException { - final int MAX_RETRY_ATTEMPTS = 2; - final int ATTEMPTS_INTERVAL_TIME = 100; - for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) { - if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) { - break; - } - Utils.sleep(ATTEMPTS_INTERVAL_TIME); - } - } - private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) { boolean isSuccess = false; try (FileOutputStream out = new FileOutputStream(localFile); @@ -702,6 +668,7 @@ public class ServerUtils { /** * Check if the scheduler is resource aware or not. + * * @param conf The configuration * @return True if it's resource aware; false otherwise */ @@ -717,7 +684,7 @@ public class ServerUtils { public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException { return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) / - ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB))); + ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB))); } public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) @@ -725,13 +692,13 @@ public class ServerUtils { Map<String, Integer> componentParallelism = getComponentParallelism(topoConf, topology); double totalMemoryRequired = 0.0; - for (Map.Entry<String, NormalizedResourceRequest> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) { + for (Map.Entry<String, NormalizedResourceRequest> entry : ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) { int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1); double memoryRequirement = entry.getValue().getOnHeapMemoryMb(); totalMemoryRequired += memoryRequirement * parallelism; } - for (Map.Entry<String, NormalizedResourceRequest> entry: ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) { + for (Map.Entry<String, NormalizedResourceRequest> entry : ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) { int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1); double memoryRequirement = entry.getValue().getOnHeapMemoryMb(); totalMemoryRequired += memoryRequirement * parallelism; @@ -766,4 +733,40 @@ public class ServerUtils { sub.getPrincipals().add(principal); return sub; } + + // Non-static impl methods exist for mocking purposes. + public String currentClasspathImpl() { + return System.getProperty("java.class.path"); + } + + public void extractDirFromJarImpl(String jarpath, String dir, File destdir) { + try (JarFile jarFile = new JarFile(jarpath)) { + Enumeration<JarEntry> jarEnums = jarFile.entries(); + while (jarEnums.hasMoreElements()) { + JarEntry entry = jarEnums.nextElement(); + if (!entry.isDirectory() && entry.getName().startsWith(dir)) { + File aFile = new File(destdir, entry.getName()); + aFile.getParentFile().mkdirs(); + try (FileOutputStream out = new FileOutputStream(aFile); + InputStream in = jarFile.getInputStream(entry)) { + IOUtils.copy(in, out); + } + } + } + } catch (IOException e) { + LOG.info("Could not extract {} from {}", dir, jarpath); + } + } + + public void downloadResourcesAsSupervisorImpl(String key, String localFile, + ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException { + final int MAX_RETRY_ATTEMPTS = 2; + final int ATTEMPTS_INTERVAL_TIME = 100; + for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) { + if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) { + break; + } + Utils.sleep(ATTEMPTS_INTERVAL_TIME); + } + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/StormCommonInstaller.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/StormCommonInstaller.java b/storm-server/src/main/java/org/apache/storm/utils/StormCommonInstaller.java index a1b5596..6f29552 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/StormCommonInstaller.java +++ b/storm-server/src/main/java/org/apache/storm/utils/StormCommonInstaller.java @@ -1,19 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.utils; import org.apache.storm.daemon.StormCommon; @@ -37,7 +33,7 @@ public class StormCommonInstaller implements AutoCloseable { public void close() throws Exception { if (StormCommon.setInstance(_oldInstance) != _curInstance) { throw new IllegalStateException( - "Instances of this resource must be closed in reverse order of opening."); + "Instances of this resource must be closed in reverse order of opening."); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java b/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java index 96bce15..bf95a76 100644 --- a/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java +++ b/storm-server/src/main/java/org/apache/storm/zookeeper/AclEnforcement.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.zookeeper; @@ -73,7 +67,7 @@ public class AclEnforcement { List<ACL> drpcFullAcl = new ArrayList<>(2); drpcFullAcl.add(superUserAcl); - String drpcAclString = (String)conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL); + String drpcAclString = (String) conf.get(Config.STORM_ZOOKEEPER_DRPC_ACL); if (drpcAclString != null) { Id drpcAclId = Utils.parseZkId(drpcAclString, Config.STORM_ZOOKEEPER_DRPC_ACL); ACL drpcUserAcl = new ACL(ZooDefs.Perms.READ, drpcAclId); @@ -85,7 +79,7 @@ public class AclEnforcement { String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT); try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, "", - new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) { + new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) { if (zk.checkExists().forPath(stormRoot) != null) { //First off we want to verify that ROOT is good verifyAclStrict(zk, superAcl, stormRoot, fixUp); @@ -97,7 +91,7 @@ public class AclEnforcement { // Now that the root is fine we can start to look at the other paths under it. try (CuratorFramework zk = ClientZookeeper.mkClient(conf, zkServers, port, stormRoot, - new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) { + new DefaultWatcherCallBack(), conf, DaemonType.NIMBUS)) { //Next verify that the blob store is correct before we start it up. if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != null) { verifyAclStrictRecursive(zk, superAcl, ClusterUtils.BLOBSTORE_SUBTREE, fixUp); @@ -119,7 +113,7 @@ public class AclEnforcement { try { Subject nimbusSubject = new Subject(); nimbusSubject.getPrincipals().add(new NimbusPrincipal()); - for (String topoId: topoIds) { + for (String topoId : topoIds) { try { String blobKey = topoId + "-stormconf.ser"; Map<String, Object> topoConf = Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject)); @@ -210,7 +204,7 @@ public class AclEnforcement { } private static void verifyParentWithTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, - Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception { + Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception { if (zk.checkExists().forPath(path) != null) { verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp); Set<String> possiblyBadIds = new HashSet<>(); @@ -228,7 +222,7 @@ public class AclEnforcement { if (!possiblyBadIds.isEmpty()) { //Lets reread the children in STORMS as the source of truth and see if a new one was created in the background possiblyBadIds.removeAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE)); - for (String topoId: possiblyBadIds) { + for (String topoId : possiblyBadIds) { //Now we know for sure that this is a bad id String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId; zk.delete().deletingChildrenIfNeeded().forPath(childPath); @@ -238,12 +232,12 @@ public class AclEnforcement { } private static void verifyParentWithReadOnlyTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, - Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.READ); } private static void verifyParentWithReadWriteTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path, - Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { + Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception { verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.ALL); } @@ -271,7 +265,7 @@ public class AclEnforcement { private static void verifyAclStrictRecursive(CuratorFramework zk, List<ACL> strictAcl, String path, boolean fixUp) throws Exception { verifyAclStrict(zk, strictAcl, path, fixUp); - for (String child: zk.getChildren().forPath(path)) { + for (String child : zk.getChildren().forPath(path)) { String newPath = path + ClusterUtils.ZK_SEPERATOR + child; verifyAclStrictRecursive(zk, strictAcl, newPath, fixUp); } @@ -295,7 +289,7 @@ public class AclEnforcement { private static boolean equivalent(List<ACL> a, List<ACL> b) { if (a.size() == b.size()) { - for (ACL aAcl: a) { + for (ACL aAcl : a) { if (!b.contains(aAcl)) { return false; } @@ -305,10 +299,10 @@ public class AclEnforcement { return false; } - public static void main(String [] args) throws Exception { + public static void main(String[] args) throws Exception { Map<String, Object> conf = Utils.readStormConfig(); boolean fixUp = false; - for (String arg: args) { + for (String arg : args) { String a = arg.toLowerCase(); if ("-f".equals(a) || "--fixup".equals(a)) { fixUp = true; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java index 59313a0..2f2bb1f 100644 --- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java +++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java @@ -1,22 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.zookeeper; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; @@ -32,12 +32,6 @@ import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - public class LeaderElectorImp implements ILeaderElector { private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class); private final Map<String, Object> conf; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java index 0b401a5..c5497b5 100644 --- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java @@ -15,9 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.zookeeper; +import java.io.File; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; @@ -36,27 +46,17 @@ import org.apache.zookeeper.server.ZooKeeperServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; - public class Zookeeper { - private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class); - // A singleton instance allows us to mock delegated static methods in our // tests by subclassing. private static final Zookeeper INSTANCE = new Zookeeper(); + private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class); private static Zookeeper _instance = INSTANCE; /** - * Provide an instance of this class for delegates to use. To mock out - * delegated methods, provide an instance of a subclass that overrides the - * implementation of the delegated method. + * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * overrides the implementation of the delegated method. * * @param u a Zookeeper instance */ @@ -65,9 +65,8 @@ public class Zookeeper { } /** - * Resets the singleton instance to the default. This is helpful to reset - * the class to its original functionality when mocking is no longer - * desired. + * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no + * longer desired. */ public static void resetInstance() { _instance = INSTANCE; @@ -137,30 +136,35 @@ public class Zookeeper { /** * Get master leader elector. - * @param conf Config. - * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path. - * @param blobStore {@link BlobStore} - * @param tc {@link TopoCache} + * + * @param conf Config. + * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path. + * @param blobStore {@link BlobStore} + * @param tc {@link TopoCache} * @param clusterState {@link IStormClusterState} - * @param acls ACLs + * @param acls ACLs * @return Instance of {@link ILeaderElector} + * * @throws UnknownHostException */ public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore, - final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws UnknownHostException { + final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws + UnknownHostException { return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls); } protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore, - final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws UnknownHostException { + final TopoCache tc, IStormClusterState clusterState, List<ACL> acls) throws + UnknownHostException { List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); String leaderLockPath = "/leader-lock"; String id = NimbusInfo.fromConf(conf).toHostPortString(); AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference = - new AtomicReference<>(leaderLatchListenerImpl(new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls))); + new AtomicReference<>(leaderLatchListenerImpl( + new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls))); return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, - leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls); + leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java b/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java index 2900111..67e210d 100644 --- a/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java +++ b/storm-server/src/test/java/org/apache/storm/DaemonConfigTest.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm; import java.lang.reflect.InvocationTargetException; @@ -23,7 +18,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; - import org.apache.storm.validation.ConfigValidation; import org.junit.Assert; import org.junit.Test; @@ -37,11 +31,11 @@ public class DaemonConfigTest { passCases.add(null); passCases.add("some string"); - String[] stuff = {"some", "string", "list"}; + String[] stuff = { "some", "string", "list" }; passCases.add(Arrays.asList(stuff)); failCases.add(42); - Integer[] wrongStuff = {1, 2, 3}; + Integer[] wrongStuff = { 1, 2, 3 }; failCases.add(Arrays.asList(wrongStuff)); //worker.childopts validates @@ -61,32 +55,38 @@ public class DaemonConfigTest { } @Test - public void testNimbusChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testNimbusChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.NIMBUS_CHILDOPTS); } @Test - public void testLogviewerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testLogviewerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.LOGVIEWER_CHILDOPTS); } @Test - public void testUiChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testUiChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.UI_CHILDOPTS); } @Test - public void testPacemakerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testPacemakerChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.PACEMAKER_CHILDOPTS); } @Test - public void testDrpcChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testDrpcChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.DRPC_CHILDOPTS); } @Test - public void testSupervisorChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testSupervisorChildoptsIsStringOrStringList() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { stringOrStringListTest(DaemonConfig.SUPERVISOR_CHILDOPTS); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/LocalStateTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java index a4625a8..ade26ac 100644 --- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java +++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java @@ -1,42 +1,34 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.testing.TmpPath; import org.apache.storm.utils.LocalState; -import org.apache.thrift.TBase; import org.junit.Assert; import org.junit.Test; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - public class LocalStateTest { @Test - public void testLocalState() throws Exception{ - try (TmpPath dir1_tmp = new TmpPath(); TmpPath dir2_tmp = new TmpPath() ) { + public void testLocalState() throws Exception { + try (TmpPath dir1_tmp = new TmpPath(); TmpPath dir2_tmp = new TmpPath()) { GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a"); GlobalStreamId globalStreamId_b = new GlobalStreamId("b", "b"); GlobalStreamId globalStreamId_c = new GlobalStreamId("c", "c"); @@ -69,8 +61,8 @@ public class LocalStateTest { String dir = tmp_dir.getPath(); LocalState ls = new LocalState(dir); GlobalStreamId gs_a = new GlobalStreamId("a", "a"); - FileOutputStream data = FileUtils.openOutputStream(new File(dir,"12345")); - FileOutputStream version = FileUtils.openOutputStream(new File(dir,"12345.version")); + FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345")); + FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version")); Assert.assertNull(ls.get("c")); ls.put("a", gs_a); Assert.assertEquals(gs_a, ls.get("a")); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/MessagingTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java b/storm-server/src/test/java/org/apache/storm/MessagingTest.java index b0b6625..18cfc37 100644 --- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java +++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java @@ -1,47 +1,43 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.storm.generated.StormTopology; -import org.apache.storm.testing.TestWordSpout; -import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.CompleteTopologyParam; import org.apache.storm.testing.FixedTuple; import org.apache.storm.testing.MockedSources; -import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestWordSpout; import org.apache.storm.topology.TopologyBuilder; - import org.junit.Assert; import org.junit.Test; -import java.util.*; - public class MessagingTest { @Test public void testLocalTransport() throws Exception { Config topoConf = new Config(); topoConf.put(Config.TOPOLOGY_WORKERS, 2); - topoConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); + topoConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context"); try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime() - .withSupervisors(1).withPortsPerSupervisor(2) - .withDaemonConf(topoConf).build()) { + .withSupervisors(1).withPortsPerSupervisor(2) + .withDaemonConf(topoConf).build()) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("1", new TestWordSpout(true), 2); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/PacemakerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java index 7a00f77..5a9c130 100644 --- a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java +++ b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java @@ -1,22 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import org.apache.storm.generated.HBMessage; import org.apache.storm.generated.HBMessageData; import org.apache.storm.generated.HBPulse; @@ -26,9 +24,6 @@ import org.apache.storm.utils.Utils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; public class PacemakerTest { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/TestCgroups.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TestCgroups.java b/storm-server/src/test/java/org/apache/storm/TestCgroups.java index c785dde..874bd13 100644 --- a/storm-server/src/test/java/org/apache/storm/TestCgroups.java +++ b/storm-server/src/test/java/org/apache/storm/TestCgroups.java @@ -1,40 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; -import org.apache.storm.utils.Utils; -import org.junit.Assert; -import org.junit.Assume; -import org.apache.storm.container.cgroup.CgroupManager; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for CGroups @@ -52,7 +44,8 @@ public class TestCgroups { Config config = new Config(); config.putAll(Utils.readDefaultConfig()); //We don't want to run the test is CGroups are not setup - Assume.assumeTrue("Check if CGroups are setup", ((boolean) config.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE)) == true); + Assume + .assumeTrue("Check if CGroups are setup", ((boolean) config.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE)) == true); Assert.assertTrue("Check if STORM_CGROUP_HIERARCHY_DIR exists", stormCgroupHierarchyExists(config)); Assert.assertTrue("Check if STORM_SUPERVISOR_CGROUP_ROOTDIR exists", stormCgroupSupervisorRootDirExists(config)); @@ -69,13 +62,14 @@ public class TestCgroups { command.append(entry).append(" "); } String correctCommand1 = config.get(DaemonConfig.STORM_CGROUP_CGEXEC_CMD) + " -g memory,cpu:/" - + config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " "; + + config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " "; String correctCommand2 = config.get(DaemonConfig.STORM_CGROUP_CGEXEC_CMD) + " -g cpu,memory:/" - + config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " "; - Assert.assertTrue("Check if cgroup launch command is correct", command.toString().equals(correctCommand1) || command.toString().equals(correctCommand2)); + + config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR) + "/" + workerId + " "; + Assert.assertTrue("Check if cgroup launch command is correct", + command.toString().equals(correctCommand1) || command.toString().equals(correctCommand2)); String pathToWorkerCgroupDir = ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR)) - + "/" + ((String) config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR)) + "/" + workerId; + + "/" + ((String) config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR)) + "/" + workerId; Assert.assertTrue("Check if cgroup directory exists for worker", dirExists(pathToWorkerCgroupDir)); @@ -90,7 +84,8 @@ public class TestCgroups { String pathTomemoryLimitInBytes = pathToWorkerCgroupDir + "/memory.limit_in_bytes"; Assert.assertTrue("Check if memory.limit_in_bytes file exists", fileExists(pathTomemoryLimitInBytes)); - Assert.assertEquals("Check if the correct value is written into memory.limit_in_bytes", String.valueOf(1024 * 1024 * 1024), readFileAll(pathTomemoryLimitInBytes)); + Assert.assertEquals("Check if the correct value is written into memory.limit_in_bytes", String.valueOf(1024 * 1024 * 1024), + readFileAll(pathTomemoryLimitInBytes)); manager.releaseResourcesForWorker(workerId); @@ -104,7 +99,7 @@ public class TestCgroups { private boolean stormCgroupSupervisorRootDirExists(Map<String, Object> config) { String pathTostormCgroupSupervisorRootDir = ((String) config.get(Config.STORM_CGROUP_HIERARCHY_DIR)) - + "/" + ((String) config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR)); + + "/" + ((String) config.get(DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR)); return dirExists(pathTostormCgroupSupervisorRootDir); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/TestDaemonConfigValidate.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TestDaemonConfigValidate.java b/storm-server/src/test/java/org/apache/storm/TestDaemonConfigValidate.java index fd00ee9..a3e01bf 100644 --- a/storm-server/src/test/java/org/apache/storm/TestDaemonConfigValidate.java +++ b/storm-server/src/test/java/org/apache/storm/TestDaemonConfigValidate.java @@ -18,21 +18,21 @@ package org.apache.storm; -import org.apache.storm.validation.ConfigValidation; -import org.junit.Assert; -import org.junit.Test; - import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import org.apache.storm.validation.ConfigValidation; +import org.junit.Assert; +import org.junit.Test; public class TestDaemonConfigValidate { @Test - public void testSupervisorSchedulerMetaIsStringMap() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testSupervisorSchedulerMetaIsStringMap() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { Map<String, Object> conf = new HashMap<String, Object>(); Map<String, Object> schedulerMeta = new HashMap<String, Object>(); conf.put(DaemonConfig.SUPERVISOR_SCHEDULER_META, schedulerMeta); @@ -52,7 +52,8 @@ public class TestDaemonConfigValidate { } @Test - public void testIsolationSchedulerMachinesIsMap() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testIsolationSchedulerMachinesIsMap() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { Map<String, Object> conf = new HashMap<String, Object>(); Map<String, Integer> isolationMap = new HashMap<String, Integer>(); conf.put(DaemonConfig.ISOLATION_SCHEDULER_MACHINES, isolationMap); @@ -73,19 +74,20 @@ public class TestDaemonConfigValidate { } @Test - public void testSupervisorSlotsPorts() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException { + public void testSupervisorSlotsPorts() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, + InstantiationException, IllegalAccessException { Map<String, Object> conf = new HashMap<String, Object>(); Collection<Object> passCases = new LinkedList<Object>(); Collection<Object> failCases = new LinkedList<Object>(); - Integer[] test1 = {1233, 1234, 1235}; - Integer[] test2 = {1233}; + Integer[] test1 = { 1233, 1234, 1235 }; + Integer[] test2 = { 1233 }; passCases.add(Arrays.asList(test1)); passCases.add(Arrays.asList(test2)); - String[] test3 = {"1233", "1234", "1235"}; + String[] test3 = { "1233", "1234", "1235" }; //duplicate case - Integer[] test4 = {1233, 1233, 1235}; + Integer[] test4 = { 1233, 1233, 1235 }; failCases.add(test3); failCases.add(test4); failCases.add(null); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/TestRebalance.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TestRebalance.java b/storm-server/src/test/java/org/apache/storm/TestRebalance.java index 912ad42..6f9f076 100644 --- a/storm-server/src/test/java/org/apache/storm/TestRebalance.java +++ b/storm-server/src/test/java/org/apache/storm/TestRebalance.java @@ -1,23 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.RebalanceOptions; import org.apache.storm.generated.StormTopology; @@ -37,9 +33,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -49,6 +42,15 @@ public class TestRebalance { private static final Logger LOG = LoggerFactory.getLogger(TestRebalance.class); + public static String topoNameToId(String topoName, ILocalCluster cluster) throws TException { + for (TopologySummary topoSum : cluster.getClusterInfo().get_topologies()) { + if (topoSum.get_name().equals(topoName)) { + return topoSum.get_id(); + } + } + return null; + } + @Test public void testRebalanceTopologyResourcesAndConfigs() throws Exception { @@ -71,11 +73,11 @@ public class TestRebalance { TopologyBuilder builder = new TopologyBuilder(); SpoutDeclarer s1 = builder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(), - 2); + 2); BoltDeclarer b1 = builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(), - 2).shuffleGrouping("spout-1"); + 2).shuffleGrouping("spout-1"); BoltDeclarer b2 = builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(), - 2).shuffleGrouping("bolt-1"); + 2).shuffleGrouping("bolt-1"); StormTopology stormTopology = builder.createTopology(); @@ -173,13 +175,4 @@ public class TestRebalance { } return false; } - - public static String topoNameToId(String topoName, ILocalCluster cluster) throws TException { - for (TopologySummary topoSum : cluster.getClusterInfo().get_topologies()) { - if (topoSum.get_name().equals(topoName)) { - return topoSum.get_id(); - } - } - return null; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/TestingTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TestingTest.java b/storm-server/src/test/java/org/apache/storm/TestingTest.java index 201b470..ee0fabc 100644 --- a/storm-server/src/test/java/org/apache/storm/TestingTest.java +++ b/storm-server/src/test/java/org/apache/storm/TestingTest.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; @@ -37,7 +31,7 @@ import org.apache.storm.tuple.Values; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; /** * Test that the testing class does what it should do. @@ -48,18 +42,18 @@ public class TestingTest { TopologyBuilder tb = new TopologyBuilder(); tb.setSpout("spout", new TestWordSpout(true), 3); tb.setBolt("2", new TestWordCounter(), 4) - .fieldsGrouping("spout", new Fields("word")); + .fieldsGrouping("spout", new Fields("word")); tb.setBolt("3", new TestGlobalCount()) - .globalGrouping("spout"); + .globalGrouping("spout"); tb.setBolt("4", new TestAggregatesCounter()) - .globalGrouping("2"); + .globalGrouping("2"); MockedSources mocked = new MockedSources(); mocked.addMockData("spout", - new Values("nathan"), - new Values("bob"), - new Values("joey"), - new Values("nathan")); + new Values("nathan"), + new Values("bob"), + new Values("joey"), + new Values("nathan")); Config topoConf = new Config(); topoConf.setNumWorkers(2); @@ -71,27 +65,27 @@ public class TestingTest { Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, tb.createTopology(), ctp); List<List<Object>> spoutTuples = Testing.readTuples(results, "spout"); List<List<Object>> expectedSpoutTuples = Arrays.asList(Arrays.asList("nathan"), Arrays.asList("bob"), Arrays.asList("joey"), - Arrays.asList("nathan")); + Arrays.asList("nathan")); assertTrue(expectedSpoutTuples + " expected, but found " + spoutTuples, - Testing.multiseteq(expectedSpoutTuples, spoutTuples)); + Testing.multiseteq(expectedSpoutTuples, spoutTuples)); List<List<Object>> twoTuples = Testing.readTuples(results, "2"); List<List<Object>> expectedTwoTuples = Arrays.asList(Arrays.asList("nathan", 1), Arrays.asList("nathan", 2), - Arrays.asList("bob", 1), Arrays.asList("joey", 1)); + Arrays.asList("bob", 1), Arrays.asList("joey", 1)); assertTrue(expectedTwoTuples + " expected, but found " + twoTuples, - Testing.multiseteq(expectedTwoTuples, twoTuples)); + Testing.multiseteq(expectedTwoTuples, twoTuples)); List<List<Object>> threeTuples = Testing.readTuples(results, "3"); List<List<Object>> expectedThreeTuples = Arrays.asList(Arrays.asList(1), Arrays.asList(2), - Arrays.asList(3), Arrays.asList(4)); + Arrays.asList(3), Arrays.asList(4)); assertTrue(expectedThreeTuples + " expected, but found " + threeTuples, - Testing.multiseteq(expectedThreeTuples, threeTuples)); + Testing.multiseteq(expectedThreeTuples, threeTuples)); List<List<Object>> fourTuples = Testing.readTuples(results, "4"); List<List<Object>> expectedFourTuples = Arrays.asList(Arrays.asList(1), Arrays.asList(2), - Arrays.asList(3), Arrays.asList(4)); + Arrays.asList(3), Arrays.asList(4)); assertTrue(expectedFourTuples + " expected, but found " + fourTuples, - Testing.multiseteq(expectedFourTuples, fourTuples)); + Testing.multiseteq(expectedFourTuples, fourTuples)); }; @Test http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/TickTupleTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java index 7f9f2b3..fdbcff5 100644 --- a/storm-server/src/test/java/org/apache/storm/TickTupleTest.java +++ b/storm-server/src/test/java/org/apache/storm/TickTupleTest.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm; @@ -39,7 +33,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class TickTupleTest { private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class); @@ -48,11 +43,11 @@ public class TickTupleTest { @Test public void testTickTupleWorksWithSystemBolt() throws Exception { - try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()){ + try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { StormTopology topology = createNoOpTopology(); Config topoConf = new Config(); topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); - try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) { + try (ILocalTopology topo = cluster.submitTopology("test", topoConf, topology)) { //Give the topology some time to come up long time = 0; int timeout = Math.max(Testing.TEST_TIMEOUT_MS, 100_000); @@ -65,13 +60,20 @@ public class TickTupleTest { for (int i = 0; i < 5; i++) { cluster.advanceClusterTime(1); time += 1_000; - assertEquals("Iteration " + i, (Long)time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS)); + assertEquals("Iteration " + i, (Long) time, tickTupleTimes.poll(100, TimeUnit.MILLISECONDS)); } } assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get()); } } + private StormTopology createNoOpTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("Spout", new NoopSpout()); + builder.setBolt("Bolt", new NoopBolt()).fieldsGrouping("Spout", new Fields("tuple")); + return builder.createTopology(); + } + private static class NoopSpout extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { @@ -111,11 +113,4 @@ public class TickTupleTest { @Override public void declareOutputFields(OutputFieldsDeclarer ofd) {} } - - private StormTopology createNoOpTopology() { - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("Spout", new NoopSpout()); - builder.setBolt("Bolt", new NoopBolt()).fieldsGrouping("Spout", new Fields("tuple")); - return builder.createTopology(); - } }
