Repository: storm Updated Branches: refs/heads/master 58050a5b3 -> 53446108b
STORM-1255: port storm_utils.clj to java and split Time tests into its own test file Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a2a656ed Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a2a656ed Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a2a656ed Branch: refs/heads/master Commit: a2a656ed3fdbf76fddb730bced5bfe7f2b18df72 Parents: 4699990 Author: Alessandro Bellina <[email protected]> Authored: Mon Feb 15 13:30:10 2016 -0600 Committer: Alessandro Bellina <[email protected]> Committed: Tue Feb 16 23:17:42 2016 -0600 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/utils/Utils.java | 17 +- .../test/clj/org/apache/storm/utils_test.clj | 111 ---------- .../jvm/org/apache/storm/utils/TimeTest.java | 106 +++++++++ .../jvm/org/apache/storm/utils/UtilsTest.java | 221 +++++++++++++++++++ 4 files changed, 337 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a2a656ed/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 9a849ea..5674459 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -1058,6 +1058,10 @@ public class Utils { return newCurator(conf, servers, port, root, null); } + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { + return newCurator(conf, servers, port, "", auth); + } + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { List<String> serverPorts = new ArrayList<String>(); for (String zkServer : servers) { @@ -1113,10 +1117,6 @@ public class Utils { setupBuilder(builder, zkStr, conf, auth); } - public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { - return newCurator(conf, servers, port, "", auth); - } - public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { CuratorFramework ret = newCurator(conf, servers, port, root, auth); ret.start(); @@ -1397,13 +1397,16 @@ public class Utils { } if (memoryOpts != null) { int unit = 1; - if (memoryOpts.toLowerCase().endsWith("k")) { + memoryOpts = memoryOpts.toLowerCase(); + + if (memoryOpts.endsWith("k")) { unit = 1024; - } else if (memoryOpts.toLowerCase().endsWith("m")) { + } else if (memoryOpts.endsWith("m")) { unit = 1024 * 1024; - } else if (memoryOpts.toLowerCase().endsWith("g")) { + } else if (memoryOpts.endsWith("g")) { unit = 1024 * 1024 * 1024; } + memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0; return (result < 1.0) ? 1.0 : result; http://git-wip-us.apache.org/repos/asf/storm/blob/a2a656ed/storm-core/test/clj/org/apache/storm/utils_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/utils_test.clj b/storm-core/test/clj/org/apache/storm/utils_test.clj deleted file mode 100644 index 26442aa..0000000 --- a/storm-core/test/clj/org/apache/storm/utils_test.clj +++ /dev/null @@ -1,111 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.utils-test - (:import [org.apache.storm Config]) - (:import [org.apache.storm.utils NimbusClient Utils]) - (:import [org.apache.curator.retry ExponentialBackoffRetry]) - (:import [org.apache.thrift.transport TTransportException]) - (:import [org.apache.storm.utils ConfigUtils Time]) - (:use [org.apache.storm config util]) - (:use [clojure test]) -) - -(deftest test-new-curator-uses-exponential-backoff - (let [expected_interval 2400 - expected_retries 10 - expected_ceiling 3000 - conf (merge (clojurify-structure (Utils/readDefaultConfig)) - {Config/STORM_ZOOKEEPER_RETRY_INTERVAL expected_interval - Config/STORM_ZOOKEEPER_RETRY_TIMES expected_retries - Config/STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING expected_ceiling}) - servers ["bogus_server"] - arbitrary_port 42 - curator (Utils/newCurator conf servers arbitrary_port nil) - retry (-> curator .getZookeeperClient .getRetryPolicy) - ] - (is (.isAssignableFrom ExponentialBackoffRetry (.getClass retry))) - (is (= (.getBaseSleepTimeMs retry) expected_interval)) - (is (= (.getN retry) expected_retries)) - (is (= (.getSleepTimeMs retry 10 0) expected_ceiling)) - ) -) - -(deftest test-getConfiguredClient-throws-RunTimeException-on-bad-args - (let [storm-conf (merge - (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-NIMBUS-RETRY-TIMES 0})] - (is (thrown-cause? TTransportException - (NimbusClient. storm-conf "" 65535) - )) - ) -) - -(deftest test-isZkAuthenticationConfiguredTopology - (testing "Returns false on null config" - (is (not (Utils/isZkAuthenticationConfiguredTopology nil)))) - (testing "Returns false on scheme key missing" - (is (not (Utils/isZkAuthenticationConfiguredTopology - {STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME nil})))) - (testing "Returns false on scheme value null" - (is (not - (Utils/isZkAuthenticationConfiguredTopology - {STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME nil})))) - (testing "Returns true when scheme set to string" - (is - (Utils/isZkAuthenticationConfiguredTopology - {STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME "foobar"})))) - -(deftest test-isZkAuthenticationConfiguredStormServer - (let [k "java.security.auth.login.config" - oldprop (System/getProperty k)] - (try - (.remove (System/getProperties) k) - (testing "Returns false on null config" - (is (not (Utils/isZkAuthenticationConfiguredStormServer nil)))) - (testing "Returns false on scheme key missing" - (is (not (Utils/isZkAuthenticationConfiguredStormServer - {STORM-ZOOKEEPER-AUTH-SCHEME nil})))) - (testing "Returns false on scheme value null" - (is (not - (Utils/isZkAuthenticationConfiguredStormServer - {STORM-ZOOKEEPER-AUTH-SCHEME nil})))) - (testing "Returns true when scheme set to string" - (is - (Utils/isZkAuthenticationConfiguredStormServer - {STORM-ZOOKEEPER-AUTH-SCHEME "foobar"}))) - (testing "Returns true when java.security.auth.login.config is set" - (do - (System/setProperty k "anything") - (is (Utils/isZkAuthenticationConfiguredStormServer {})))) - (testing "Returns false when java.security.auth.login.config is set" - (do - (System/setProperty k "anything") - (is (Utils/isZkAuthenticationConfiguredStormServer {})))) - (finally - (if (not-nil? oldprop) - (System/setProperty k oldprop) - (.remove (System/getProperties) k)))))) - -(deftest test-secs-to-millis-long - (is (= 0 (Time/secsToMillisLong 0))) - (is (= 2 (Time/secsToMillisLong 0.002))) - (is (= 500 (Time/secsToMillisLong 0.5))) - (is (= 1000 (Time/secsToMillisLong 1))) - (is (= 1080 (Time/secsToMillisLong 1.08))) - (is (= 10000 (Time/secsToMillisLong 10))) - (is (= 10100 (Time/secsToMillisLong 10.1))) -) - http://git-wip-us.apache.org/repos/asf/storm/blob/a2a656ed/storm-core/test/jvm/org/apache/storm/utils/TimeTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/TimeTest.java b/storm-core/test/jvm/org/apache/storm/utils/TimeTest.java new file mode 100644 index 0000000..faf75eb --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/utils/TimeTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import org.junit.Test; +import org.junit.Assert; + +public class TimeTest{ + + @Test + public void secsToMillisLongTest(){ + Assert.assertEquals(Time.secsToMillisLong(0), 0); + Assert.assertEquals(Time.secsToMillisLong(0.002), 2); + Assert.assertEquals(Time.secsToMillisLong(1), 1000); + Assert.assertEquals(Time.secsToMillisLong(1.08), 1080); + Assert.assertEquals(Time.secsToMillisLong(10), 10000); + Assert.assertEquals(Time.secsToMillisLong(10.1), 10100); + } + + @Test + public void ifNotSimulatingIsSimulatingReturnsFalse(){ + Assert.assertFalse(Time.isSimulating()); + } + + @Test + public void ifSimulatingIsSimulatingReturnsTrue(){ + Time.startSimulating(); + Assert.assertTrue(Time.isSimulating()); + Time.stopSimulating(); + } + + @Test + public void advanceTimeSimulatedTimeBy0Causes0DeltaTest(){ + Time.startSimulating(); + long current = Time.currentTimeMillis(); + Time.advanceTime(0); + Assert.assertEquals(Time.deltaMs(current), 0); + Time.stopSimulating(); + } + + @Test + public void advanceTimeSimulatedTimeBy1000Causes1000MsDeltaTest(){ + Time.startSimulating(); + long current = Time.currentTimeMillis(); + Time.advanceTime(1000); + Assert.assertEquals(Time.deltaMs(current), 1000); + Time.stopSimulating(); + } + + @Test + public void advanceTimeSimulatedTimeBy1500Causes1500MsDeltaTest(){ + Time.startSimulating(); + long current = Time.currentTimeMillis(); + Time.advanceTime(1500); + Assert.assertEquals(Time.deltaMs(current), 1500); + Time.stopSimulating(); + } + + @Test + public void advanceTimeSimulatedTimeByNegative1500CausesNegative1500MsDeltaTest(){ + Time.startSimulating(); + long current = Time.currentTimeMillis(); + Time.advanceTime(-1500); + Assert.assertEquals(Time.deltaMs(current), -1500); + Time.stopSimulating(); + } + + @Test + public void advanceSimulatedTimeBy1000MsSecondReturns1SecondTest(){ + Time.startSimulating(); + int current = Time.currentTimeSecs(); + Time.advanceTime(1000); + Assert.assertEquals(Time.deltaSecs(current), 1); + Time.stopSimulating(); + } + + @Test + public void advanceSimulatedtimeBy1500MsSecondsReturns1TruncatedSecondTest(){ + Time.startSimulating(); + int current = Time.currentTimeSecs(); + Time.advanceTime(1500); + Assert.assertEquals(Time.deltaSecs(current), 1, 0); + Time.stopSimulating(); + } + + @Test(expected=IllegalStateException.class) + public void ifNotSimulatingAdvanceTimeThrows(){ + Time.advanceTime(1000); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/a2a656ed/storm-core/test/jvm/org/apache/storm/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-core/test/jvm/org/apache/storm/utils/UtilsTest.java new file mode 100644 index 0000000..1bb5f71 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/utils/UtilsTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import org.junit.Test; +import org.junit.Assert; + +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; + +import org.apache.storm.Config; +import org.apache.thrift.transport.TTransportException; + +import static org.mockito.Mockito.*; + +public class UtilsTest{ + @Test + public void newCuratorUsesExponentialBackoffTest() throws InterruptedException{ + final int expectedInterval = 2400; + final int expectedRetries = 10; + final int expectedCeiling = 3000; + + Map<String, Object> config = Utils.readDefaultConfig(); + config.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, expectedInterval); + config.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, expectedRetries); + config.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, expectedCeiling); + + CuratorFramework curator = Utils.newCurator(config, Arrays.asList("bogus_server"), 42 /*port*/, ""); + StormBoundedExponentialBackoffRetry policy = + (StormBoundedExponentialBackoffRetry) curator.getZookeeperClient().getRetryPolicy(); + Assert.assertEquals(policy.getBaseSleepTimeMs(), expectedInterval); + Assert.assertEquals(policy.getN(), expectedRetries); + Assert.assertEquals(policy.getSleepTimeMs(10, 0), expectedCeiling); + } + + @Test(expected = RuntimeException.class) + public void getConfiguredClientThrowsRuntimeExceptionOnBadArgsTest () throws RuntimeException, TTransportException { + Map config = ConfigUtils.readStormConfig(); + config.put(Config.STORM_NIMBUS_RETRY_TIMES, 0); + new NimbusClient(config, "", 65535); + } + + private Map mockMap(String key, String value){ + Map<String, Object> map = new HashMap<String, Object>(); + map.put(key, value); + return map; + } + + private Map topologyMockMap(String value){ + return mockMap(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, value); + } + + private Map serverMockMap(String value){ + return mockMap(Config.STORM_ZOOKEEPER_AUTH_SCHEME, value); + } + + private Map emptyMockMap(){ + return new HashMap<String, Object>(); + } + + /* isZkAuthenticationConfiguredTopology */ + @Test + public void isZkAuthenticationConfiguredTopologyReturnsFalseOnNullConfigTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredTopology(null)); + } + + @Test + public void isZkAuthenticationConfiguredTopologyReturnsFalseOnSchemeKeyMissingTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredTopology(emptyMockMap())); + } + + @Test + public void isZkAuthenticationConfiguredTopologyReturnsFalseOnSchemeValueNullTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredTopology(topologyMockMap(null))); + } + + @Test + public void isZkAuthenticationConfiguredTopologyReturnsTrueWhenSchemeSetToStringTest(){ + Assert.assertTrue(Utils.isZkAuthenticationConfiguredTopology(topologyMockMap("foobar"))); + } + + /* isZkAuthenticationConfiguredStormServer */ + @Test + public void isZkAuthenticationConfiguredStormReturnsFalseOnNullConfigTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredStormServer(null)); + } + + @Test + public void isZkAuthenticationConfiguredStormReturnsFalseOnSchemeKeyMissingTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredStormServer(emptyMockMap())); + } + + @Test + public void isZkAuthenticationConfiguredStormReturnsFalseOnSchemeValueNullTest(){ + Assert.assertFalse(Utils.isZkAuthenticationConfiguredStormServer(serverMockMap(null))); + } + + @Test + public void isZkAuthenticationConfiguredStormReturnsTrueWhenSchemeSetToStringTest(){ + Assert.assertTrue(Utils.isZkAuthenticationConfiguredStormServer(serverMockMap("foobar"))); + } + + @Test + public void isZkAuthenticationConfiguredStormReturnsTrueWhenAuthLoginConfigIsSetTest(){ + String key = "java.security.auth.login.config"; + String oldValue = System.getProperty(key); + try { + System.setProperty("java.security.auth.login.config", "anything"); + Assert.assertTrue(Utils.isZkAuthenticationConfiguredStormServer(emptyMockMap())); + } catch (Exception ignore) { + } finally { + // reset property + if (oldValue == null){ + System.clearProperty(key); + } else { + System.setProperty(key, oldValue); + } + } + } + + private CuratorFrameworkFactory.Builder setupBuilder(boolean withExhibitor){ + return setupBuilder(withExhibitor, false /*without Auth*/); + } + + private CuratorFrameworkFactory.Builder setupBuilder(boolean withExhibitor, boolean withAuth){ + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + Map<String, Object> conf = new HashMap<String, Object>(); + if (withExhibitor){ + conf.put(Config.STORM_EXHIBITOR_SERVERS,"foo"); + conf.put(Config.STORM_EXHIBITOR_PORT, 0); + conf.put(Config.STORM_EXHIBITOR_URIPATH, "/exhibitor"); + conf.put(Config.STORM_EXHIBITOR_POLL, 0); + conf.put(Config.STORM_EXHIBITOR_RETRY_INTERVAL, 0); + conf.put(Config.STORM_EXHIBITOR_RETRY_INTERVAL_CEILING, 0); + conf.put(Config.STORM_EXHIBITOR_RETRY_TIMES, 0); + } + conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 0); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 0); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 0); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 0); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 0); + String zkStr = new String("zk_connection_string"); + ZookeeperAuthInfo auth = null; + if (withAuth){ + auth = new ZookeeperAuthInfo("scheme", "abc".getBytes()); + } + Utils.testSetupBuilder(builder, zkStr, conf, auth); + return builder; + } + + @Test + public void ifExhibitorServersProvidedBuilderUsesTheExhibitorEnsembleProviderTest(){ + CuratorFrameworkFactory.Builder builder = setupBuilder(true /*with exhibitor*/); + Assert.assertEquals(builder.getEnsembleProvider().getConnectionString(), ""); + Assert.assertEquals(builder.getEnsembleProvider().getClass(), ExhibitorEnsembleProvider.class); + } + + @Test + public void ifExhibitorServersAreEmptyBuilderUsesAFixedEnsembleProviderTest(){ + CuratorFrameworkFactory.Builder builder = setupBuilder(false /*without exhibitor*/); + Assert.assertEquals(builder.getEnsembleProvider().getConnectionString(), "zk_connection_string"); + Assert.assertEquals(builder.getEnsembleProvider().getClass(), FixedEnsembleProvider.class); + } + + @Test + public void ifAuthSchemeAndPayloadAreDefinedBuilderUsesAuthTest(){ + CuratorFrameworkFactory.Builder builder = setupBuilder(false /*without exhibitor*/, true /*with auth*/); + List<AuthInfo> authInfos = builder.getAuthInfos(); + AuthInfo authInfo = authInfos.get(0); + Assert.assertEquals(authInfo.getScheme(), "scheme"); + Assert.assertArrayEquals(authInfo.getAuth(), "abc".getBytes()); + } + + @Test + public void parseJvmHeapMemByChildOpts1024KIs1Test(){ + Assert.assertEquals(Utils.parseJvmHeapMemByChildOpts("Xmx1024K", 0.0).doubleValue(), 1.0, 0); + } + + @Test + public void parseJvmHeapMemByChildOpts100MIs100Test(){ + Assert.assertEquals(Utils.parseJvmHeapMemByChildOpts("Xmx100M", 0.0).doubleValue(), 100.0, 0); + } + + @Test + public void parseJvmHeapMemByChildOpts1GIs1024Test(){ + Assert.assertEquals(Utils.parseJvmHeapMemByChildOpts("Xmx1G", 0.0).doubleValue(), 1024.0, 0); + } + + @Test + public void parseJvmHeapMemByChildOptsReturnsDefaultIfMatchNotFoundTest(){ + Assert.assertEquals(Utils.parseJvmHeapMemByChildOpts("Xmx1T", 123.0).doubleValue(), 123.0, 0); + } + + @Test + public void parseJvmHeapMemByChildOptsReturnsDefaultIfInputIsNullTest(){ + Assert.assertEquals(Utils.parseJvmHeapMemByChildOpts(null, 123.0).doubleValue(), 123.0, 0); + } +}
