http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java index 1521a96..b430f0b 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; @@ -22,7 +16,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.JarURLConnection; import java.net.URL; import java.nio.file.DirectoryStream; @@ -35,11 +28,9 @@ import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.storm.blobstore.ClientBlobStore; -import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.daemon.supervisor.AdvancedFSOps; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; @@ -54,69 +45,9 @@ import org.slf4j.LoggerFactory; * The version number of the blob's file will be stored in `${basename}.version` */ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { - private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedTopologyBlob.class); public static final long LOCAL_MODE_JAR_VERSION = 1; - - private static String resourcesJar() throws IOException { - String path = ServerUtils.currentClasspath(); - if (path == null) { - return null; - } - - for (String jpath : path.split(File.pathSeparator)) { - if (jpath.endsWith(".jar")) { - if (ServerUtils.zipDoesContainDir(jpath, ServerConfigUtils.RESOURCES_SUBDIR)) { - return jpath; - } - } - } - return null; - } - - public enum TopologyBlobType { - TOPO_JAR("stormjar.jar", "-stormjar.jar", "resources"), - TOPO_CODE("stormcode.ser", "-stormcode.ser", null), - TOPO_CONF("stormconf.ser", "-stormconf.ser", null); - - private final String fileName; - private final String keySuffix; - private final String extractionDir; - - TopologyBlobType(String fileName, String keySuffix, String extractionDir) { - this.fileName = fileName; - this.keySuffix = keySuffix; - this.extractionDir = extractionDir; - } - - public String getFileName() { - return fileName; - } - - public String getTempFileName(long version) { - return fileName + "." + version; - } - - public String getVersionFileName() { - return fileName + ".version"; - } - - public String getKey(String topologyId) { - return topologyId + keySuffix; - } - - public boolean needsExtraction() { - return extractionDir != null; - } - - public String getExtractionDir() { - return extractionDir; - } - - public String getTempExtractionDir(long version) { - return extractionDir + "." + version; - } - } - + private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedTopologyBlob.class); + private static final Pattern EXTRACT_BASE_NAME_AND_VERSION = Pattern.compile("^(.*)\\.([0-9]+)$"); private final TopologyBlobType type; private final String topologyId; private final boolean isLocalMode; @@ -124,7 +55,6 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { private final AdvancedFSOps fsOps; private volatile long version = NOT_DOWNLOADED_VERSION; private volatile long size = 0; - /** * Create a new LocallyCachedBlob. * @@ -143,6 +73,22 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { updateSizeOnDisk(); } + private static String resourcesJar() throws IOException { + String path = ServerUtils.currentClasspath(); + if (path == null) { + return null; + } + + for (String jpath : path.split(File.pathSeparator)) { + if (jpath.endsWith(".jar")) { + if (ServerUtils.zipDoesContainDir(jpath, ServerConfigUtils.RESOURCES_SUBDIR)) { + return jpath; + } + } + } + return null; + } + private void updateSizeOnDisk() throws IOException { long total = getSizeOnDisk(topologyBasicBlobsRootDir.resolve(type.getFileName())); if (type.needsExtraction()) { @@ -203,14 +149,14 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { long newVersion = downloadToTempLocation(store, type.getKey(topologyId), version, fsOps, - (version) -> topologyBasicBlobsRootDir.resolve(type.getTempFileName(version))); + (version) -> topologyBasicBlobsRootDir.resolve(type.getTempFileName(version))); Path tmpLocation = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion)); if (type.needsExtraction()) { Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion)); extractDirFromJar(tmpLocation.toAbsolutePath().toString(), ServerConfigUtils.RESOURCES_SUBDIR, - extractionDest); + extractionDest); } return newVersion; } @@ -295,16 +241,14 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { } } - private static final Pattern EXTRACT_BASE_NAME_AND_VERSION = Pattern.compile("^(.*)\\.([0-9]+)$"); - private void cleanUpTemp(String baseName) throws IOException { LOG.debug("Cleaning up temporary data in {}", topologyBasicBlobsRootDir); try (DirectoryStream<Path> children = fsOps.newDirectoryStream(topologyBasicBlobsRootDir, - (p) -> { - String fileName = p.getFileName().toString(); - Matcher m = EXTRACT_BASE_NAME_AND_VERSION.matcher(fileName); - return m.matches() && baseName.equals(m.group(1)); - })) { + (p) -> { + String fileName = p.getFileName().toString(); + Matcher m = EXTRACT_BASE_NAME_AND_VERSION.matcher(fileName); + return m.matches() && baseName.equals(m.group(1)); + })) { //children is only ever null if topologyBasicBlobsRootDir does not exist. This happens during unit tests // And because a non-existant directory is by definition clean we are ignoring it. if (children != null) { @@ -344,7 +288,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { @Override public boolean equals(Object other) { if (other instanceof LocallyCachedTopologyBlob) { - LocallyCachedTopologyBlob o = (LocallyCachedTopologyBlob)other; + LocallyCachedTopologyBlob o = (LocallyCachedTopologyBlob) other; return topologyId.equals(o.topologyId) && type == o.type && topologyBasicBlobsRootDir.equals(o.topologyBasicBlobsRootDir); } return false; @@ -359,4 +303,48 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob { public String toString() { return "LOCAL TOPO BLOB " + type + " " + topologyId; } + + public enum TopologyBlobType { + TOPO_JAR("stormjar.jar", "-stormjar.jar", "resources"), + TOPO_CODE("stormcode.ser", "-stormcode.ser", null), + TOPO_CONF("stormconf.ser", "-stormconf.ser", null); + + private final String fileName; + private final String keySuffix; + private final String extractionDir; + + TopologyBlobType(String fileName, String keySuffix, String extractionDir) { + this.fileName = fileName; + this.keySuffix = keySuffix; + this.extractionDir = extractionDir; + } + + public String getFileName() { + return fileName; + } + + public String getTempFileName(long version) { + return fileName + "." + version; + } + + public String getVersionFileName() { + return fileName + ".version"; + } + + public String getKey(String topologyId) { + return topologyId + keySuffix; + } + + public boolean needsExtraction() { + return extractionDir != null; + } + + public String getExtractionDir() { + return extractionDir; + } + + public String getTempExtractionDir(long version) { + return extractionDir + "." + version; + } + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java index bd92173..0979e03 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/PortAndAssignment.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.localizer; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/logging/ThriftAccessLogger.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/logging/ThriftAccessLogger.java b/storm-server/src/main/java/org/apache/storm/logging/ThriftAccessLogger.java index e326174..78ab822 100644 --- a/storm-server/src/main/java/org/apache/storm/logging/ThriftAccessLogger.java +++ b/storm-server/src/main/java/org/apache/storm/logging/ThriftAccessLogger.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.logging; @@ -29,7 +23,7 @@ public class ThriftAccessLogger { private static String accessLogBase = "Request ID: {} access from: {} principal: {} operation: {}"; public static void logAccessFunction(Integer requestId, InetAddress remoteAddress, - Principal principal, String operation, + Principal principal, String operation, String function) { String functionPart = ""; if (function != null) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java b/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java index de4cb72..5d7df0c 100644 --- a/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java +++ b/storm-server/src/main/java/org/apache/storm/logging/filters/AccessLoggingFilter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.logging.filters; @@ -27,7 +21,6 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +39,7 @@ public class AccessLoggingFilter implements Filter { public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException { if (request != null) { LOG.info("Access from: {} url: {} principal: {}", request.getRemoteAddr(), request.getRequestURL(), - (request.getUserPrincipal() == null ? "" : request.getUserPrincipal().getName())); + (request.getUserPrincipal() == null ? "" : request.getUserPrincipal().getName())); } chain.doFilter(request, response); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java b/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java index 2ee8db5..cfeb63c 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java +++ b/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java @@ -1,35 +1,29 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.metric; +import java.util.Collection; import org.apache.storm.metric.api.DataPoint; import org.apache.storm.metric.api.IClusterMetricsConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - public class ClusterMetricsConsumerExecutor { public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class); private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED = - "Preparation of Cluster Metrics Consumer failed. " + - "Please check your configuration and/or corresponding systems and relaunch Nimbus. " + - "Skipping handle metrics."; + "Preparation of Cluster Metrics Consumer failed. " + + "Please check your configuration and/or corresponding systems and relaunch Nimbus. " + + "Skipping handle metrics."; private IClusterMetricsConsumer metricsConsumer; private String consumerClassName; @@ -42,11 +36,11 @@ public class ClusterMetricsConsumerExecutor { public void prepare() { try { - metricsConsumer = (IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance(); + metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance(); metricsConsumer.prepare(registrationArgument); } catch (Exception e) { LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name " + - consumerClassName, e); + consumerClassName, e); if (metricsConsumer != null) { metricsConsumer.cleanup(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java b/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java index db47d84..f07dc54 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java +++ b/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.metric; +import java.util.Collection; import org.apache.storm.metric.api.DataPoint; import org.apache.storm.metric.api.IClusterMetricsConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - /** * Listens for cluster related metrics, dumps them to log * @@ -48,7 +42,7 @@ public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); String header = String.format("%d\t%15s\t%40s\t", - clusterInfo.getTimestamp(), "<cluster>", "<cluster>"); + clusterInfo.getTimestamp(), "<cluster>", "<cluster>"); sb.append(header); logDataPoints(dataPoints, sb, header); } @@ -57,15 +51,15 @@ public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); String header = String.format("%d\t%15s\t%40s\t", - supervisorInfo.getTimestamp(), - supervisorInfo.getSrcSupervisorHost(), - supervisorInfo.getSrcSupervisorId()); + supervisorInfo.getTimestamp(), + supervisorInfo.getSrcSupervisorHost(), + supervisorInfo.getSrcSupervisorId()); sb.append(header); for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.getName()) - .append(padding).delete(header.length()+23,sb.length()).append("\t") - .append(p.getValue()); + .append(padding).delete(header.length() + 23, sb.length()).append("\t") + .append(p.getValue()); LOG.info(sb.toString()); } } @@ -78,8 +72,8 @@ public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.getName()) - .append(padding).delete(header.length()+23,sb.length()).append("\t") - .append(p.getValue()); + .append(padding).delete(header.length() + 23, sb.length()).append("\t") + .append(p.getValue()); LOG.info(sb.toString()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java index 28bba3e..3bc50e7 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java +++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java @@ -1,28 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metric; -import com.codahale.metrics.*; - +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reservoir; import java.util.Map; import java.util.concurrent.Callable; - import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; @@ -30,8 +27,8 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class StormMetricsRegistry { - private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); + private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); public static Meter registerMeter(String name) { Meter meter = new Meter(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metric/api/DataPoint.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/api/DataPoint.java b/storm-server/src/main/java/org/apache/storm/metric/api/DataPoint.java index 615dd5b..8468722 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/api/DataPoint.java +++ b/storm-server/src/main/java/org/apache/storm/metric/api/DataPoint.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.metric.api; public class DataPoint { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metric/api/IClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/api/IClusterMetricsConsumer.java b/storm-server/src/main/java/org/apache/storm/metric/api/IClusterMetricsConsumer.java index 39d60f3..a0b0a8d 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/api/IClusterMetricsConsumer.java +++ b/storm-server/src/main/java/org/apache/storm/metric/api/IClusterMetricsConsumer.java @@ -1,25 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.metric.api; import java.util.Collection; public interface IClusterMetricsConsumer { + void prepare(Object registrationArgument); + + void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints); + + void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints); + + void cleanup(); + class ClusterInfo { private long timestamp; @@ -55,9 +58,4 @@ public interface IClusterMetricsConsumer { return timestamp; } } - - void prepare(Object registrationArgument); - void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints); - void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints); - void cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java index 662a17c..d8beaa0 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -30,7 +23,7 @@ public enum AggLevel { private final byte value; AggLevel(int value) { - this.value = (byte)value; + this.value = (byte) value; } public byte getValue() { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java index 7cfbfbe..33d2097 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -39,32 +32,28 @@ public class FilterOptions { public FilterOptions() { } - public void setTopologyId(String topoId) { - this.topologyId = topoId; - } - public String getTopologyId() { return this.topologyId; } - public void setComponentId(String component) { - this.componentId = component; + public void setTopologyId(String topoId) { + this.topologyId = topoId; } public String getComponentId() { return this.componentId; } - public void setStartTime(Long time) { - this.startTime = time; + public void setComponentId(String component) { + this.componentId = component; } public long getStartTime() { return this.startTime; } - public void setEndTime(Long time) { - this.endTime = time; + public void setStartTime(Long time) { + this.startTime = time; } /** @@ -77,46 +66,50 @@ public class FilterOptions { return this.endTime; } - public void setMetricName(String name) { - this.metricName = name; + public void setEndTime(Long time) { + this.endTime = time; } public String getMetricName() { return this.metricName; } - public void setExecutorId(String id) { - this.executorId = id; + public void setMetricName(String name) { + this.metricName = name; } public String getExecutorId() { return this.executorId; } - public void setHostId(String id) { - this.hostId = id; + public void setExecutorId(String id) { + this.executorId = id; } public String getHostId() { return this.hostId; } - public void setPort(Integer p) { - this.port = p; + public void setHostId(String id) { + this.hostId = id; } public Integer getPort() { return this.port; } - public void setStreamId(String id) { - this.streamId = id; + public void setPort(Integer p) { + this.port = p; } public String getStreamId() { return this.streamId; } + public void setStreamId(String id) { + this.streamId = id; + } + /** * Add an aggregation level to search for. */ @@ -128,16 +121,6 @@ public class FilterOptions { } /** - * Set the aggregation levels to search for. - */ - public void setAggLevels(Set<AggLevel> levels) throws MetricException { - this.aggLevels = levels; - if (this.aggLevels == null || this.aggLevels.isEmpty()) { - throw new MetricException("Cannot search for empty AggLevel"); - } - } - - /** * Get the aggregation levels to search for. */ public Set<AggLevel> getAggLevels() { @@ -151,4 +134,14 @@ public class FilterOptions { } return this.aggLevels; } + + /** + * Set the aggregation levels to search for. + */ + public void setAggLevels(Set<AggLevel> levels) throws MetricException { + this.aggLevels = levels; + if (this.aggLevels == null || this.aggLevels.isEmpty()) { + throw new MetricException("Cannot search for empty AggLevel"); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java index 716ced0..19de781 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -22,7 +15,6 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,20 +92,20 @@ public class Metric implements Comparable<Metric> { Metric o = (Metric) other; return this == other - || (this.name.equals(o.getMetricName()) - && this.timestamp == o.getTimestamp() - && this.topologyId.equals(o.getTopologyId()) - && this.value == o.getValue() - && this.componentId.equals(o.getComponentId()) - && this.executorId.equals(o.getExecutorId()) - && this.hostname.equals(o.getHostname()) - && this.streamId.equals(o.getStreamId()) - && this.port == o.getPort() - && this.count == o.getCount() - && this.min == o.getMin() - && this.max == o.getMax() - && this.sum == o.getSum() - && this.aggLevel == o.getAggLevel()); + || (this.name.equals(o.getMetricName()) + && this.timestamp == o.getTimestamp() + && this.topologyId.equals(o.getTopologyId()) + && this.value == o.getValue() + && this.componentId.equals(o.getComponentId()) + && this.executorId.equals(o.getExecutorId()) + && this.hostname.equals(o.getHostname()) + && this.streamId.equals(o.getStreamId()) + && this.port == o.getPort() + && this.count == o.getCount() + && this.min == o.getMin() + && this.max == o.getMax() + && this.sum == o.getSum() + && this.aggLevel == o.getAggLevel()); } public AggLevel getAggLevel() { @@ -131,17 +123,6 @@ public class Metric implements Comparable<Metric> { } /** - * Initialize the metric value. - */ - public void setValue(double value) { - this.count = 1L; - this.min = value; - this.max = value; - this.sum = value; - this.value = value; - } - - /** * Adds an additional value to the metric. */ public void addValue(double value) { @@ -204,6 +185,17 @@ public class Metric implements Comparable<Metric> { return this.value; } + /** + * Initialize the metric value. + */ + public void setValue(double value) { + this.count = 1L; + this.min = value; + this.max = value; + this.sum = value; + this.value = value; + } + public String getMetricName() { return this.name; } @@ -259,12 +251,12 @@ public class Metric implements Comparable<Metric> { sb.append("|"); sb.append(this.streamId); return String.format("%s -- count: %d -- value: %f -- min: %f -- max: %f -- sum: %f", - sb.toString(), - this.count, - this.value, - this.min, - this.max, - this.sum); + sb.toString(), + this.count, + this.value, + this.min, + this.max, + this.sum); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java index e45a451..8236e7f 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java index d43e11b..71f53a5 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -42,7 +35,7 @@ public interface MetricStore extends AutoCloseable { * Fill out the numeric values for a metric. * * @param metric Metric to populate - * @return true if the metric was populated, false otherwise + * @return true if the metric was populated, false otherwise * @throws MetricException on error */ boolean populateValue(Metric metric) throws MetricException; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java index d66529a..42e5a7d 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -33,7 +26,7 @@ public class MetricStoreConfig { public static MetricStore configure(Map<String, Object> conf) throws MetricException { try { - String storeClass = (String)conf.get(DaemonConfig.STORM_METRIC_STORE_CLASS); + String storeClass = (String) conf.get(DaemonConfig.STORM_METRIC_STORE_CLASS); MetricStore store = (MetricStore) (Class.forName(storeClass)).newInstance(); store.prepare(conf); return store; @@ -51,7 +44,7 @@ public class MetricStoreConfig { public static WorkerMetricsProcessor configureMetricProcessor(Map conf) throws MetricException { try { - String processorClass = (String)conf.get(DaemonConfig.STORM_METRIC_PROCESSOR_CLASS); + String processorClass = (String) conf.get(DaemonConfig.STORM_METRIC_PROCESSOR_CLASS); WorkerMetricsProcessor processor = (WorkerMetricsProcessor) (Class.forName(processorClass)).newInstance(); processor.prepare(conf); return processor; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java index 0c98208..f3970b8 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; @@ -26,7 +19,7 @@ import org.apache.thrift.TException; /** * Implementation of WorkerMetricsProcessor that sends metric data to Nimbus for processing. */ -public class NimbusMetricProcessor implements WorkerMetricsProcessor { +public class NimbusMetricProcessor implements WorkerMetricsProcessor { @Override public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException { try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java index 98c1163..5a59473 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java index a351be7..8f65591 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -38,7 +31,6 @@ public enum KeyType { METADATA_STRING_END(7), METRIC_DATA(0x80); - private final byte value; private static Map<Byte, KeyType> MAP; static { @@ -49,12 +41,10 @@ public enum KeyType { MAP = Collections.unmodifiableMap(MAP); } - KeyType(int value) { - this.value = (byte)value; - } + private final byte value; - byte getValue() { - return this.value; + KeyType(int value) { + this.value = (byte) value; } static KeyType getKeyType(byte value) { @@ -65,6 +55,10 @@ public enum KeyType { return type; } } + + byte getValue() { + return this.value; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java index 6618f5d..07fe862 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java index 0effbc4..11d82b7 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -30,7 +23,7 @@ public interface ReadOnlyStringMetadataCache { * Get the string metadata from the cache. * * @param s The string to look for - * @return the metadata associated with the string or null if not found + * @return the metadata associated with the string or null if not found */ StringMetadata get(String s); @@ -38,7 +31,7 @@ public interface ReadOnlyStringMetadataCache { * Returns the string matching the string Id if in the cache. * * @param stringId The string Id to check - * @return the associated string if the Id is in the cache, null otherwise + * @return the associated string if the Id is in the cache, null otherwise */ String getMetadataString(Integer stringId); @@ -46,7 +39,7 @@ public interface ReadOnlyStringMetadataCache { * Determines if a string Id is contained in the cache. * * @param stringId The string Id to check - * @return true if the Id is in the cache, false otherwise + * @return true if the Id is in the cache, false otherwise */ boolean contains(Integer stringId); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java index 7868282..fffcaf1 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -51,10 +44,9 @@ import org.slf4j.LoggerFactory; * </pre> */ public class RocksDbKey implements Comparable<RocksDbKey> { - private static final Logger LOG = LoggerFactory.getLogger(RocksDbKey.class); static final int KEY_SIZE = 38; + private static final Logger LOG = LoggerFactory.getLogger(RocksDbKey.class); private static Map<Byte, RocksDbKey> PREFIX_MAP = new HashMap<>(); - private byte[] key; static { // pregenerate commonly used keys for scans @@ -65,6 +57,8 @@ public class RocksDbKey implements Comparable<RocksDbKey> { PREFIX_MAP = Collections.unmodifiableMap(PREFIX_MAP); } + private byte[] key; + /** * Constructor for a RocksDB key for a metadata string. * @@ -94,56 +88,16 @@ public class RocksDbKey implements Comparable<RocksDbKey> { * Get a zeroed key of the specified type. * * @param type the desired type - * @return a key of the desired type + * @return a key of the desired type */ static RocksDbKey getPrefix(KeyType type) { return PREFIX_MAP.get(type.getValue()); } /** - * get the metadata string Id portion of the key for metadata keys. - * - * @return the metadata string Id - * @throws RuntimeException if the key is not a metadata type - */ - int getMetadataStringId() { - if (this.getType().getValue() < KeyType.METADATA_STRING_END.getValue()) { - return ByteBuffer.wrap(key, 2, 4).getInt(); - } else { - throw new RuntimeException("Cannot fetch metadata string for key of type " + this.getType()); - } - } - - /** - * get the raw key bytes - */ - byte[] getRaw() { - return this.key; - } - - /** - * get the type of key. - * - * @return the type of key - */ - KeyType getType() { - return KeyType.getKeyType(key[0]); - } - - /** - * compares to keys on a byte by byte basis. - * - * @return comparison of key byte values - */ - @Override - public int compareTo(RocksDbKey o) { - return UnsignedBytes.lexicographicalComparator().compare(this.getRaw(), o.getRaw()); - } - - /** * gets the first possible key value for the desired key type. * - * @return the initial key + * @return the initial key */ static RocksDbKey getInitialKey(KeyType type) { return PREFIX_MAP.get(type.getValue()); @@ -152,17 +106,17 @@ public class RocksDbKey implements Comparable<RocksDbKey> { /** * gets the key just larger than the last possible key value for the desired key type. * - * @return the last key + * @return the last key */ static RocksDbKey getLastKey(KeyType type) { - byte value = (byte)(type.getValue() + 1); + byte value = (byte) (type.getValue() + 1); return PREFIX_MAP.get(value); } /** * Creates a metric key with the desired properties. * - * @return the generated key + * @return the generated key */ static RocksDbKey createMetricKey(AggLevel aggLevel, int topologyId, long metricTimestamp, int metricId, int componentId, int executorId, int hostId, int port, @@ -185,6 +139,46 @@ public class RocksDbKey implements Comparable<RocksDbKey> { } /** + * get the metadata string Id portion of the key for metadata keys. + * + * @return the metadata string Id + * @throws RuntimeException if the key is not a metadata type + */ + int getMetadataStringId() { + if (this.getType().getValue() < KeyType.METADATA_STRING_END.getValue()) { + return ByteBuffer.wrap(key, 2, 4).getInt(); + } else { + throw new RuntimeException("Cannot fetch metadata string for key of type " + this.getType()); + } + } + + /** + * get the raw key bytes + */ + byte[] getRaw() { + return this.key; + } + + /** + * get the type of key. + * + * @return the type of key + */ + KeyType getType() { + return KeyType.getKeyType(key[0]); + } + + /** + * compares to keys on a byte by byte basis. + * + * @return comparison of key byte values + */ + @Override + public int compareTo(RocksDbKey o) { + return UnsignedBytes.lexicographicalComparator().compare(this.getRaw(), o.getRaw()); + } + + /** * Get the unique string Id for a metric's topologyId. */ int getTopologyId() { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java index a050a76..7a30bcc 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java @@ -1,25 +1,17 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; import com.codahale.metrics.Meter; - import java.util.ArrayList; import java.util.HashSet; import java.util.ListIterator; @@ -28,7 +20,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; - import org.apache.http.annotation.NotThreadSafe; import org.apache.storm.metricstore.AggLevel; import org.apache.storm.metricstore.Metric; @@ -79,7 +70,7 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { * @param store The RocksDB store * @param queue The queue to receive metrics for insertion */ - RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) { + RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) { this.store = store; this.queue = queue; this.failureMeter = failureMeter; @@ -134,7 +125,7 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp); RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId, - componentId, executorId, hostId, metric.getPort(), streamId); + componentId, executorId, hostId, metric.getPort(), streamId); // save metric key/value to be batched RocksDbValue value = new RocksDbValue(metric); @@ -146,7 +137,7 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { ListIterator li = aggBuckets.listIterator(aggBuckets.size()); boolean populate = true; while (li.hasPrevious()) { - AggLevel bucket = (AggLevel)li.previous(); + AggLevel bucket = (AggLevel) li.previous(); Metric aggMetric = new Metric(metric); aggMetric.setAggLevel(bucket); @@ -155,7 +146,7 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { aggMetric.setTimestamp(roundedToBucket); RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId, - componentId, executorId, hostId, aggMetric.getPort(), streamId); + componentId, executorId, hostId, aggMetric.getPort(), streamId); if (populate) { // retrieve any existing aggregation matching this one and update the values @@ -217,7 +208,7 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { generateUniqueStringIds(); int id = unusedIds.iterator().next(); unusedIds.remove(id); - return id; + return id; } // guarantees a list of unused string Ids exists. Once the list is empty, creates a new list @@ -290,8 +281,8 @@ public class RocksDbMetricsWriter implements Runnable, AutoCloseable { // get all metadata from the cache to put into the database TreeMap<RocksDbKey, RocksDbValue> batchMap = new TreeMap<>(); // use a new map to prevent threading issues with writer thread for (Map.Entry entry : stringMetadataCache.entrySet()) { - String metadataString = (String)entry.getKey(); - StringMetadata val = (StringMetadata)entry.getValue(); + String metadataString = (String) entry.getKey(); + StringMetadata val = (StringMetadata) entry.getValue(); RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), metadataString); for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches
