[ https://issues.apache.org/jira/browse/STORM-1611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197786#comment-15197786 ]
ASF GitHub Bot commented on STORM-1611: --------------------------------------- Github user redsanket commented on a diff in the pull request: https://github.com/apache/storm/pull/1195#discussion_r56382441 --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java --- @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.pacemaker; + +import org.apache.storm.generated.*; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class Pacemaker implements IServerMessageHandler { + + private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class); + + private Map<String, byte[]> heartbeats; + private PacemakerStats pacemakerStats; + private Map conf; + private final long sleepSeconds = 60; + + private static class PacemakerStats { + public AtomicInteger sendPulseCount = new AtomicInteger(); + public AtomicInteger totalReceivedSize = new AtomicInteger(); + public AtomicInteger getPulseCount = new AtomicInteger(); + public AtomicInteger totalSentSize = new AtomicInteger(); + public AtomicInteger largestHeartbeatSize = new AtomicInteger(); + public AtomicInteger averageHeartbeatSize = new AtomicInteger(); + } + + public Pacemaker(Map conf) { + heartbeats = new ConcurrentHashMap(); + pacemakerStats = new PacemakerStats(); + this.conf = conf; + startStatsThread(); + } + + @Override + public HBMessage handleMessage(HBMessage m, boolean authenticated) { + HBMessage response = null; + HBMessageData data = m.get_data(); + switch (m.get_type()) { + case CREATE_PATH: + response = createPath(data.get_path()); + break; + case EXISTS: + response = exists(data.get_path(), authenticated); + break; + case SEND_PULSE: + response = sendPulse(data.get_pulse()); + break; + case GET_ALL_PULSE_FOR_PATH: + response = getAllPulseForPath(data.get_path(), authenticated); + break; + case GET_ALL_NODES_FOR_PATH: + response = getAllNodesForPath(data.get_path(), authenticated); + break; + case GET_PULSE: + response = getPulse(data.get_path(), authenticated); + break; + case DELETE_PATH: + response = deletePath(data.get_path()); + break; + case DELETE_PULSE_ID: + response = deletePulseId(data.get_path()); + break; + default: + LOG.info("Got Unexpected Type: {}", m.get_type()); + break; + } + if (response != null) + response.set_message_id(m.get_message_id()); + return response; + } + + private HBMessage createPath(String path) { + return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null); + } + + private HBMessage exists(String path, boolean authenticated) { + HBMessage response = null; + if (authenticated) { + boolean itDoes = heartbeats.containsKey(path); --- End diff -- can it be renamed to pathExists? > port org.apache.storm.pacemaker.pacemaker to java > ------------------------------------------------- > > Key: STORM-1611 > URL: https://issues.apache.org/jira/browse/STORM-1611 > Project: Apache Storm > Issue Type: New Feature > Reporter: John Fang > Assignee: John Fang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)