http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java index 2e7ed02..94f2fed 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -25,7 +18,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; - import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.metricstore.AggLevel; @@ -48,9 +40,9 @@ import org.slf4j.LoggerFactory; public class RocksDbStore implements MetricStore, AutoCloseable { + static final int INVALID_METADATA_STRING_ID = 0; private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class); private static final int MAX_QUEUE_CAPACITY = 4000; - static final int INVALID_METADATA_STRING_ID = 0; RocksDB db; private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null; private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY); @@ -58,10 +50,6 @@ public class RocksDbStore implements MetricStore, AutoCloseable { private MetricsCleaner metricsCleaner = null; private Meter failureMeter = null; - interface RocksDbScanCallback { - boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan - } - /** * Create metric store instance using the configurations provided via the config map. * @@ -132,7 +120,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable { if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) { throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy " - + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING); + + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING); } // validate path defined @@ -147,17 +135,17 @@ public class RocksDbStore implements MetricStore, AutoCloseable { if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) { throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size " - + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY); + + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY); } if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) { throw new MetricException("Not a valid RocksDB configuration - Missing metric retention " - + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS); + + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS); } } private String getRocksDbAbsoluteDir(Map<String, Object> conf) throws MetricException { - String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION); + String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION); if (storePath == null) { throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION); } else { @@ -201,7 +189,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable { * Fill out the numeric values for a metric. * * @param metric Metric to populate - * @return true if the metric was populated, false otherwise + * @return true if the metric was populated, false otherwise * @throws MetricException if read from database fails */ @Override @@ -234,7 +222,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable { } RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), topologyId, metric.getTimestamp(), metricId, - componentId, executorId, hostId, metric.getPort(), streamId); + componentId, executorId, hostId, metric.getPort(), streamId); return populateFromKey(key, metric); } @@ -461,9 +449,9 @@ public class RocksDbStore implements MetricStore, AutoCloseable { for (AggLevel aggLevel : filter.getAggLevels()) { RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId, - startComponentId, startExecutorId, startHostId, startPort, startStreamId); + startComponentId, startExecutorId, startHostId, startPort, startStreamId); RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId, - endComponentId, endExecutorId, endHostId, endPort, endStreamId); + endComponentId, endExecutorId, endHostId, endPort, endStreamId); RocksIterator iterator = db.newIterator(ro); for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) { @@ -519,7 +507,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable { String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache); Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname, - streamId, key.getPort(), aggLevel); + streamId, key.getPort(), aggLevel); val.populateMetric(metric); @@ -561,7 +549,7 @@ public class RocksDbStore implements MetricStore, AutoCloseable { s = rdbValue.getMetdataString(); lookupCache.put(id, s); return s; - } catch (RocksDBException e) { + } catch (RocksDBException e) { if (this.failureMeter != null) { this.failureMeter.mark(); } @@ -635,5 +623,9 @@ public class RocksDbStore implements MetricStore, AutoCloseable { } } } + + interface RocksDbScanCallback { + boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java index 58b2c76..0da7b92 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -50,11 +43,11 @@ import org.apache.storm.metricstore.Metric; */ class RocksDbValue { - private static int METRIC_VALUE_SIZE = 41; - private byte[] value; private static final byte CURRENT_METADATA_VERSION = 0; private static final byte CURRENT_METRIC_VERSION = 0; + private static int METRIC_VALUE_SIZE = 41; private static int MIN_METADATA_VALUE_SIZE = 9; + private byte[] value; /** * Constructor from raw data. @@ -98,7 +91,7 @@ class RocksDbValue { /** * Get the metadata string portion of the value. Assumes the value is metadata. * - * @return the metadata string + * @return the metadata string */ String getMetdataString() { if (this.value.length < MIN_METADATA_VALUE_SIZE) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java index 6f54a58..d890582 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java index 7ce8435..d70076f 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; @@ -39,12 +32,23 @@ import org.slf4j.LoggerFactory; */ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, StringMetadata>, - WritableStringMetadataCache, ReadOnlyStringMetadataCache { + WritableStringMetadataCache, ReadOnlyStringMetadataCache { private static final Logger LOG = LoggerFactory.getLogger(StringMetadataCache.class); + private static StringMetadataCache instance = null; private Map<String, StringMetadata> lruStringCache; private Map<Integer, String> hashToString = new ConcurrentHashMap<>(); private RocksDbMetricsWriter dbWriter; - private static StringMetadataCache instance = null; + + /** + * Constructor to create a cache. + * + * @param dbWriter The rocks db writer instance the cache should use when evicting data + * @param capacity The cache size + */ + private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) { + lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this)); + this.dbWriter = dbWriter; + } /** * Initializes the cache instance. @@ -87,22 +91,15 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, } } - /** - * Constructor to create a cache. - * - * @param dbWriter The rocks db writer instance the cache should use when evicting data - * @param capacity The cache size - */ - private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) { - lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this)); - this.dbWriter = dbWriter; + static void cleanUp() { + instance = null; } /** * Get the string metadata from the cache. * * @param s The string to look for - * @return the metadata associated with the string or null if not found + * @return the metadata associated with the string or null if not found */ public StringMetadata get(String s) { return lruStringCache.get(s); @@ -169,7 +166,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, * Determines if a string Id is contained in the cache. * * @param stringId The string Id to check - * @return true if the Id is in the cache, false otherwise + * @return true if the Id is in the cache, false otherwise */ public boolean contains(Integer stringId) { return hashToString.containsKey(stringId); @@ -179,7 +176,7 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, * Returns the string matching the string Id if in the cache. * * @param stringId The string Id to check - * @return the associated string if the Id is in the cache, null otherwise + * @return the associated string if the Id is in the cache, null otherwise */ public String getMetadataString(Integer stringId) { return hashToString.get(stringId); @@ -188,15 +185,11 @@ public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, /** * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown. * - * @return the string metadata map entrySet + * @return the string metadata map entrySet */ public Set<Map.Entry<String, StringMetadata>> entrySet() { return lruStringCache.entrySet(); } - static void cleanUp() { - instance = null; - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java index 2d4165f..ab47fdf 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java @@ -1,26 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.metricstore.rocksdb; import java.util.Map; import java.util.Set; - import org.apache.http.annotation.NotThreadSafe; import org.apache.storm.metricstore.MetricException; @@ -48,7 +40,7 @@ public interface WritableStringMetadataCache extends ReadOnlyStringMetadataCache /** * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown. * - * @return the string metadata map entrySet + * @return the string metadata map entrySet */ Set<Map.Entry<String, StringMetadata>> entrySet(); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java index 558e570..b912513 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; @@ -27,11 +21,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.supervisor.Supervisor; import org.apache.storm.generated.SupervisorAssignments; -import org.apache.storm.scheduler.SupervisorDetails; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.SupervisorClient; @@ -95,6 +87,17 @@ public class AssignmentDistributionService implements Closeable { private boolean isLocalMode = false; // boolean cache for local mode decision /** + * Factory method for initialize a instance. + * @param conf config. + * @return an instance of {@link AssignmentDistributionService} + */ + public static AssignmentDistributionService getInstance(Map conf) { + AssignmentDistributionService service = new AssignmentDistributionService(); + service.prepare(conf); + return service; + } + + /** * Function for initialization. * * @param conf config @@ -163,6 +166,47 @@ public class AssignmentDistributionService implements Closeable { } } + public void addLocalSupervisor(Supervisor supervisor) { + this.localSupervisors.put(supervisor.getId(), supervisor); + } + + private Integer nextQueueId() { + return this.random.nextInt(threadsNum); + } + + private LinkedBlockingQueue<NodeAssignments> nextQueue() { + return this.assignmentsQueue.get(nextQueueId()); + } + + private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer queueIndex) { + return this.assignmentsQueue.get(queueIndex); + } + + /** + * Get an assignments from the target queue with the specific index. + * @param queueIndex index of the queue + * @return an {@link NodeAssignments} + * @throws InterruptedException + */ + public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException { + NodeAssignments target = null; + while (true) { + target = getQueueById(queueIndex).poll(); + if (target != null) { + return target; + } + Time.sleep(100L); + } + } + + public boolean isActive() { + return this.active; + } + + public Map getConf() { + return this.conf; + } + static class NodeAssignments { private String node; private String host; @@ -177,7 +221,7 @@ public class AssignmentDistributionService implements Closeable { } public static NodeAssignments getInstance(String node, String host, Integer serverPort, - SupervisorAssignments assignments) { + SupervisorAssignments assignments) { return new NodeAssignments(node, host, serverPort, assignments); } @@ -237,12 +281,12 @@ public class AssignmentDistributionService implements Closeable { supervisor.sendSupervisorAssignments(assignments.getAssignments()); } else { LOG.error("Can not find node {} for assignments distribution", assignments.getNode()); - throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance."); + throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance."); } } else { // distributed mode try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), - assignments.getHost(), assignments.getServerPort())){ + assignments.getHost(), assignments.getServerPort())) { try { client.getClient().sendSupervisorAssignments(assignments.getAssignments()); } catch (Exception e) { @@ -257,56 +301,4 @@ public class AssignmentDistributionService implements Closeable { } } } - - public void addLocalSupervisor(Supervisor supervisor) { - this.localSupervisors.put(supervisor.getId(), supervisor); - } - - private Integer nextQueueId() { - return this.random.nextInt(threadsNum); - } - - private LinkedBlockingQueue<NodeAssignments> nextQueue() { - return this.assignmentsQueue.get(nextQueueId()); - } - - private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer queueIndex) { - return this.assignmentsQueue.get(queueIndex); - } - - /** - * Get an assignments from the target queue with the specific index. - * @param queueIndex index of the queue - * @return an {@link NodeAssignments} - * @throws InterruptedException - */ - public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException { - NodeAssignments target = null; - while (true) { - target = getQueueById(queueIndex).poll(); - if (target != null) { - return target; - } - Time.sleep(100L); - } - } - - public boolean isActive() { - return this.active; - } - - public Map getConf() { - return this.conf; - } - - /** - * Factory method for initialize a instance. - * @param conf config. - * @return an instance of {@link AssignmentDistributionService} - */ - public static AssignmentDistributionService getInstance(Map conf) { - AssignmentDistributionService service = new AssignmentDistributionService(); - service.prepare(conf); - return service; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java index ab164ca..7fef7bf 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; @@ -30,7 +24,7 @@ public class DefaultTopologyValidator implements ITopologyValidator { private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override - public void prepare(Map<String, Object> StormConf){ + public void prepare(Map<String, Object> StormConf) { } @Override @@ -63,5 +57,5 @@ public class DefaultTopologyValidator implements ITopologyValidator { } } } - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java index e362fbc..05cdbc1 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.nimbus; import java.io.Closeable; @@ -62,7 +57,7 @@ public interface ILeaderElector extends Closeable { * Get list of current nimbus addresses. * @return list of current nimbus addresses, includes leader. */ - List<NimbusInfo> getAllNimbuses()throws Exception; + List<NimbusInfo> getAllNimbuses() throws Exception; /** * Method called to allow for cleanup. once close this object can not be reused. http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java index f36a4a8..8247a85 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.nimbus; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java index 46c4073..88d756d 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java @@ -1,30 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.nimbus; +import java.util.Map; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.generated.StormTopology; -import java.util.Map; public interface ITopologyValidator { void prepare(Map<String, Object> StormConf); void validate(String topologyName, Map<String, Object> topologyConf, StormTopology topology) - throws InvalidTopologyException; + throws InvalidTopologyException; } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java index bde58dc..8457de8 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java index 59cd462..68a2391 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java @@ -1,34 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; import com.google.common.base.Joiner; import com.google.common.collect.Sets; - -import javax.security.auth.Subject; import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; - +import javax.security.auth.Subject; import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; @@ -54,21 +46,17 @@ import org.slf4j.LoggerFactory; */ public class LeaderListenerCallback { private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class); - + private static final String STORM_JAR_SUFFIX = "-stormjar.jar"; + private static final String STORM_CODE_SUFFIX = "-stormcode.ser"; + private static final String STORM_CONF_SUFFIX = "-stormconf.ser"; private final BlobStore blobStore; private final TopoCache tc; private final IStormClusterState clusterState; - private final CuratorFramework zk; private final LeaderLatch leaderLatch; - private final Map conf; private final List<ACL> acls; - private static final String STORM_JAR_SUFFIX = "-stormjar.jar"; - private static final String STORM_CODE_SUFFIX = "-stormcode.ser"; - private static final String STORM_CONF_SUFFIX = "-stormconf.ser"; - /** * Constructor for {@LeaderListenerCallback}. * @param conf config @@ -103,7 +91,7 @@ public class LeaderListenerCallback { clusterState.setAssignmentsBackendSynchronized(); Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk, - ClusterUtils.STORMS_SUBTREE, false)); + ClusterUtils.STORMS_SUBTREE, false)); Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds); Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys); @@ -113,8 +101,8 @@ public class LeaderListenerCallback { // this finds all active topologies blob keys from all local topology blob keys Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys); LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]", - generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys), - generateJoinedString(diffTopology)); + generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys), + generateJoinedString(diffTopology)); if (diffTopology.isEmpty()) { Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys); @@ -122,15 +110,15 @@ public class LeaderListenerCallback { // this finds all dependency blob keys from active topologies from all local blob keys Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys); LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]", - generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys), - generateJoinedString(diffDependencies)); + generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys), + generateJoinedString(diffDependencies)); if (diffDependencies.isEmpty()) { LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally."); tc.clear(); } else { LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, " - + "giving up leadership."); + + "giving up leadership."); closeLatch(); } } else { @@ -174,8 +162,8 @@ public class LeaderListenerCallback { Set<String> topologyBlobKeys = new HashSet<>(); for (String blobKey : blobKeys) { if (blobKey.endsWith(STORM_JAR_SUFFIX) - || blobKey.endsWith(STORM_CODE_SUFFIX) - || blobKey.endsWith(STORM_CONF_SUFFIX)) { + || blobKey.endsWith(STORM_CODE_SUFFIX) + || blobKey.endsWith(STORM_CONF_SUFFIX)) { topologyBlobKeys.add(blobKey); } } @@ -209,12 +197,12 @@ public class LeaderListenerCallback { } } catch (AuthorizationException | KeyNotFoundException | IOException e) { LOG.error("Exception occurs while reading blob for key: " - + activeTopologyCodeKey - + ", exception: " - + e, e); + + activeTopologyCodeKey + + ", exception: " + + e, e); throw new RuntimeException("Exception occurs while reading blob for key: " - + activeTopologyCodeKey - + ", exception: " + e, e); + + activeTopologyCodeKey + + ", exception: " + e, e); } } return activeTopologyDependencies; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java index f725f5d..17d4d2f 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java @@ -1,33 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Random; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.storm.Config; import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.SupervisorWorkerHeartbeat; @@ -51,6 +44,8 @@ public class NimbusHeartbeatsPressureTest { private static int THREAD_SUBMIT_NUM = 1; private static int MOCKED_STORM_NUM = 5000; private static volatile boolean[] readyFlags = new boolean[THREADS_NUM]; + private static Random rand = new Random(47); + private static List<double[]> totalCostTimesBook = new ArrayList<>(); static { for (int i = 0; i < THREADS_NUM; i++) { @@ -58,9 +53,6 @@ public class NimbusHeartbeatsPressureTest { } } - private static Random rand = new Random(47); - private static List<double[]> totalCostTimesBook = new ArrayList<>(); - /** * Initialize a fake config. * @return conf @@ -131,6 +123,53 @@ public class NimbusHeartbeatsPressureTest { } } + private static void printTimeCostArray(Double[] array) { + StringBuilder builder = new StringBuilder(); + builder.append("["); + for (int i = 0; i < array.length; i++) { + if (i != array.length - 1) { + builder.append(array[i] + ","); + } else { + builder.append(array[i] + ""); + } + } + builder.append("]"); + System.out.println(builder.toString()); + } + + private static void printStatistics(List<double[]> data) { + + List<Double> totalPoints = new ArrayList<>(); + double total = 0D; + for (double[] item : data) { + for (Double point : item) { + if (point != null) { + totalPoints.add(point); + total += point; + } + } + } + Double[] totalPointsArray = new Double[totalPoints.size()]; + + totalPoints.toArray(totalPointsArray); + Arrays.sort(totalPointsArray); + // printTimeCostArray(totalPointsArray); + println("===== statistics ================"); + println("===== min time cost: " + totalPointsArray[0] + " ====="); + println("===== max time cost: " + totalPointsArray[totalPointsArray.length - 2] + " ====="); + + double meanVal = total / totalPointsArray.length; + println("===== mean time cost: " + meanVal + " ====="); + int middleIndex = (int) (totalPointsArray.length * 0.5); + println("===== median time cost: " + totalPointsArray[middleIndex] + " ====="); + int top90Index = (int) (totalPointsArray.length * 0.9); + println("===== top90 time cost: " + totalPointsArray[top90Index] + " ====="); + } + + public static void main(String[] args) { + testMaxThroughput(); + } + static class HeartbeatSendTask implements Runnable { private double[] runtimesBook; private int taskId; @@ -191,51 +230,4 @@ public class NimbusHeartbeatsPressureTest { } } - private static void printTimeCostArray(Double[] array) { - StringBuilder builder = new StringBuilder(); - builder.append("["); - for (int i = 0; i < array.length; i++) { - if (i != array.length - 1) { - builder.append(array[i] + ","); - } else { - builder.append(array[i] + ""); - } - } - builder.append("]"); - System.out.println(builder.toString()); - } - - private static void printStatistics(List<double[]> data) { - - List<Double> totalPoints = new ArrayList<>(); - double total = 0D; - for (double[] item : data) { - for (Double point : item) { - if (point != null) { - totalPoints.add(point); - total += point; - } - } - } - Double[] totalPointsArray = new Double[totalPoints.size()]; - - totalPoints.toArray(totalPointsArray); - Arrays.sort(totalPointsArray); - // printTimeCostArray(totalPointsArray); - println("===== statistics ================"); - println("===== min time cost: " + totalPointsArray[0] + " ====="); - println("===== max time cost: " + totalPointsArray[totalPointsArray.length - 2] + " ====="); - - double meanVal = total / totalPointsArray.length; - println("===== mean time cost: " + meanVal + " ====="); - int middleIndex = (int) (totalPointsArray.length * 0.5); - println("===== median time cost: " + totalPointsArray[middleIndex] + " ====="); - int top90Index = (int) (totalPointsArray.length * 0.9); - println("===== top90 time cost: " + totalPointsArray[top90Index] + " ====="); - } - - public static void main(String[] args) { - testMaxThroughput(); - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java index cdf22c7..b7e7117 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; @@ -30,7 +24,7 @@ public class StrictTopologyValidator implements ITopologyValidator { private static final Logger LOG = LoggerFactory.getLogger(StrictTopologyValidator.class); @Override - public void prepare(Map stormConf){ + public void prepare(Map stormConf) { } @Override @@ -63,5 +57,5 @@ public class StrictTopologyValidator implements ITopologyValidator { } } } - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java index 78f880a..ac01aae 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; @@ -21,11 +15,9 @@ package org.apache.storm.nimbus; import java.util.HashSet; import java.util.Map; import java.util.Set; - import org.apache.storm.Config; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Time; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java index 4befaf8..f656089 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.nimbus; @@ -37,10 +31,10 @@ public class WorkerHeartbeatsRecoveryStrategyFactory { IWorkerHeartbeatsRecoveryStrategy strategy; if (conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS) != null) { Object targetObj = ReflectionUtils.newInstance((String) - conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS)); + conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS)); Preconditions.checkState(targetObj instanceof IWorkerHeartbeatsRecoveryStrategy, - "{} must implements IWorkerHeartbeatsRecoveryStrategy", - DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS); + "{} must implements IWorkerHeartbeatsRecoveryStrategy", + DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS); strategy = ((IWorkerHeartbeatsRecoveryStrategy) targetObj); } else { strategy = new TimeOutWorkerHeartbeatsRecoveryStrategy(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java index 7dca237..a7ebc1e 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.pacemaker; import org.apache.storm.generated.HBMessage; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java index 095982b..ff283a4 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java @@ -1,29 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.pacemaker; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import java.util.ArrayList; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; - import org.apache.storm.generated.HBMessage; import org.apache.storm.generated.HBMessageData; import org.apache.storm.generated.HBNodes; @@ -35,79 +32,81 @@ import org.apache.storm.utils.Utils; import org.apache.storm.utils.VersionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.codahale.metrics.ExponentiallyDecayingReservoir; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; - import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; public class Pacemaker implements IServerMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class); - private Map<String, byte[]> heartbeats; - private Map<String, Object> conf; - private final static Meter meterSendPulseCount = StormMetricsRegistry.registerMeter("pacemaker:send-pulse-count"); private final static Meter meterTotalReceivedSize = StormMetricsRegistry.registerMeter("pacemaker:total-receive-size"); private final static Meter meterGetPulseCount = StormMetricsRegistry.registerMeter("pacemaker:get-pulse=count"); private final static Meter meterTotalSentSize = StormMetricsRegistry.registerMeter("pacemaker:total-sent-size"); - private final static Histogram histogramHeartbeatSize = StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir()); + private final static Histogram histogramHeartbeatSize = + StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir()); + private Map<String, byte[]> heartbeats; + private Map<String, Object> conf; public Pacemaker(Map<String, Object> conf) { heartbeats = new ConcurrentHashMap(); this.conf = conf; StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", - new Callable() { - @Override - public Integer call() throws Exception { - return heartbeats.size(); - } - }); + new Callable() { + @Override + public Integer call() throws Exception { + return heartbeats.size(); + } + }); StormMetricsRegistry.startMetricsReporters(conf); } + public static void main(String[] args) { + SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); + Map<String, Object> conf = ConfigUtils.overrideLoginConfigWithSystemProperty(Utils.readStormConfig()); + final Pacemaker serverHandler = new Pacemaker(conf); + serverHandler.launchServer(); + } + @Override public HBMessage handleMessage(HBMessage m, boolean authenticated) { HBMessage response = null; HBMessageData data = m.get_data(); switch (m.get_type()) { - case CREATE_PATH: - response = createPath(data.get_path()); - break; - case EXISTS: - response = pathExists(data.get_path(), authenticated); - break; - case SEND_PULSE: - response = sendPulse(data.get_pulse()); - break; - case GET_ALL_PULSE_FOR_PATH: - response = getAllPulseForPath(data.get_path(), authenticated); - break; - case GET_ALL_NODES_FOR_PATH: - response = getAllNodesForPath(data.get_path(), authenticated); - break; - case GET_PULSE: - response = getPulse(data.get_path(), authenticated); - break; - case DELETE_PATH: - response = deletePath(data.get_path()); - break; - case DELETE_PULSE_ID: - response = deletePulseId(data.get_path()); - break; - default: - LOG.info("Got Unexpected Type: {}", m.get_type()); - break; + case CREATE_PATH: + response = createPath(data.get_path()); + break; + case EXISTS: + response = pathExists(data.get_path(), authenticated); + break; + case SEND_PULSE: + response = sendPulse(data.get_pulse()); + break; + case GET_ALL_PULSE_FOR_PATH: + response = getAllPulseForPath(data.get_path(), authenticated); + break; + case GET_ALL_NODES_FOR_PATH: + response = getAllNodesForPath(data.get_path(), authenticated); + break; + case GET_PULSE: + response = getPulse(data.get_path(), authenticated); + break; + case DELETE_PATH: + response = deletePath(data.get_path()); + break; + case DELETE_PULSE_ID: + response = deletePulseId(data.get_path()); + break; + default: + LOG.info("Got Unexpected Type: {}", m.get_type()); + break; } - if (response != null) + if (response != null) { response.set_message_id(m.get_message_id()); + } return response; } - private HBMessage createPath(String path) { return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null); } @@ -192,8 +191,9 @@ public class Pacemaker implements IServerMessageHandler { String prefix = path.endsWith("/") ? path : (path + "/"); for (String key : heartbeats.keySet()) { String checkKey = key + "/"; - if (checkKey.indexOf(prefix) == 0) + if (checkKey.indexOf(prefix) == 0) { deletePulseId(key); + } } return new HBMessage(HBServerMessageType.DELETE_PATH_RESPONSE, null); } @@ -209,11 +209,4 @@ public class Pacemaker implements IServerMessageHandler { return new PacemakerServer(this, conf); } - public static void main(String[] args) { - SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); - Map<String, Object> conf = ConfigUtils.overrideLoginConfigWithSystemProperty(Utils.readStormConfig()); - final Pacemaker serverHandler = new Pacemaker(conf); - serverHandler.launchServer(); - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java index 2f050db..e4e3c67 100644 --- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java +++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java @@ -1,36 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.pacemaker; -import org.apache.storm.Config; -import org.apache.storm.DaemonConfig; -import org.apache.storm.generated.HBMessage; -import org.apache.storm.messaging.netty.ISaslServer; -import org.apache.storm.messaging.netty.NettyRenameThreadFactory; -import org.apache.storm.security.auth.AuthUtils; -import java.lang.InterruptedException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import javax.security.auth.login.Configuration; +import org.apache.storm.Config; +import org.apache.storm.DaemonConfig; +import org.apache.storm.generated.HBMessage; +import org.apache.storm.messaging.netty.ISaslServer; +import org.apache.storm.messaging.netty.NettyRenameThreadFactory; import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec; +import org.apache.storm.security.auth.AuthUtils; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -55,48 +49,47 @@ class PacemakerServer implements ISaslServer { private ConcurrentSkipListSet<Channel> authenticated_channels = new ConcurrentSkipListSet<Channel>(); private ThriftNettyServerCodec.AuthMethod authMethod; - public PacemakerServer(IServerMessageHandler handler, Map<String, Object> config){ - int maxWorkers = (int)config.get(DaemonConfig.PACEMAKER_MAX_THREADS); - this.port = (int)config.get(Config.PACEMAKER_PORT); + public PacemakerServer(IServerMessageHandler handler, Map<String, Object> config) { + int maxWorkers = (int) config.get(DaemonConfig.PACEMAKER_MAX_THREADS); + this.port = (int) config.get(Config.PACEMAKER_PORT); this.handler = handler; this.topo_name = "pacemaker_server"; - String auth = (String)config.get(Config.PACEMAKER_AUTH_METHOD); - switch(auth) { - - case "DIGEST": - Configuration login_conf = AuthUtils.GetConfiguration(config); - authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST; - this.secret = AuthUtils.makeDigestPayload(login_conf, AuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST); - if(this.secret == null) { - LOG.error("Can't start pacemaker server without digest secret."); - throw new RuntimeException("Can't start pacemaker server without digest secret."); - } - break; - - case "KERBEROS": - authMethod = ThriftNettyServerCodec.AuthMethod.KERBEROS; - break; - - case "NONE": - authMethod = ThriftNettyServerCodec.AuthMethod.NONE; - break; - - default: - LOG.error("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD."); - throw new RuntimeException("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD."); + String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD); + switch (auth) { + + case "DIGEST": + Configuration login_conf = AuthUtils.GetConfiguration(config); + authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST; + this.secret = AuthUtils.makeDigestPayload(login_conf, AuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST); + if (this.secret == null) { + LOG.error("Can't start pacemaker server without digest secret."); + throw new RuntimeException("Can't start pacemaker server without digest secret."); + } + break; + + case "KERBEROS": + authMethod = ThriftNettyServerCodec.AuthMethod.KERBEROS; + break; + + case "NONE": + authMethod = ThriftNettyServerCodec.AuthMethod.NONE; + break; + + default: + LOG.error("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD."); + throw new RuntimeException("Can't start pacemaker server without proper PACEMAKER_AUTH_METHOD."); } ThreadFactory bossFactory = new NettyRenameThreadFactory("server-boss"); ThreadFactory workerFactory = new NettyRenameThreadFactory("server-worker"); NioServerSocketChannelFactory factory; - if(maxWorkers > 0) { + if (maxWorkers > 0) { factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers); - } - else { + } else { factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory)); @@ -106,10 +99,10 @@ class PacemakerServer implements ISaslServer { bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES); bootstrap.setOption("keepAlive", true); - int thriftMessageMaxSize = (Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX); + int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX); ChannelPipelineFactory pipelineFactory = new ThriftNettyServerCodec(this, config, authMethod, thriftMessageMaxSize) - .pipelineFactory(); + .pipelineFactory(); bootstrap.setPipelineFactory(pipelineFactory); Channel channel = bootstrap.bind(new InetSocketAddress(port)); allChannels.add(channel); @@ -123,11 +116,10 @@ class PacemakerServer implements ISaslServer { public void cleanPipeline(Channel channel) { boolean authenticated = authenticated_channels.contains(channel); - if(!authenticated) { - if(channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) { + if (!authenticated) { + if (channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) != null) { channel.getPipeline().remove(ThriftNettyServerCodec.SASL_HANDLER); - } - else if(channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) { + } else if (channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) { channel.getPipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER); } } @@ -137,15 +129,14 @@ class PacemakerServer implements ISaslServer { cleanPipeline(channel); boolean authenticated = (authMethod == ThriftNettyServerCodec.AuthMethod.NONE) || authenticated_channels.contains(channel); - HBMessage m = (HBMessage)mesg; + HBMessage m = (HBMessage) mesg; LOG.debug("received message. Passing to handler. {} : {} : {}", handler.toString(), m.toString(), channel.toString()); HBMessage response = handler.handleMessage(m, authenticated); - if(response != null) { + if (response != null) { LOG.debug("Got Response from handler: {}", response); channel.write(response); - } - else { + } else { LOG.info("Got null response from handler handling message: {}", m); } }
