http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java index 2b0ca40..af9c681 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java @@ -36,16 +36,51 @@ import org.slf4j.LoggerFactory; public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy { private static final Logger LOG = LoggerFactory.getLogger(GenericResourceAwareStrategy.class); + /** + * Implementation of the sortObjectResources method so other strategies can reuse it. + */ + public static TreeSet<ObjectResources> sortObjectResourcesImpl( + final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, + final ExistingScheduleFunc existingScheduleFunc) { + AllResources affinityBasedAllResources = new AllResources(allResources); + + TreeSet<ObjectResources> sortedObjectResources = + new TreeSet<>((o1, o2) -> { + int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id); + int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id); + if (execsScheduled1 > execsScheduled2) { + return -1; + } else if (execsScheduled1 < execsScheduled2) { + return 1; + } else { + double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); + double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); + + if (o1Avg > o2Avg) { + return -1; + } else if (o1Avg < o2Avg) { + return 1; + } else { + return o1.id.compareTo(o2.id); + } + + } + }); + sortedObjectResources.addAll(affinityBasedAllResources.objectResources); + LOG.debug("Sorted Object Resources: {}", sortedObjectResources); + return sortedObjectResources; + } + @Override public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { prepare(cluster); if (nodes.getNodes().size() <= 0) { LOG.warn("No available nodes to schedule tasks on!"); return SchedulingResult.failure( - SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); + SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!"); } Collection<ExecutorDetails> unassignedExecutors = - new HashSet<>(this.cluster.getUnassignedExecutors(td)); + new HashSet<>(this.cluster.getUnassignedExecutors(td)); LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors); Collection<ExecutorDetails> scheduledTasks = new ArrayList<>(); List<Component> spouts = this.getSpouts(td); @@ -53,7 +88,7 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl if (spouts.size() == 0) { LOG.error("Cannot find a Spout!"); return SchedulingResult.failure( - SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); + SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!"); } //order executors to be scheduled @@ -64,10 +99,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl for (ExecutorDetails exec : orderedExecutors) { LOG.debug( - "Attempting to schedule: {} of component {}[ REQ {} ]", - exec, - td.getExecutorToComponent().get(exec), - td.getTaskResourceReqList(exec)); + "Attempting to schedule: {} of component {}[ REQ {} ]", + exec, + td.getExecutorToComponent().get(exec), + td.getTaskResourceReqList(exec)); final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes); scheduleExecutor(exec, td, scheduledTasks, sortedNodes); @@ -86,12 +121,12 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl if (executorsNotScheduled.size() > 0) { LOG.error("Not all executors successfully scheduled: {}", executorsNotScheduled); result = - SchedulingResult.failure( - SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, - (td.getExecutors().size() - unassignedExecutors.size()) - + "/" - + td.getExecutors().size() - + " executors scheduled"); + SchedulingResult.failure( + SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, + (td.getExecutors().size() - unassignedExecutors.size()) + + "/" + + td.getExecutors().size() + + " executors scheduled"); } else { LOG.debug("All resources successfully scheduled!"); result = SchedulingResult.success("Fully Scheduled by " + this.getClass().getSimpleName()); @@ -99,68 +134,28 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl return result; } - /** - * Sort objects by the following two criteria. 1) the number executors of the topology that needs - * to be scheduled is already on the object (node or rack) in descending order. The reasoning to - * sort based on criterion 1 is so we schedule the rest of a topology on the same object (node or - * rack) as the existing executors of the topology. 2) the subordinate/subservient resource - * availability percentage of a rack in descending order We calculate the resource availability - * percentage by dividing the resource availability of the object (node or rack) by the resource - * availability of the entire rack or cluster depending on if object references a node or a rack. - * How this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack - * if it is requested by the executor that the sorting is being done for and pulls it down if it is not. - * By doing this calculation, objects (node or rack) that have exhausted or little of one of the - * resources mentioned above will be ranked after racks that have more balanced resource - * availability and nodes or racks that have resources that are not requested will be ranked below - * . So we will be less likely to pick a rack that have a lot of one resource but a - * low amount of another and have a lot of resources that are not requested by the executor. + * Sort objects by the following two criteria. 1) the number executors of the topology that needs to be scheduled is already on the + * object (node or rack) in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the + * same object (node or rack) as the existing executors of the topology. 2) the subordinate/subservient resource availability percentage + * of a rack in descending order We calculate the resource availability percentage by dividing the resource availability of the object + * (node or rack) by the resource availability of the entire rack or cluster depending on if object references a node or a rack. How + * this differs from the DefaultResourceAwareStrategy is that the percentage boosts the node or rack if it is requested by the executor + * that the sorting is being done for and pulls it down if it is not. By doing this calculation, objects (node or rack) that have + * exhausted or little of one of the resources mentioned above will be ranked after racks that have more balanced resource availability + * and nodes or racks that have resources that are not requested will be ranked below . So we will be less likely to pick a rack that + * have a lot of one resource but a low amount of another and have a lot of resources that are not requested by the executor. * - * @param allResources contains all individual ObjectResources as well as cumulative stats - * @param exec executor for which the sorting is done - * @param topologyDetails topologyDetails for the above executor + * @param allResources contains all individual ObjectResources as well as cumulative stats + * @param exec executor for which the sorting is done + * @param topologyDetails topologyDetails for the above executor * @param existingScheduleFunc a function to get existing executors already scheduled on this object * @return a sorted list of ObjectResources */ @Override protected TreeSet<ObjectResources> sortObjectResources( - final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, - final ExistingScheduleFunc existingScheduleFunc) { - return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc); - } - - /** - * Implementation of the sortObjectResources method so other strategies can reuse it. - */ - public static TreeSet<ObjectResources> sortObjectResourcesImpl( final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, final ExistingScheduleFunc existingScheduleFunc) { - AllResources affinityBasedAllResources = new AllResources(allResources); - - TreeSet<ObjectResources> sortedObjectResources = - new TreeSet<>((o1, o2) -> { - int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id); - int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id); - if (execsScheduled1 > execsScheduled2) { - return -1; - } else if (execsScheduled1 < execsScheduled2) { - return 1; - } else { - double o1Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources); - double o2Avg = allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources); - - if (o1Avg > o2Avg) { - return -1; - } else if (o1Avg < o2Avg) { - return 1; - } else { - return o1.id.compareTo(o2.id); - } - - } - }); - sortedObjectResources.addAll(affinityBasedAllResources.objectResources); - LOG.debug("Sorted Object Resources: {}", sortedObjectResources); - return sortedObjectResources; + return sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java index 53859fd..2cdbb32 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.resource.strategies.scheduling; import java.util.Map; - import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.resource.SchedulingResult; @@ -29,13 +22,13 @@ import org.apache.storm.scheduler.resource.SchedulingResult; * In the future strategies will be pluggable */ public interface IStrategy { - + /** * Prepare the Strategy for scheduling. * @param config the cluster configuration */ void prepare(Map<String, Object> config); - + /** * This method is invoked to calculate a scheduling for topology td. Cluster will reject any changes that are * not for the given topology. Any changes made to the cluster will be committed if the scheduling is successful. http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java index 5d3dfbe..cfb6bb7 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; @@ -35,9 +29,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; -import org.apache.storm.Config; import org.apache.storm.DaemonConfig; -import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; @@ -84,7 +76,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader { } Integer thisPollTime = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS); if (thisPollTime != null) { - artifactoryPollTimeSecs =thisPollTime; + artifactoryPollTimeSecs = thisPollTime; } String thisBase = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY); if (thisBase != null) { @@ -97,7 +89,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader { try { targetURI = new URI(uriString); scheme = targetURI.getScheme().substring(ARTIFACTORY_SCHEME_PREFIX.length()); - } catch(URISyntaxException e) { + } catch (URISyntaxException e) { LOG.error("Failed to parse uri={}", uriString); } } @@ -119,7 +111,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader { int currentTimeSecs = Time.currentTimeSecs(); if (lastReturnedValue != null && ((currentTimeSecs - lastReturnedTime) < artifactoryPollTimeSecs)) { LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; artifactoryPollTimeSecs: {}. Returning our last map.", - currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs); + currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs); return (Map<String, Object>) lastReturnedValue.get(configKey); } @@ -134,41 +126,6 @@ public class ArtifactoryConfigLoader implements IConfigLoader { return null; } - - /** - * A private class used to check the response coming back from httpclient. - */ - private static class GETStringResponseHandler implements ResponseHandler<String> { - private static GETStringResponseHandler singleton = null; - - /** - * @return a singleton httpclient GET response handler - */ - public static GETStringResponseHandler getInstance() { - if (singleton == null) { - singleton = new GETStringResponseHandler(); - } - return singleton; - } - - /** - * @param response The http response to verify. - * @return null on failure or the response string if return code is in 200 range - */ - @Override - public String handleResponse(final HttpResponse response) throws IOException { - int status = response.getStatusLine().getStatusCode(); - HttpEntity entity = response.getEntity(); - String entityString = (entity != null ? EntityUtils.toString(entity) : null); - if (status >= 200 && status < 300) { - return entityString; - } else { - LOG.error("Got unexpected response code {}; entity: {}", status, entityString); - return null; - } - } - } - /** * @param api null if we are trying to download artifact, otherwise a string to call REST api, * e.g. "/api/storage" @@ -232,13 +189,6 @@ public class ArtifactoryConfigLoader implements IConfigLoader { return returnValue; } - private class DirEntryCompare implements Comparator<JSONObject> { - @Override - public int compare(JSONObject o1, JSONObject o2) { - return ((String)o1.get("uri")).compareTo((String)o2.get("uri")); - } - } - private String loadMostRecentArtifact(String location, String host, Integer port) { // Is this a directory or is it a file? JSONObject json = getArtifactMetadata(location, host, port); @@ -256,7 +206,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader { return returnValue; } - // This should mean that we were pointed at a directory. + // This should mean that we were pointed at a directory. // Find the most recent child and load that. JSONArray msg = (JSONArray) json.get("children"); if (msg == null || msg.size() == 0) { @@ -336,13 +286,13 @@ public class ArtifactoryConfigLoader implements IConfigLoader { // First make the cache dir String localDirName = ServerConfigUtils.masterLocalDir(conf) + File.separator + LOCAL_ARTIFACT_DIR; File dir = new File(localDirName); - if (! dir.exists()) { + if (!dir.exists()) { dir.mkdirs(); } localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_"); dir = new File(localCacheDir); - if (! dir.exists()) { + if (!dir.exists()) { dir.mkdir(); } cacheInitialized = true; @@ -388,4 +338,45 @@ public class ArtifactoryConfigLoader implements IConfigLoader { return null; } + + /** + * A private class used to check the response coming back from httpclient. + */ + private static class GETStringResponseHandler implements ResponseHandler<String> { + private static GETStringResponseHandler singleton = null; + + /** + * @return a singleton httpclient GET response handler + */ + public static GETStringResponseHandler getInstance() { + if (singleton == null) { + singleton = new GETStringResponseHandler(); + } + return singleton; + } + + /** + * @param response The http response to verify. + * @return null on failure or the response string if return code is in 200 range + */ + @Override + public String handleResponse(final HttpResponse response) throws IOException { + int status = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + String entityString = (entity != null ? EntityUtils.toString(entity) : null); + if (status >= 200 && status < 300) { + return entityString; + } else { + LOG.error("Got unexpected response code {}; entity: {}", status, entityString); + return null; + } + } + } + + private class DirEntryCompare implements Comparator<JSONObject> { + @Override + public int compare(JSONObject o1, JSONObject o2) { + return ((String) o1.get("uri")).compareTo((String) o2.get("uri")); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java index 6494ae2..71b9eb5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java index 53df78b..4e1b3a5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java index a75e92b..8e4c9b9 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; @@ -21,7 +15,6 @@ package org.apache.storm.scheduler.utils; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; - import org.apache.storm.DaemonConfig; import org.apache.storm.utils.Utils; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java index 9a2ac5c..c9abd1b 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java index 5071133..6bfd6be 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; @@ -27,6 +21,6 @@ public interface IConfigLoader { * @param configKey The key from which we want to get the scheduler config. * @return The scheduler configs */ - Map<?,?> load(String configKey); + Map<?, ?> load(String configKey); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java index a039870..115c050 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.scheduler.utils; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java index dac0c91..8997e6f 100644 --- a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java +++ b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java @@ -43,44 +43,21 @@ public class WorkerTokenManager { private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenManager.class); /** - * The length of the random keys to use in bits. - * This should be at least the length of WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM. + * The length of the random keys to use in bits. This should be at least the length of WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM. */ private static final int KEY_LENGTH = 256; - - /** - * Generate a new random secret key. - * @return the new key - */ - protected SecretKey generateSecret() { - SecretKey key; - synchronized (keyGen) { - key = keyGen.generateKey(); - } - return key; - } - - /** - * Get the secret that should be used to sign a token. This may either reuse a secret or generate a new one so any user should - * call this once and save the result. - * @return the key to use. - */ - protected SecretKey getCurrentSecret() { - return generateSecret(); - } - /** * Key generator to use. */ private final KeyGenerator keyGen; private final IStormClusterState state; private final long tokenLifetimeMillis; - /** - * Constructor. This assumes that state can store the tokens securely, and that they should be enabled at all. - * Please use AuthUtils.areWorkerTokensEnabledServer to validate this first. + * Constructor. This assumes that state can store the tokens securely, and that they should be enabled at all. Please use + * AuthUtils.areWorkerTokensEnabledServer to validate this first. + * * @param daemonConf the config for nimbus. - * @param state the state used to store private keys. + * @param state the state used to store private keys. */ public WorkerTokenManager(Map<String, Object> daemonConf, IStormClusterState state) { this.state = state; @@ -91,15 +68,39 @@ public class WorkerTokenManager { throw new IllegalArgumentException("Can't find " + WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM + " algorithm."); } this.tokenLifetimeMillis = TimeUnit.MILLISECONDS.convert( - ObjectReader.getLong(daemonConf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS),24L), + ObjectReader.getLong(daemonConf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS), 24L), TimeUnit.HOURS); } /** + * Generate a new random secret key. + * + * @return the new key + */ + protected SecretKey generateSecret() { + SecretKey key; + synchronized (keyGen) { + key = keyGen.generateKey(); + } + return key; + } + + /** + * Get the secret that should be used to sign a token. This may either reuse a secret or generate a new one so any user should call + * this once and save the result. + * + * @return the key to use. + */ + protected SecretKey getCurrentSecret() { + return generateSecret(); + } + + /** * Create or update an existing key. + * * @param serviceType the type of service to create a token for - * @param user the user the token is for - * @param topologyId the topology the token is for + * @param user the user the token is for + * @param topologyId the topology the token is for * @return a newly generated token that should be good to start using form now until it expires. */ public WorkerToken createOrUpdateTokenFor(WorkerTokenServiceType serviceType, String user, String topologyId) { @@ -118,6 +119,7 @@ public class WorkerTokenManager { /** * Get the maximum expiration token time that should be renewed. + * * @return any token with an expiration less than the returned value should be renewed. */ public long getMaxExpirationTimeForRenewal() { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java b/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java index 861e21a..eafb315 100644 --- a/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java +++ b/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.testing; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.Testing; @@ -26,78 +20,78 @@ import org.apache.storm.Testing; * The param class for the `Testing.completeTopology`. */ public class CompleteTopologyParam { - /** - * The mocked spout sources - */ - private MockedSources mockedSources = new MockedSources(); - /** - * the config for the topology when it was submitted to the cluster - */ - private Map<String, Object> topoConf = new Config(); - /** - * whether cleanup the state? - */ - private boolean cleanupState = true; - /** - * the topology name you want to submit to the cluster - */ - private String topologyName; + /** + * The mocked spout sources + */ + private MockedSources mockedSources = new MockedSources(); + /** + * the config for the topology when it was submitted to the cluster + */ + private Map<String, Object> topoConf = new Config(); + /** + * whether cleanup the state? + */ + private boolean cleanupState = true; + /** + * the topology name you want to submit to the cluster + */ + private String topologyName; + + /** + * the timeout of topology you want to submit to the cluster + */ + private int timeoutMs = Testing.TEST_TIMEOUT_MS; - /** - * the timeout of topology you want to submit to the cluster - */ - private int timeoutMs = Testing.TEST_TIMEOUT_MS; + public MockedSources getMockedSources() { + return mockedSources; + } - public MockedSources getMockedSources() { - return mockedSources; - } + public void setMockedSources(MockedSources mockedSources) { + if (mockedSources == null) { + mockedSources = new MockedSources(); + } - public void setMockedSources(MockedSources mockedSources) { - if (mockedSources == null) { - mockedSources = new MockedSources(); - } - - this.mockedSources = mockedSources; - } + this.mockedSources = mockedSources; + } - public Map<String, Object> getStormConf() { - return topoConf; - } + public Map<String, Object> getStormConf() { + return topoConf; + } - public void setStormConf(Map<String, Object> topoConf) { - if (topoConf == null) { - topoConf = new Config(); - } - this.topoConf = topoConf; - } + public void setStormConf(Map<String, Object> topoConf) { + if (topoConf == null) { + topoConf = new Config(); + } + this.topoConf = topoConf; + } - public boolean getCleanupState() { - return cleanupState; - } + public boolean getCleanupState() { + return cleanupState; + } - public void setCleanupState(Boolean cleanupState) { - if (cleanupState == null) { - cleanupState = true; - } - this.cleanupState = cleanupState; - } + public void setCleanupState(Boolean cleanupState) { + if (cleanupState == null) { + cleanupState = true; + } + this.cleanupState = cleanupState; + } - public String getTopologyName() { - return topologyName; - } + public String getTopologyName() { + return topologyName; + } - public void setTopologyName(String topologyName) { - this.topologyName = topologyName; - } + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } - public Integer getTimeoutMs() { - return timeoutMs; - } + public Integer getTimeoutMs() { + return timeoutMs; + } - public void setTimeoutMs(Integer timeoutMs) { - if (timeoutMs == null) { - timeoutMs = Testing.TEST_TIMEOUT_MS; - } - this.timeoutMs = timeoutMs; - } + public void setTimeoutMs(Integer timeoutMs) { + if (timeoutMs == null) { + timeoutMs = Testing.TEST_TIMEOUT_MS; + } + this.timeoutMs = timeoutMs; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java b/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java index 032f09c..38db6da 100644 --- a/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java +++ b/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.testing; import java.util.List; - import org.apache.storm.zookeeper.Zookeeper; import org.apache.zookeeper.server.NIOServerCnxnFactory; @@ -30,11 +23,11 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory; * } */ public class InProcessZookeeper implements AutoCloseable { - + private final TmpPath zkTmp; private final NIOServerCnxnFactory zookeeper; private final long zkPort; - + public InProcessZookeeper() throws Exception { zkTmp = new TmpPath(); @SuppressWarnings("unchecked") @@ -42,7 +35,7 @@ public class InProcessZookeeper implements AutoCloseable { zkPort = (Long) portAndHandle.get(0); zookeeper = (NIOServerCnxnFactory) portAndHandle.get(1); } - + /** * @return the port ZK is listening on (localhost) */ @@ -55,5 +48,5 @@ public class InProcessZookeeper implements AutoCloseable { Zookeeper.shutdownInprocessZookeeper(zookeeper); zkTmp.close(); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/TestJob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/testing/TestJob.java b/storm-server/src/main/java/org/apache/storm/testing/TestJob.java index 79288ad..9c118e8 100644 --- a/storm-server/src/main/java/org/apache/storm/testing/TestJob.java +++ b/storm-server/src/main/java/org/apache/storm/testing/TestJob.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.testing; import org.apache.storm.ILocalCluster; @@ -33,11 +28,11 @@ import org.apache.storm.ILocalCluster; * ``` */ public interface TestJob { - /** - * run the testing logic with the cluster. - * - * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> - * and <code>Testing.withTrackedCluster</code>. - */ + /** + * run the testing logic with the cluster. + * + * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> + * and <code>Testing.withTrackedCluster</code>. + */ public void run(ILocalCluster cluster) throws Exception; } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java b/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java index 00bd1e7..1c6f738 100644 --- a/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java +++ b/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java @@ -1,29 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.testing; -import static org.apache.storm.Testing.whileTimeout; +package org.apache.storm.testing; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.storm.ILocalCluster; import org.apache.storm.Testing; import org.apache.storm.Thrift; @@ -37,6 +29,8 @@ import org.apache.storm.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.Testing.whileTimeout; + /** * A tracked topology keeps metrics for every bolt and spout. * This allows a test to know how many tuples have been fully processed. @@ -48,88 +42,88 @@ public class TrackedTopology { private final StormTopology topology; private final AtomicInteger lastSpoutCommit; private final ILocalCluster cluster; - + /** * Create a new topology to be tracked. * @param origTopo the original topology. * @param cluster a cluster that should have been launched with tracking enabled. */ - public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) { - LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId()); - this.cluster = cluster; - lastSpoutCommit = new AtomicInteger(0); - String id = cluster.getTrackedId(); - topology = origTopo.deepCopy(); - for (Bolt bolt: topology.get_bolts().values()) { - IRichBolt obj = (IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object()); - bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker(obj, id))); - } - for (SpoutSpec spout: topology.get_spouts().values()) { - IRichSpout obj = (IRichSpout) Thrift.deserializeComponentObject(spout.get_spout_object()); - spout.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker(obj, id))); - } - } + public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) { + LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId()); + this.cluster = cluster; + lastSpoutCommit = new AtomicInteger(0); + String id = cluster.getTrackedId(); + topology = origTopo.deepCopy(); + for (Bolt bolt : topology.get_bolts().values()) { + IRichBolt obj = (IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object()); + bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker(obj, id))); + } + for (SpoutSpec spout : topology.get_spouts().values()) { + IRichSpout obj = (IRichSpout) Thrift.deserializeComponentObject(spout.get_spout_object()); + spout.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker(obj, id))); + } + } + + @SuppressWarnings("unchecked") + private static int globalAmt(String id, String key) { + LOG.warn("Reading tracked metrics for ID {}", id); + return ((ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id)).get(key).get(); + } public StormTopology getTopology() { - return topology; + return topology; + } + + public ILocalCluster getCluster() { + return cluster; + } + + /** + * Wait for 1 tuple to be fully processed + */ + public void trackedWait() { + trackedWait(1, Testing.TEST_TIMEOUT_MS); + } + + /** + * Wait for amt tuples to be fully processed. + */ + public void trackedWait(int amt) { + trackedWait(amt, Testing.TEST_TIMEOUT_MS); + } + + /** + * Wait for amt tuples to be fully processed timeoutMs happens. + */ + public void trackedWait(int amt, int timeoutMs) { + final int target = amt + lastSpoutCommit.get(); + final String id = cluster.getTrackedId(); + Random rand = ThreadLocalRandom.current(); + whileTimeout(timeoutMs, + () -> { + int se = globalAmt(id, "spout-emitted"); + int transferred = globalAmt(id, "transferred"); + int processed = globalAmt(id, "processed"); + LOG.info("emitted {} target {} transferred {} processed {}", se, target, transferred, processed); + return (target != se) || (transferred != processed); + }, + () -> { + Time.advanceTimeSecs(1); + try { + Thread.sleep(rand.nextInt(200)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + lastSpoutCommit.set(target); } - - public ILocalCluster getCluster() { - return cluster; - } - - /** - * Wait for 1 tuple to be fully processed - */ - public void trackedWait() { - trackedWait(1, Testing.TEST_TIMEOUT_MS); - } - - /** - * Wait for amt tuples to be fully processed. - */ - public void trackedWait(int amt) { - trackedWait(amt, Testing.TEST_TIMEOUT_MS); - } - - /** - * Wait for amt tuples to be fully processed timeoutMs happens. - */ - public void trackedWait(int amt, int timeoutMs) { - final int target = amt + lastSpoutCommit.get(); - final String id = cluster.getTrackedId(); - Random rand = ThreadLocalRandom.current(); - whileTimeout(timeoutMs, - () -> { - int se = globalAmt(id, "spout-emitted"); - int transferred = globalAmt(id, "transferred"); - int processed = globalAmt(id, "processed"); - LOG.info("emitted {} target {} transferred {} processed {}", se, target, transferred, processed); - return (target != se) || (transferred != processed); - }, - () -> { - Time.advanceTimeSecs(1); - try { - Thread.sleep(rand.nextInt(200)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - lastSpoutCommit.set(target); - } - /** + /** * Read a metric from the tracked cluster (NOT JUST THIS TOPOLOGY) * @param key one of "spout-emitted", "processed", or "transferred" * @return the amount of that metric */ - public int globalAmt(String key) { - return globalAmt(cluster.getTrackedId(), key); - } - - @SuppressWarnings("unchecked") - private static int globalAmt(String id, String key) { - LOG.warn("Reading tracked metrics for ID {}", id); - return ((ConcurrentHashMap<String, AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get(); + public int globalAmt(String key) { + return globalAmt(cluster.getTrackedId(), key); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java b/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java index cd230c3..5bf91ca 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java +++ b/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.utils; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/LruMap.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java index 3ed5d06..ea0ea54 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java +++ b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.utils; @@ -40,7 +34,7 @@ public class LruMap<A, B> extends LinkedHashMap<A, B> { this(maxSize); this.evCb = evictionCallback; } - + @Override protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) { boolean evict = size() > this.maxSize; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java index dcf3d09..1df19cb 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java @@ -27,13 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Pattern; - import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.DaemonConfig; - public class ServerConfigUtils { public static final String FILE_SEPARATOR = File.separator; public static final String NIMBUS_DO_NOT_REASSIGN = "NIMBUS-DO-NOT-REASSIGN"; @@ -44,9 +42,9 @@ public class ServerConfigUtils { private static ServerConfigUtils _instance = new ServerConfigUtils(); /** - * Provide an instance of this class for delegates to use. To mock out - * delegated methods, provide an instance of a subclass that overrides the - * implementation of the delegated method. + * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * overrides the implementation of the delegated method. + * * @param u a ServerConfigUtils instance * @return the previously set instance */ @@ -85,8 +83,8 @@ public class ServerConfigUtils { /* TODO: make sure test these two functions in manual tests */ public static List<String> getTopoLogsUsers(Map<String, Object> topologyConf) { - List<String> logsUsers = (List<String>)topologyConf.get(DaemonConfig.LOGS_USERS); - List<String> topologyUsers = (List<String>)topologyConf.get(Config.TOPOLOGY_USERS); + List<String> logsUsers = (List<String>) topologyConf.get(DaemonConfig.LOGS_USERS); + List<String> topologyUsers = (List<String>) topologyConf.get(Config.TOPOLOGY_USERS); Set<String> mergedUsers = new HashSet<String>(); if (logsUsers != null) { for (String user : logsUsers) { @@ -108,8 +106,8 @@ public class ServerConfigUtils { } public static List<String> getTopoLogsGroups(Map<String, Object> topologyConf) { - List<String> logsGroups = (List<String>)topologyConf.get(DaemonConfig.LOGS_GROUPS); - List<String> topologyGroups = (List<String>)topologyConf.get(Config.TOPOLOGY_GROUPS); + List<String> logsGroups = (List<String>) topologyConf.get(DaemonConfig.LOGS_GROUPS); + List<String> topologyGroups = (List<String>) topologyConf.get(Config.TOPOLOGY_GROUPS); Set<String> mergedGroups = new HashSet<String>(); if (logsGroups != null) { for (String group : logsGroups) {
