Repository: storm
Updated Branches:
  refs/heads/master 96b702dfc -> efb2e9a33


STORM-2947: Remove some deprecated methods in storm-client and storm-server


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4ed52e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4ed52e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4ed52e7

Branch: refs/heads/master
Commit: c4ed52e79732d0026f615f00ae6be4683dbf9c74
Parents: 2d7c7d3
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Tue Jul 10 15:56:34 2018 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Thu Jul 12 16:14:23 2018 +0200

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 -
 .../java/org/apache/storm/loadgen/LoadBolt.java |  7 +-
 .../src/jvm/org/apache/storm/Config.java        | 15 ----
 .../org/apache/storm/daemon/StormCommon.java    |  6 --
 .../apache/storm/messaging/netty/Client.java    |  3 +-
 .../security/auth/ICredentialsRenewer.java      | 16 +---
 .../DefaultSerializationDelegate.java           | 57 ---------------
 .../GzipBridgeSerializationDelegate.java        | 59 ---------------
 .../apache/storm/testing/FixedTupleSpout.java   |  8 --
 .../src/jvm/org/apache/storm/tuple/Tuple.java   |  8 --
 .../jvm/org/apache/storm/tuple/TupleImpl.java   |  5 --
 .../org/apache/storm/utils/NimbusClient.java    |  9 +--
 .../StormBoundedExponentialBackoffRetry.java    | 13 ++--
 .../src/jvm/org/apache/storm/utils/Utils.java   |  5 +-
 .../GzipBridgeSerializationDelegateTest.java    | 77 --------------------
 .../storm/messaging/netty_integration_test.clj  |  1 -
 .../test/clj/org/apache/storm/nimbus_test.clj   | 48 ++++++------
 .../apache/storm/messaging/netty/NettyTest.java |  1 -
 18 files changed, 36 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f544903..da4753a 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -223,7 +223,6 @@ storm.messaging.netty.buffer.high.watermark: 16777216 # 16 
MB
 # dropped down below this value, any blocked clients will unblock and start 
processing further messages.
 storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 
