[
https://issues.apache.org/jira/browse/STORM-885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15018598#comment-15018598
]
ASF GitHub Bot commented on STORM-885:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/838#discussion_r45507464
--- Diff: storm-core/src/clj/org/apache/storm/pacemaker/pacemaker.clj ---
@@ -0,0 +1,239 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.pacemaker.pacemaker
+ (:import [org.apache.storm.pacemaker PacemakerServer
IServerMessageHandler]
+ [java.util.concurrent ConcurrentHashMap ThreadPoolExecutor
TimeUnit LinkedBlockingDeque]
+ [java.util.concurrent.atomic AtomicInteger]
+ [java.util Date]
+ [backtype.storm.generated
+ HBAuthorizationException HBExecutionException HBNodes HBRecords
+ HBServerMessageType HBMessage HBMessageData HBPulse])
+ (:use [clojure.string :only [replace-first split]]
+ [backtype.storm log config util])
+ (:require [clojure.java.jmx :as jmx])
+ (:gen-class))
+
+
+;; Stats Functions
+
+(def sleep-seconds 60)
+
+
+(defn- check-and-set-loop [stats key new & {:keys [compare new-fn]
+ :or {compare (fn [new old]
true)
+ new-fn (fn [new old]
new)}}]
+ (loop []
+ (let [old (.get (key stats))
+ new (new-fn new old)]
+ (if (compare new old)
+ (if (.compareAndSet (key stats) old new)
+ nil
+ (recur))
+ nil))))
+
+(defn- set-average [stats size]
+ (check-and-set-loop
+ stats
+ :average-heartbeat-size
+ size
+ :new-fn (fn [new old]
+ (let [count (.get (:send-pulse-count stats))]
+ ; Weighted average
+ (/ (+ new (* count old)) (+ count 1))))))
+
+(defn- set-largest [stats size]
+ (check-and-set-loop
+ stats
+ :largest-heartbeat-size
+ size
+ :compare #'>))
+
+(defn- report-stats [heartbeats stats last-five-s]
+ (loop []
+ (let [send-count (.getAndSet (:send-pulse-count stats) 0)
+ received-size (.getAndSet (:total-received-size stats) 0)
+ get-count (.getAndSet (:get-pulse-count stats) 0)
+ sent-size (.getAndSet (:total-sent-size stats) 0)
+ largest (.getAndSet (:largest-heartbeat-size stats) 0)
+ average (.getAndSet (:average-heartbeat-size stats) 0)
+ total-keys (.size heartbeats)]
+ (log-debug "\nReceived " send-count " heartbeats totaling "
received-size " bytes,\n"
+ "Sent " get-count " heartbeats totaling " sent-size "
bytes,\n"
+ "The largest heartbeat was " largest " bytes,\n"
+ "The average heartbeat was " average " bytes,\n"
+ "Pacemaker contained " total-keys " total keys\n"
+ "in the last " sleep-seconds " second(s)")
+ (dosync (ref-set last-five-s
+ {:send-pulse-count send-count
+ :total-received-size received-size
+ :get-pulse-count get-count
+ :total-sent-size sent-size
+ :largest-heartbeat-size largest
+ :average-heartbeat-size average
+ :total-keys total-keys})))
+ (Thread/sleep (* 1000 sleep-seconds))
+ (recur)))
+
+;; JMX stuff
+(defn register [last-five-s]
+ (jmx/register-mbean
+ (jmx/create-bean
+ last-five-s)
+ "org.apache.storm.pacemaker.pacemaker:stats=Stats_Last_5_Seconds"))
+
+
+;; Pacemaker Functions
+
+(defn hb-data [conf]
+ (ConcurrentHashMap.))
+
+(defn create-path [^String path heartbeats]
+ (HBMessage. HBServerMessageType/CREATE_PATH_RESPONSE nil))
--- End diff --
Current implementation is a no-op. The parameters do make sense if we were
to change implementation in the future. We could leave these here or remove
them. I am fine with it either way.
> Heartbeat Server (Pacemaker)
> ----------------------------
>
> Key: STORM-885
> URL: https://issues.apache.org/jira/browse/STORM-885
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Kyle Nusbaum
>
> Large highly connected topologies and large clusters write a lot of data into
> ZooKeeper. The heartbeats, that make up the majority of this data, do not
> need to be persisted to disk. Pacemaker is intended to be a secure
> replacement for storing the heartbeats without changing anything within the
> heartbeats. In the future as more metrics are added in, we may want to look
> into switching it over to look more like Heron, where a metrics server is
> running for each node/topology. And can be used to aggregate/per-aggregate
> them in a more scalable manor.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)