120, other workers should also wait at least that long before giving up on 
connecting to the other worker. The reconnection period need also be bigger 
than storm.zookeeper.session.timeout(default is 20s), so that we can abort the 
reconnection when the target worker is dead.
-storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git 
a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java 
b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
index 28b611f..7eb2b73 100644
--- 
a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
+++ 
b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -23,11 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.OutputCollector;
@@ -83,7 +78,7 @@ public class LoadBolt extends BaseRichBolt {
     @Override
     public void execute(final Tuple input) {
         long startTimeNs = System.nanoTime();
-        InputStream in = inputStreams.get(input.getSourceGlobalStreamid());
+        InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
         sleep.simulateProcessAndExecTime(executorIndex, startTimeNs, in, () -> 
{
             emitTuples(input);
             collector.ack(input);

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index 8dfcee9..318a130 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1115,13 +1115,6 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
     /**
-     * The host that the master server is running on, added only for backward 
compatibility, the usage deprecated in favor of nimbus.seeds
-     * config.
-     */
-    @Deprecated
-    @isString
-    public static final String NIMBUS_HOST = "nimbus.host";
-    /**
      * List of seed nimbus hosts to use for leader nimbus discovery.
      */
     @isStringList
@@ -1309,14 +1302,6 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = 
"storm.messaging.netty.transfer.batch.size";
     /**
-     * Netty based messaging: The max # of retries that a peer will perform 
when a remote is not accessible
-     *
-     * @deprecated "Since netty clients should never stop reconnecting - this 
does not make sense anymore.
-     */
-    @Deprecated
-    @isInteger
-    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = 
"storm.messaging.netty.max_retries";
-    /**
      * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java 
b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index cd7ef6d..2bb1871 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -24,7 +24,6 @@ import java.util.TreeMap;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.Thrift;
-import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.GlobalStreamId;
@@ -85,11 +84,6 @@ public class StormCommon {
         return oldInstance;
     }
 
-    @Deprecated
-    public static String getStormId(final IStormClusterState 
stormClusterState, final String topologyName) {
-        return stormClusterState.getTopoId(topologyName).get();
-    }
-
     public static void validateDistributedMode(Map<String, Object> conf) {
         if (ConfigUtils.isLocalMode(conf)) {
             throw new IllegalArgumentException("Cannot start server in local 
mode!");

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java 
b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 1ecdd26..d46d785 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -140,10 +140,9 @@ public class Client extends ConnectionWithStatus 
implements IStatefulObject, ISa
         LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, 
lowWatermark: {}, highWatermark: {}",
                  host, port, bufferSize, lowWatermark, highWatermark);
 
-        int maxReconnectionAttempts = 
ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = 
ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         int maxWaitMs = 
ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, 
maxWaitMs, maxReconnectionAttempts);
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, 
maxWaitMs, -1);
 
         // Initiate connection to remote destination
         this.eventLoopGroup = eventLoopGroup;

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java 
b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
index abe11ba..09173b7 100644
--- 
a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
+++ 
b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
@@ -33,19 +33,5 @@ public interface ICredentialsRenewer {
      * @param topologyConf           topology configuration.
      * @param topologyOwnerPrincipal the full principal name of the owner of 
the topology
      */
-    @SuppressWarnings("deprecation")
-    default void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal) {
-        renew(credentials, topologyConf);
-    }
-
-    /**
-     * Renew any credentials that need to be renewed. (Update the credentials 
if needed)
-     *
-     * @param credentials  the credentials that may have something to renew.
-     * @param topologyConf topology configuration.
-     */
-    @Deprecated
-    default void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf) {
-        throw new IllegalStateException("At least one of the renew methods 
must be overridden");
-    }
+    void renew(Map<String, String> credentials, Map<String, Object> 
topologyConf, String topologyOwnerPrincipal);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
 
b/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
deleted file mode 100644
index c2cacc2..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-@Deprecated
-public class DefaultSerializationDelegate implements SerializationDelegate {
-
-    @Override
-    public void prepare(Map<String, Object> topoConf) {
-        // No-op
-    }
-
-    @Override
-    public byte[] serialize(Object object) {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(bos);
-            oos.writeObject(object);
-            oos.close();
-            return bos.toByteArray();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
-        try {
-            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-            ObjectInputStream ois = new ObjectInputStream(bis);
-            Object ret = ois.readObject();
-            ois.close();
-            return (T) ret;
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
 
b/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
deleted file mode 100644
index 152afd8..0000000
--- 
a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.util.Map;
-import java.util.zip.GZIPInputStream;
-
-/**
- * Always writes gzip out, but tests incoming to see if it's gzipped. If it 
is, deserializes with gzip. If not, uses {@link
- * org.apache.storm.serialization.DefaultSerializationDelegate} to 
deserialize. Any logic needing to be enabled via {@link
- * #prepare(java.util.Map)} is passed through to both delegates.
- */
-@Deprecated
-public class GzipBridgeSerializationDelegate implements SerializationDelegate {
-
-    // Split up GZIP_MAGIC into readable bytes
-    private static final byte GZIP_MAGIC_FIRST_BYTE = (byte) 
GZIPInputStream.GZIP_MAGIC;
-    private static final byte GZIP_MAGIC_SECOND_BYTE = (byte) 
(GZIPInputStream.GZIP_MAGIC >> 8);
-    private DefaultSerializationDelegate defaultDelegate = new 
DefaultSerializationDelegate();
-    private GzipSerializationDelegate gzipDelegate = new 
GzipSerializationDelegate();
-
-    @Override
-    public void prepare(Map<String, Object> topoConf) {
-        defaultDelegate.prepare(topoConf);
-        gzipDelegate.prepare(topoConf);
-    }
-
-    @Override
-    public byte[] serialize(Object object) {
-        return gzipDelegate.serialize(object);
-    }
-
-    @Override
-    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
-        if (isGzipped(bytes)) {
-            return gzipDelegate.deserialize(bytes, clazz);
-        } else {
-            return defaultDelegate.deserialize(bytes, clazz);
-        }
-    }
-
-    /**
-     * Looks ahead to see if the GZIP magic constant is heading {@code bytes}
-     */
-    private boolean isGzipped(byte[] bytes) {
-        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
-               && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java 
b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
index 5de142d..44ff71c 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
@@ -41,14 +41,6 @@ public class FixedTupleSpout implements IRichSpout, 
CompletableSpout {
         this(tuples, (Fields) null);
     }
 
-    /**
-     * @deprecated please use {@link #FixedTupleSpout(List, Fields)}
-     */
-    @Deprecated
-    public FixedTupleSpout(List tuples, String fieldName) {
-        this(tuples, new Fields(fieldName));
-    }
-
     public FixedTupleSpout(List tuples, Fields fields) {
         _id = UUID.randomUUID().toString();
         synchronized (acked) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java 
b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
index 23f531e..ec91d77 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
@@ -29,14 +29,6 @@ public interface Tuple extends ITuple {
 
     /**
      * Returns the global stream id (component + stream) of this tuple.
-     *
-     * @deprecated replaced by {@link #getSourceGlobalStreamId()} due to 
broken naming convention
-     */
-    @Deprecated
-    public GlobalStreamId getSourceGlobalStreamid();
-
-    /**
-     * Returns the global stream id (component + stream) of this tuple.
      */
     public GlobalStreamId getSourceGlobalStreamId();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java 
b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
index 0c1ced2..06bfd48 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
@@ -225,11 +225,6 @@ public class TupleImpl implements Tuple {
     }
 
     @Override
-    public GlobalStreamId getSourceGlobalStreamid() {
-        return getSourceGlobalStreamId();
-    }
-
-    @Override
     public GlobalStreamId getSourceGlobalStreamId() {
         return new GlobalStreamId(getSourceComponent(), streamId);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java 
b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index da8bcf5..47ccbe1 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -136,14 +136,7 @@ public class NimbusClient extends ThriftClient {
             asUser = (String) conf.get(Config.STORM_DO_AS_USER);
         }
 
-        List<String> seeds;
-        if (conf.containsKey(Config.NIMBUS_HOST) && 
StringUtils.isNotBlank(conf.get(Config.NIMBUS_HOST).toString())) {
-            LOG.warn("Using deprecated config {} for backward compatibility. 
Please update your storm.yaml so it only has config {}",
-                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
-            seeds = 
Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString());
-        } else {
-            seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
-        }
+        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
 
         for (String host : seeds) {
             int port = 
Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
 
b/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
index 38f06d4..768c83c 100644
--- 
a/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ 
b/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -21,7 +21,7 @@ public class StormBoundedExponentialBackoffRetry extends 
BoundedExponentialBacko
     private static final Logger LOG = 
LoggerFactory.getLogger(StormBoundedExponentialBackoffRetry.class);
     private final Random random = new Random();
     private final int linearBaseSleepMs;
-    private int stepSize;
+    private final int stepSize;
     private int expRetriesThreshold;
 
     /**
@@ -31,7 +31,6 @@ public class StormBoundedExponentialBackoffRetry extends 
BoundedExponentialBacko
      * Also adds jitter for exponential/linear retry. It guarantees 
`currSleepTimeMs >= prevSleepTimeMs` and `baseSleepTimeMs <=
      * currSleepTimeMs <= maxSleepTimeMs`
      */
-
     public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int 
maxSleepTimeMs, int maxRetries) {
         super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         expRetriesThreshold = 1;
@@ -39,10 +38,10 @@ public class StormBoundedExponentialBackoffRetry extends 
BoundedExponentialBacko
             expRetriesThreshold++;
         }
         LOG.debug("The baseSleepTimeMs [{}] the maxSleepTimeMs [{}] the 
maxRetries [{}]",
-                  baseSleepTimeMs, maxSleepTimeMs, maxRetries);
+            baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         if (baseSleepTimeMs > maxSleepTimeMs) {
-            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + 
baseSleepTimeMs + "] can't be greater than " +
-                     "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
+            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + 
baseSleepTimeMs + "] can't be greater than "
+                + "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
         }
         if (maxRetries > 0 && maxRetries > expRetriesThreshold) {
             this.stepSize = Math.max(1, (maxSleepTimeMs - (1 << 
expRetriesThreshold)) / (maxRetries - expRetriesThreshold));
@@ -62,8 +61,8 @@ public class StormBoundedExponentialBackoffRetry extends 
BoundedExponentialBacko
             return sleepTimeMs;
         } else {
             int stepJitter = random.nextInt(stepSize);
-            long sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), 
(linearBaseSleepMs +
-                                                                    (stepSize 
* (retryCount - expRetriesThreshold)) + stepJitter));
+            long sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), 
(linearBaseSleepMs
+                + (stepSize * (retryCount - expRetriesThreshold)) + 
stepJitter));
             LOG.warn("WILL SLEEP FOR {}ms (MAX)", sleepTimeMs);
             return sleepTimeMs;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 34ab137..ab8307d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -81,7 +81,7 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.serialization.DefaultSerializationDelegate;
+import org.apache.storm.serialization.GzipThriftSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
@@ -780,8 +780,7 @@ public class Utils {
             Class delegateClass = Class.forName(delegateClassName);
             delegate = (SerializationDelegate) delegateClass.newInstance();
         } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
-            LOG.error("Failed to construct serialization delegate, falling 
back to default", e);
-            delegate = new DefaultSerializationDelegate();
+            throw new RuntimeException("Failed to construct serialization 
delegate class " + delegateClassName, e);
         }
         delegate.prepare(topoConf);
         return delegate;

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
 
b/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
deleted file mode 100644
index 5f2dd46..0000000
--- 
a/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.io.Serializable;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class GzipBridgeSerializationDelegateTest {
-
-    SerializationDelegate testDelegate;
-
-    @Before
-    public void setUp() throws Exception {
-        testDelegate = new GzipBridgeSerializationDelegate();
-    }
-
-    @Test
-    public void testDeserialize_readingFromGzip() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "foo";
-        pojo.age = 100;
-
-        byte[] serialized = new GzipSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, 
TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    @Test
-    public void testDeserialize_readingFromGzipBridge() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "bar";
-        pojo.age = 200;
-
-        byte[] serialized = new 
GzipBridgeSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, 
TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    @Test
-    public void testDeserialize_readingFromDefault() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "baz";
-        pojo.age = 300;
-
-        byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, 
TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    static class TestPojo implements Serializable {
-        String name;
-        int age;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git 
a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj 
b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index 0e29d8c..8e813af 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -31,7 +31,6 @@
                                                     STORM-MESSAGING-TRANSPORT  
"org.apache.storm.messaging.netty.Context"
                                                     
STORM-MESSAGING-NETTY-AUTHENTICATION false
                                                     
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
-                                                    
STORM-MESSAGING-NETTY-MAX-RETRIES 10
                                                     
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
                                                     
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                                                     
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj 
b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 7d389b3..c8bc42a 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -71,7 +71,7 @@
          nil))
 
 (defn storm-component->task-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         nimbus (.getNimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
         (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus 
storm-id))))
@@ -79,12 +79,12 @@
         clojurify-structure)))
 
 (defn getCredentials [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         creds (.credentials (.getClusterState cluster) storm-id nil)]
     (if creds (into {} (.get_creds creds)))))
 
 (defn storm-component->executor-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         nimbus (.getNimbus cluster)
         storm-conf (from-json (.getTopologyConf nimbus storm-id))
         topology (.getUserTopology nimbus storm-id)
@@ -101,12 +101,12 @@
          clojurify-structure)))
 
 (defn storm-num-workers [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (.size (Utils/reverseMap (.get_executor_node_port assignment)))))
 
 (defn topology-nodes [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -116,7 +116,7 @@
          )))
 
 (defn topology-slots [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -127,7 +127,7 @@
 ;TODO: when translating this function, don't call map-val, but instead use an 
inline for loop.
 ; map-val is a temporary kluge for clojure.
 (defn topology-node-distribution [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -206,7 +206,7 @@
 
 (defnk check-consistency [cluster storm-name :assigned? true]
   (let [state (.getClusterState cluster)
-        storm-id (StormCommon/getStormId state storm-name)
+        storm-id (.get (.getTopoId state storm-name))
         task-ids (task-ids cluster storm-id)
         assignment (.assignmentInfo state storm-id nil)
         executor->node+port (.get_executor_node_port assignment)
@@ -548,7 +548,7 @@
              (log-message "Checking user " (System/getProperty "user.name") " 
" hist-topo-ids)
              (is (= 0 (count hist-topo-ids))))
         (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, 
LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
-        (bind storm-id (StormCommon/getStormId state "test"))
+        (bind storm-id (.get (.getTopoId state "test")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id nil)))
         (is (not-nil? (.assignmentInfo state storm-id nil)))
@@ -559,7 +559,7 @@
         (.advanceClusterTime cluster 35)
         ;; kill topology read on group
         (.submitTopology cluster "killgrouptest" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS ["alice-group"]} topology)
-        (bind storm-id-killgroup (StormCommon/getStormId state 
"killgrouptest"))
+        (bind storm-id-killgroup (.get (.getTopoId state "killgrouptest")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id-killgroup nil)))
         (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
@@ -570,7 +570,7 @@
         (.advanceClusterTime cluster 35)
         ;; kill topology can't read
         (.submitTopology cluster "killnoreadtest" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
-        (bind storm-id-killnoread (StormCommon/getStormId state 
"killnoreadtest"))
+        (bind storm-id-killnoread (.get (.getTopoId state "killnoreadtest")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id-killnoread nil)))
         (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
@@ -583,19 +583,19 @@
         ;; active topology can read
         (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, 
LOGS-USERS ["alice", (System/getProperty "user.name")]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id2 (StormCommon/getStormId state "2test"))
+        (bind storm-id2 (.get (.getTopoId state "2test")))
         (is (not-nil? (.stormBase state storm-id2 nil)))
         (is (not-nil? (.assignmentInfo state storm-id2 nil)))
         ;; active topology can not read
         (.submitTopology cluster "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 
10, LOGS-USERS ["alice"]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id3 (StormCommon/getStormId state "testnoread"))
+        (bind storm-id3 (.get (.getTopoId state "testnoread")))
         (is (not-nil? (.stormBase state storm-id3 nil)))
         (is (not-nil? (.assignmentInfo state storm-id3 nil)))
         ;; active topology can read based on group
         (.submitTopology cluster "testreadgroup" 
{TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS ["alice-group"]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id4 (StormCommon/getStormId state "testreadgroup"))
+        (bind storm-id4 (.get (.getTopoId state "testreadgroup")))
         (is (not-nil? (.stormBase state storm-id4 nil)))
         (is (not-nil? (.assignmentInfo state storm-id4 nil)))
         ;; at this point have 1 running, 1 killed topo
@@ -648,7 +648,7 @@
                        {}))
       (bind state (.getClusterState cluster))
       (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} 
topology)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (.advanceClusterTime cluster 15)
       (is (not-nil? (.stormBase state storm-id nil)))
       (is (not-nil? (.assignmentInfo state storm-id nil)))
@@ -673,7 +673,7 @@
       (.advanceClusterTime cluster 11)
       (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} 
topology)))
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "2test"))
+      (bind storm-id (.get (.getTopoId state "2test")))
       (is (not-nil? (.stormBase state storm-id nil)))
       (.killTopology (.getNimbus cluster) "2test")
       (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} 
topology)))
@@ -687,7 +687,7 @@
       (is (= 0 (count (.heartbeatStorms state))))
 
       (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} 
topology)
-      (bind storm-id3 (StormCommon/getStormId state "test3"))
+      (bind storm-id3 (.get (.getTopoId state "test3")))
       (.advanceClusterTime cluster 11)
       ;; this guarantees an immediate kill notification
       (.killTopology (.getNimbus cluster) "test3")
@@ -702,7 +702,7 @@
       (.waitForIdle cluster)
 
       (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} 
topology)
-      (bind storm-id3 (StormCommon/getStormId state "test3"))
+      (bind storm-id3 (.get (.getTopoId state "test3")))
 
       (.advanceClusterTime cluster 11)
       (bind executor-id (first (topology-executors cluster storm-id3)))
@@ -719,7 +719,7 @@
       (.submitTopology cluster "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} 
topology)
       (.advanceClusterTime cluster 11)
       (.killTopologyWithOpts (.getNimbus cluster) "test4" (doto (KillOptions.) 
(.set_wait_secs 10)))
-      (bind storm-id4 (StormCommon/getStormId state "test4"))
+      (bind storm-id4 (.get (.getTopoId state "test4")))
       (.advanceClusterTime cluster 9)
       (is (not-nil? (.assignmentInfo state storm-id4 nil)))
       (.advanceClusterTime cluster 2)
@@ -748,7 +748,7 @@
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
       (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -872,7 +872,7 @@
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
       (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -931,7 +931,7 @@
       (bind state (.getClusterState cluster))
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 4} topology)  ; 
distribution should be 2, 2, 2, 3 ideally
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind slot-executors (slot-assignments cluster storm-id))
       (check-executor-distribution slot-executors [9])
       (check-consistency cluster "test")
@@ -1065,7 +1065,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (.addSupervisor cluster 3)
       (.addSupervisor cluster 3)
 
@@ -1115,7 +1115,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind checker (fn [distribution]
                       (check-executor-distribution
                         (slot-assignments cluster storm-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java 
b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 4b84e55..477bf05 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -137,7 +137,6 @@ public class NettyTest {
         stormConf.put(Config.STORM_MESSAGING_TRANSPORT, 
"org.apache.storm.messaging.netty.Context");
         stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
         stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024);
-        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
         stormConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
         stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
         stormConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);

Reply via email to