Fixing stylecheck problems with storm-cassandra

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a2d131f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a2d131f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a2d131f

Branch: refs/heads/master
Commit: 1a2d131f99e62823bf15fa958cc676a51efda10c
Parents: e3f5b13
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 22:44:56 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 00:22:36 2018 -0400

----------------------------------------------------------------------
 external/storm-cassandra/pom.xml                |   2 +-
 .../AbstractExecutionResultHandler.java         |  34 ++--
 .../cassandra/BaseExecutionResultHandler.java   |  32 ++--
 .../storm/cassandra/CassandraContext.java       |  28 ++-
 .../cassandra/DynamicStatementBuilder.java      |  38 ++--
 .../storm/cassandra/ExecutionResultHandler.java |  27 ++-
 .../storm/cassandra/Murmur3StreamGrouping.java  |  87 ++++-----
 .../storm/cassandra/bolt/BaseCassandraBolt.java |  42 ++--
 .../bolt/BatchCassandraWriterBolt.java          |  80 ++++----
 .../cassandra/bolt/CassandraWriterBolt.java     |  34 ++--
 .../cassandra/bolt/GroupingBatchBuilder.java    |  29 ++-
 .../bolt/PairBatchStatementTuples.java          |  23 +--
 .../cassandra/bolt/PairStatementTuple.java      |  26 +--
 .../storm/cassandra/client/CassandraConf.java   | 191 +++++++++----------
 .../storm/cassandra/client/ClusterFactory.java  |  46 ++---
 .../storm/cassandra/client/SimpleClient.java    |  20 +-
 .../cassandra/client/SimpleClientProvider.java  |  20 +-
 .../cassandra/client/impl/DefaultClient.java    |  27 +--
 .../cassandra/context/BaseBeanFactory.java      |  25 ++-
 .../storm/cassandra/context/BeanFactory.java    |  20 +-
 .../storm/cassandra/context/WorkerCtx.java      |  32 ++--
 .../storm/cassandra/executor/AsyncExecutor.java |  98 +++++-----
 .../executor/AsyncExecutorProvider.java         |  22 +--
 .../cassandra/executor/AsyncResultHandler.java  |  27 +--
 .../executor/AsyncResultSetHandler.java         |  23 +--
 .../executor/ExecutionResultCollector.java      |  36 ++--
 .../executor/impl/BatchAsyncResultHandler.java  |  32 ++--
 .../executor/impl/SingleAsyncResultHandler.java |  30 ++-
 .../query/AyncCQLResultSetValuesMapper.java     |  25 +--
 .../query/BaseCQLStatementTupleMapper.java      |  23 +--
 .../query/CQLResultSetValuesMapper.java         |  25 +--
 .../cassandra/query/CQLStatementBuilder.java    |  20 +-
 .../query/CQLStatementTupleMapper.java          |  25 +--
 .../apache/storm/cassandra/query/Column.java    |  34 ++--
 .../storm/cassandra/query/ContextQuery.java     |  23 +--
 .../apache/storm/cassandra/query/CqlMapper.java |  29 ++-
 .../cassandra/query/ObjectMapperOperation.java  |  19 +-
 .../builder/BoundCQLStatementMapperBuilder.java |  33 ++--
 .../ObjectMapperCqlStatementMapperBuilder.java  |  19 +-
 .../SimpleCQLStatementMapperBuilder.java        |  31 ++-
 .../impl/BatchCQLStatementTupleMapper.java      |  29 ++-
 .../impl/BoundCQLStatementTupleMapper.java      |  48 ++---
 .../impl/ObjectMapperCqlStatementMapper.java    |  27 ++-
 .../query/impl/PreparedStatementBinder.java     |  40 ++--
 .../query/impl/RoutingKeyGenerator.java         |  25 +--
 .../query/impl/SimpleCQLStatementMapper.java    |  36 ++--
 .../cassandra/query/selector/FieldSelector.java |  25 +--
 .../trident/state/CassandraBackingMap.java      |  49 ++---
 .../trident/state/CassandraMapStateFactory.java |  29 ++-
 .../cassandra/trident/state/CassandraQuery.java |  25 +--
 .../cassandra/trident/state/CassandraState.java |  91 ++++-----
 .../trident/state/CassandraStateFactory.java    |  28 ++-
 .../trident/state/CassandraStateUpdater.java    |  23 +--
 .../trident/state/MapStateFactoryBuilder.java   |  85 ++++-----
 .../state/NonTransactionalTupleStateMapper.java |  23 +--
 .../trident/state/OpaqueTupleStateMapper.java   |  31 ++-
 .../trident/state/SerializedStateMapper.java    |  31 ++-
 .../trident/state/SimpleStateMapper.java        |  26 +--
 .../cassandra/trident/state/SimpleTuple.java    |  38 ++--
 .../cassandra/trident/state/StateMapper.java    |  25 +--
 .../state/TransactionalTupleStateMapper.java    |  28 +--
 .../TridentAyncCQLResultSetValuesMapper.java    |  28 +--
 .../state/TridentResultSetValuesMapper.java     |  29 ++-
 .../apache/storm/cassandra/WeatherSpout.java    |  25 +--
 .../testtools/EmbeddedCassandraResource.java    |  33 ++--
 .../storm/cassandra/trident/MapStateTest.java   | 103 +++++-----
 .../cassandra/trident/WeatherBatchSpout.java    |  43 ++---
 67 files changed, 1011 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index d98824c..87471c8 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -132,7 +132,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>578</maxAllowedViolations>
+                    <maxAllowedViolations>159</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
index 6510565..3b32749 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java
@@ -1,35 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.exceptions.QueryValidationException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
 import com.datastax.driver.core.exceptions.UnavailableException;
 import com.datastax.driver.core.exceptions.WriteTimeoutException;
+import java.util.List;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
  * Default interface to define strategies to apply when a query is either 
succeed or failed.
  *
@@ -40,10 +32,10 @@ public abstract class AbstractExecutionResultHandler 
implements ExecutionResultH
 
     @Override
     public void onThrowable(Throwable t, OutputCollector collector, Tuple i) {
-        if( t instanceof QueryValidationException) {
-            this.onQueryValidationException((QueryValidationException)t, 
collector, i);
+        if (t instanceof QueryValidationException) {
+            this.onQueryValidationException((QueryValidationException) t, 
collector, i);
         } else if (t instanceof ReadTimeoutException) {
-            this.onReadTimeoutException((ReadTimeoutException)t, collector, i);
+            this.onReadTimeoutException((ReadTimeoutException) t, collector, 
i);
         } else if (t instanceof WriteTimeoutException) {
             this.onWriteTimeoutException((WriteTimeoutException) t, collector, 
i);
         } else if (t instanceof UnavailableException) {
@@ -56,6 +48,6 @@ public abstract class AbstractExecutionResultHandler 
implements ExecutionResultH
 
     @Override
     public void onThrowable(Throwable t, OutputCollector collector, 
List<Tuple> tl) {
-        for(Tuple i : tl) onThrowable(t, collector, i);
+        for (Tuple i : tl) onThrowable(t, collector, i);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
index 59b6343..57ef351 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java
@@ -1,31 +1,29 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import com.datastax.driver.core.exceptions.ReadTimeoutException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
-import com.datastax.driver.core.exceptions.*;
 import org.slf4j.LoggerFactory;
 
 /**
- * Simple {@link ExecutionResultHandler} which fail the incoming tuple when an 
{@link com.datastax.driver.core.exceptions.DriverException} is thrown.
+ * Simple {@link ExecutionResultHandler} which fail the incoming tuple when an
+ * {@link com.datastax.driver.core.exceptions.DriverException} is thrown.
  * The exception is then automatically report to storm.
  *
  */
@@ -40,6 +38,7 @@ public class BaseExecutionResultHandler extends 
AbstractExecutionResultHandler {
     public void onQueryValidationException(QueryValidationException e, 
OutputCollector collector, Tuple tuple) {
         onDriverException(e, collector, tuple);
     }
+
     /**
      * {@inheritDoc}
      */
@@ -47,6 +46,7 @@ public class BaseExecutionResultHandler extends 
AbstractExecutionResultHandler {
     public void onReadTimeoutException(ReadTimeoutException e, OutputCollector 
collector, Tuple tuple) {
         onDriverException(e, collector, tuple);
     }
+
     /**
      * {@inheritDoc}
      */
@@ -54,6 +54,7 @@ public class BaseExecutionResultHandler extends 
AbstractExecutionResultHandler {
     public void onWriteTimeoutException(WriteTimeoutException e, 
OutputCollector collector, Tuple tuple) {
         onDriverException(e, collector, tuple);
     }
+
     /**
      * {@inheritDoc}
      */
@@ -61,6 +62,7 @@ public class BaseExecutionResultHandler extends 
AbstractExecutionResultHandler {
     public void onUnavailableException(UnavailableException e, OutputCollector 
collector, Tuple tuple) {
         onDriverException(e, collector, tuple);
     }
+
     /**
      * {@inheritDoc}
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
index e862755..49763d2 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
 import com.datastax.driver.core.Cluster;
+import java.util.Map;
 import org.apache.storm.cassandra.client.CassandraConf;
 import org.apache.storm.cassandra.client.ClusterFactory;
 import org.apache.storm.cassandra.client.SimpleClient;
@@ -30,8 +24,6 @@ import org.apache.storm.cassandra.context.WorkerCtx;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  *
  */
@@ -52,8 +44,9 @@ public class CassandraContext extends WorkerCtx implements 
SimpleClientProvider
     @Override
     public SimpleClient getClient(Map<String, Object> config) {
         SimpleClient client = getWorkerBean(SimpleClient.class, config);
-        if (client.isClose() )
+        if (client.isClose()) {
             client = getWorkerBean(SimpleClient.class, config, true);
+        }
         return client;
     }
 
@@ -76,13 +69,14 @@ public class CassandraContext extends WorkerCtx implements 
SimpleClientProvider
     public static final class ClientFactory extends 
BaseBeanFactory<SimpleClient> {
 
         private static final Logger LOG = 
LoggerFactory.getLogger(ClientFactory.class);
+
         /**
          * {@inheritDoc}
          */
         @Override
         protected SimpleClient make(Map<String, Object> topoConf) {
             Cluster cluster = this.context.getWorkerBean(Cluster.class, 
topoConf);
-            if( cluster.isClosed() ) {
+            if (cluster.isClosed()) {
                 LOG.warn("Cluster is closed - trigger new initialization!");
                 cluster = this.context.getWorkerBean(Cluster.class, topoConf, 
true);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
index 2bd7107..823a566 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java
@@ -1,40 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.querybuilder.BuiltStatement;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.storm.cassandra.query.CQLStatementBuilder;
 import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
 import org.apache.storm.cassandra.query.ContextQuery;
 import org.apache.storm.cassandra.query.CqlMapper;
-import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
 import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder;
 import 
org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder;
+import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper;
 import org.apache.storm.cassandra.query.selector.FieldSelector;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 public class DynamicStatementBuilder implements Serializable {
 
     private DynamicStatementBuilder() {
@@ -84,12 +76,14 @@ public class DynamicStatementBuilder implements 
Serializable {
     public static final BatchCQLStatementTupleMapper 
loggedBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders);
     }
+
     /**
      * Creates a new {@link 
com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the 
specified CQL statement builders.
      */
     public static final BatchCQLStatementTupleMapper 
counterBatch(CQLStatementBuilder... builders) {
         return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders);
     }
+
     /**
      * Creates a new {@link 
com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the 
specified CQL statement builders.
      */
@@ -99,7 +93,7 @@ public class DynamicStatementBuilder implements Serializable {
 
     private static BatchCQLStatementTupleMapper 
newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] 
builders) {
         List<CQLStatementTupleMapper> mappers = new 
ArrayList<>(builders.length);
-        for(CQLStatementBuilder b : Arrays.asList(builders))
+        for (CQLStatementBuilder b : Arrays.asList(builders))
             mappers.add(b.build());
         return new BatchCQLStatementTupleMapper(type, mappers);
     }
@@ -142,8 +136,8 @@ public class DynamicStatementBuilder implements 
Serializable {
     public static final FieldSelector[] fields(final String... fields) {
         int size = fields.length;
         List<FieldSelector> fl = new ArrayList<>(size);
-        for(int i = 0 ; i < size; i++)
-                fl.add(new FieldSelector(fields[i]));
+        for (int i = 0; i < size; i++)
+            fl.add(new FieldSelector(fields[i]));
         return fl.toArray(new FieldSelector[size]);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
index 6113efb..0e2a4db 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java
@@ -1,33 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.exceptions.QueryValidationException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
 import com.datastax.driver.core.exceptions.UnavailableException;
 import com.datastax.driver.core.exceptions.WriteTimeoutException;
-
 import java.io.Serializable;
 import java.util.List;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * Default interface to define strategies to apply when a query is either 
succeed or failed.
@@ -43,6 +35,7 @@ public interface ExecutionResultHandler extends Serializable {
      * @param tuple an input tuple.
      */
     void onQueryValidationException(QueryValidationException e, 
OutputCollector collector, Tuple tuple);
+
     /**
      * Invoked when a {@link 
com.datastax.driver.core.exceptions.ReadTimeoutException} is thrown.
      *
@@ -60,6 +53,7 @@ public interface ExecutionResultHandler extends Serializable {
      * @param tuple an input tuple.
      */
     void onWriteTimeoutException(WriteTimeoutException e, OutputCollector 
collector, Tuple tuple);
+
     /**
      * Invoked when a {@link 
com.datastax.driver.core.exceptions.UnavailableException} is thrown.
      *
@@ -69,6 +63,7 @@ public interface ExecutionResultHandler extends Serializable {
      * @param tuple an input tuple.
      */
     void onUnavailableException(UnavailableException e, OutputCollector 
collector, Tuple tuple);
+
     /**
      * Invoked when a query is executed with success.
      * This method is NOT responsible for acknowledging input tuple.

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
index 774955a..c172cb6 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java
@@ -1,39 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.cassandra;
 
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.task.WorkerTopologyContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.tuple.Fields;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
-
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.tuple.Fields;
 
 /**
  *
@@ -57,8 +49,8 @@ public class Murmur3StreamGrouping implements 
CustomStreamGrouping {
      * Creates a new {@link Murmur3StreamGrouping} instance.
      * @param partitionKeyNames {@link 
org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}.
      */
-    public Murmur3StreamGrouping(String...partitionKeyNames) {
-        this( Arrays.asList(partitionKeyNames));
+    public Murmur3StreamGrouping(String... partitionKeyNames) {
+        this(Arrays.asList(partitionKeyNames));
     }
 
     /**
@@ -69,6 +61,30 @@ public class Murmur3StreamGrouping implements 
CustomStreamGrouping {
         this.partitionKeyNames = partitionKeyNames;
     }
 
+    /**
+     * Computes the murmur3 hash for the specified values.
+     * 
http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
+     * 
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+     *
+     * @param values the fields which are part of the (compose) partition key.
+     * @return the computed hash for input values.
+     * @throws java.io.IOException
+     */
+    @VisibleForTesting
+    public static long hashes(List<Object> values) throws IOException {
+        byte[] keyBytes;
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
DataOutputStream out = new DataOutputStream(bos)) {
+            for (Object key : values) {
+                byte[] arr = ((String) key).getBytes("UTF-8");
+                out.writeShort(arr.length);
+                out.write(arr, 0, arr.length);
+                out.writeByte(0);
+            }
+            out.flush();
+            keyBytes = bos.toByteArray();
+        }
+        return Hashing.murmur3_128().hashBytes(keyBytes).asLong();
+    }
 
     /**
      * {@inheritDoc}
@@ -90,7 +106,7 @@ public class Murmur3StreamGrouping implements 
CustomStreamGrouping {
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         try {
-            int n = Math.abs( (int) hashes(getKeyValues(values)) % 
targetTasks.size() );
+            int n = Math.abs((int) hashes(getKeyValues(values)) % 
targetTasks.size());
             return Lists.newArrayList(targetTasks.get(n));
         } catch (IOException e) {
             throw new FailedException(e);
@@ -99,34 +115,9 @@ public class Murmur3StreamGrouping implements 
CustomStreamGrouping {
 
     private List<Object> getKeyValues(List<Object> values) {
         List<Object> keys = new ArrayList<>();
-        for(Integer idx : partitionKeyIndexes) {
+        for (Integer idx : partitionKeyIndexes) {
             keys.add(values.get(idx));
         }
         return keys;
     }
-
-    /**
-     * Computes the murmur3 hash for the specified values.
-     * 
http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys
-     * 
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java
-     *
-     * @param values the fields which are part of the (compose) partition key.
-     * @return the computed hash for input values.
-     * @throws java.io.IOException
-     */
-    @VisibleForTesting
-    public static long hashes(List<Object> values) throws IOException {
-        byte[] keyBytes;
-        try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
DataOutputStream out = new DataOutputStream(bos)) {
-            for(Object key : values) {
-                byte[] arr = ((String)key).getBytes("UTF-8");
-                out.writeShort(arr.length);
-                out.write(arr, 0, arr.length);
-                out.writeByte(0);
-            }
-            out.flush();
-            keyBytes = bos.toByteArray();
-        }
-        return Hashing.murmur3_128().hashBytes(keyBytes).asLong();
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
index f02fa26..946932f 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java
@@ -1,26 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.cassandra.BaseExecutionResultHandler;
 import org.apache.storm.cassandra.CassandraContext;
@@ -42,9 +38,6 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * A base cassandra bolt.
  *
@@ -55,7 +48,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
     private static final Logger LOG = 
LoggerFactory.getLogger(BaseCassandraBolt.class);
 
     protected OutputCollector outputCollector;
-    
+
     protected SimpleClientProvider clientProvider;
     protected SimpleClient client;
     protected Session session;
@@ -66,7 +59,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
     private CQLStatementTupleMapper mapper;
     private ExecutionResultHandler resultHandler;
 
-    transient private  Map<String, Fields> outputsFields = new HashMap<>();
+    transient private Map<String, Fields> outputsFields = new HashMap<>();
     private Map<String, Object> cassandraConfig;
 
     /**
@@ -77,6 +70,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
         this.mapper = mapper;
         this.clientProvider = clientProvider;
     }
+
     /**
      * Creates a new {@link CassandraWriterBolt} instance.
      * @param tupleMapper
@@ -129,7 +123,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
      * @param fields
      */
     public BaseCassandraBolt withStreamOutputFields(String stream, Fields 
fields) {
-        if( stream == null || stream.length() == 0) throw new 
IllegalArgumentException("'stream' should not be null");
+        if (stream == null || stream.length() == 0) throw new 
IllegalArgumentException("'stream' should not be null");
         this.outputsFields.put(stream, fields);
         return this;
     }
@@ -141,7 +135,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
      * @param config
      */
     public BaseCassandraBolt withCassandraConfig(Map<String, Object> config) {
-        if(config == null) {
+        if (config == null) {
             throw new IllegalArgumentException("config should not be null");
         }
         cassandraConfig = config;
@@ -149,7 +143,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
     }
 
     protected ExecutionResultHandler getResultHandler() {
-        if(resultHandler == null) resultHandler = new 
BaseExecutionResultHandler();
+        if (resultHandler == null) resultHandler = new 
BaseExecutionResultHandler();
         return resultHandler;
     }
 
@@ -157,7 +151,7 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
         return mapper;
     }
 
-    abstract protected AsyncResultHandler<T> getAsyncHandler() ;
+    abstract protected AsyncResultHandler<T> getAsyncHandler();
 
     protected AsyncExecutor<T> getAsyncExecutor() {
         return AsyncExecutorProvider.getLocal(session, getAsyncHandler());
@@ -182,8 +176,8 @@ public abstract class BaseCassandraBolt<T> extends 
BaseTickTupleAwareRichBolt {
         // outputsFields can be empty if this bolt acts like a sink in 
topology.
         if (!outputsFields.isEmpty()) {
             Fields fields = outputsFields.remove(Utils.DEFAULT_STREAM_ID);
-            if( fields != null) declarer.declare(fields);
-            for(Map.Entry<String, Fields> entry : outputsFields.entrySet()) {
+            if (fields != null) declarer.declare(fields);
+            for (Map.Entry<String, Fields> entry : outputsFields.entrySet()) {
                 declarer.declareStream(entry.getKey(), entry.getValue());
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
index 292f36d..d87ecab 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java
@@ -1,51 +1,42 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
+import com.datastax.driver.core.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.Config;
+import org.apache.storm.cassandra.executor.AsyncResultHandler;
+import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
+import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Time;
-import com.datastax.driver.core.Statement;
-import org.apache.storm.cassandra.executor.AsyncResultHandler;
-import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
-import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
 public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
 
-    private final static Logger LOG = 
LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
-
     public static final int DEFAULT_EMIT_FREQUENCY = 2;
-
+    private final static Logger LOG = 
LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
     private LinkedBlockingQueue<Tuple> queue;
-    
+
     private int tickFrequencyInSeconds;
-    
+
     private long lastModifiedTimesMillis;
 
     private int batchMaxSize = 1000;
@@ -72,6 +63,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
         super(tupleMapper);
         this.tickFrequencyInSeconds = tickFrequencyInSeconds;
     }
+
     /**
      * {@inheritDoc}
      */
@@ -85,7 +77,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
 
     @Override
     protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
-        if( asyncResultHandler == null) {
+        if (asyncResultHandler == null) {
             asyncResultHandler = new 
BatchAsyncResultHandler(getResultHandler());
         }
         return asyncResultHandler;
@@ -96,12 +88,13 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
      */
     @Override
     protected void process(Tuple input) {
-        if( ! queue.offer(input) ) {
+        if (!queue.offer(input)) {
             LOG.info(logPrefix() + "The message queue is full, preparing batch 
statement...");
             prepareAndExecuteStatement();
             queue.add(input);
         }
     }
+
     /**
      * {@inheritDoc}
      */
@@ -112,7 +105,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
 
     public void prepareAndExecuteStatement() {
         int size = queue.size();
-        if( size > 0 ) {
+        if (size > 0) {
             List<Tuple> inputs = new ArrayList<>(size);
             queue.drainTo(inputs);
             try {
@@ -127,16 +120,18 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
 
                 int batchSize = 0;
                 for (PairBatchStatementTuples batch : batchBuilder) {
-                    LOG.debug(logPrefix() + "Writing data to {} in batches of 
{} rows.", cassandraConf.getKeyspace(), batch.getInputs().size());
+                    LOG.debug(logPrefix() + "Writing data to {} in batches of 
{} rows.", cassandraConf.getKeyspace(),
+                              batch.getInputs().size());
                     getAsyncExecutor().execAsync(batch.getStatement(), 
batch.getInputs());
                     batchSize++;
                 }
 
                 int pending = getAsyncExecutor().getPendingTasksSize();
                 if (pending > batchSize) {
-                    LOG.warn( logPrefix() + "Currently pending tasks is 
superior to the number of submit batches({}) : {}", batchSize, pending);
+                    LOG.warn(logPrefix() + "Currently pending tasks is 
superior to the number of submit batches({}) : {}", batchSize,
+                             pending);
                 }
-                
+
             } catch (Throwable r) {
                 LOG.error(logPrefix() + "Error(s) occurred while preparing 
batch statements");
                 getAsyncHandler().failure(r, inputs);
@@ -147,17 +142,19 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
     private List<PairStatementTuple> buildStatement(List<Tuple> inputs) {
         List<PairStatementTuple> stmts = new ArrayList<>(inputs.size());
 
-        for(Tuple t : inputs) {
+        for (Tuple t : inputs) {
             List<Statement> sl = getMapper().map(topoConfig, session, t);
-            for(Statement s : sl)
-                stmts.add(new PairStatementTuple(t, s) );
+            for (Statement s : sl)
+                stmts.add(new PairStatementTuple(t, s));
         }
         return stmts;
     }
 
     private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
-        if(sinceLastModified > tickFrequencyInSeconds)
-            LOG.warn( logPrefix() + "The time elapsed since last execution 
exceeded tick tuple frequency - {} > {} seconds", sinceLastModified, 
tickFrequencyInSeconds);
+        if (sinceLastModified > tickFrequencyInSeconds) {
+            LOG.warn(logPrefix() + "The time elapsed since last execution 
exceeded tick tuple frequency - {} > {} seconds",
+                     sinceLastModified, tickFrequencyInSeconds);
+        }
     }
 
     private String logPrefix() {
@@ -165,7 +162,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
     }
 
     public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit 
unit) {
-        this.tickFrequencyInSeconds = (int)unit.toSeconds(time);
+        this.tickFrequencyInSeconds = (int) unit.toSeconds(time);
         return this;
     }
 
@@ -178,6 +175,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
         this.batchMaxSize = size;
         return this;
     }
+
     /**
      * {@inheritDoc}
      */
@@ -187,7 +185,7 @@ public class BatchCassandraWriterBolt extends 
BaseCassandraBolt<List<Tuple>> {
         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
         return conf;
     }
-    
+
     private int updateAndGetSecondsSinceLastModified() {
         long now = now();
         int seconds = (int) (now - lastModifiedTimesMillis) / 1000;

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
index 876f6f5..fc45b3f 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java
@@ -1,30 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.Statement;
+import java.util.List;
 import org.apache.storm.cassandra.executor.AsyncResultHandler;
 import org.apache.storm.cassandra.executor.impl.SingleAsyncResultHandler;
 import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
-
-import java.util.List;
+import org.apache.storm.tuple.Tuple;
 
 public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> {
 
@@ -44,7 +37,7 @@ public class CassandraWriterBolt extends 
BaseCassandraBolt<Tuple> {
      */
     @Override
     protected AsyncResultHandler<Tuple> getAsyncHandler() {
-        if( asyncResultHandler == null) {
+        if (asyncResultHandler == null) {
             asyncResultHandler = new 
SingleAsyncResultHandler(getResultHandler());
         }
         return asyncResultHandler;
@@ -56,8 +49,11 @@ public class CassandraWriterBolt extends 
BaseCassandraBolt<Tuple> {
     @Override
     protected void process(Tuple input) {
         List<Statement> statements = getMapper().map(topoConfig, session, 
input);
-        if (statements.size() == 1) 
getAsyncExecutor().execAsync(statements.get(0), input);
-        else getAsyncExecutor().execAsync(statements, input);
+        if (statements.size() == 1) {
+            getAsyncExecutor().execAsync(statements.get(0), input);
+        } else {
+            getAsyncExecutor().execAsync(statements, input);
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
index fdafd50..3697f81 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java
@@ -1,39 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.BatchStatement;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.storm.tuple.Tuple;
 
 /**
- * 
+ *
  */
 public class GroupingBatchBuilder implements 
Iterable<PairBatchStatementTuples> {
 
     private int batchSizeRows;
-    
+
     private List<PairStatementTuple> statements;
 
     /**
@@ -50,7 +43,7 @@ public class GroupingBatchBuilder implements 
Iterable<PairBatchStatementTuples>
         return build().iterator();
     }
 
-    private Iterable<PairBatchStatementTuples> build( ) {
+    private Iterable<PairBatchStatementTuples> build() {
         Iterable<List<PairStatementTuple>> partition = 
Iterables.partition(statements, batchSizeRows);
         return Iterables.transform(partition, new 
Function<List<PairStatementTuple>, PairBatchStatementTuples>() {
             @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
index cef422e..e6eef06 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java
@@ -1,27 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.BatchStatement;
-
 import java.util.List;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * Simple class to pair a list of tuples with a single batch statement.

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
index 0f501a3..d95e9d3 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java
@@ -1,33 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.bolt;
 
-import org.apache.storm.tuple.Tuple;
 import com.datastax.driver.core.Statement;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * Simple class to pair a tuple with a statement.
  */
 public class PairStatementTuple {
-    
+
     private final Tuple tuple;
-    
+
     private final Statement statement;
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
index 19b7551..82f017f 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java
@@ -1,68 +1,61 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.client;
 
+import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.Builder;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.datastax.driver.core.policies.LoggingRetryPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
-import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.policies.DefaultRetryPolicy;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.FallthroughRetryPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
 import com.google.common.base.Objects;
-
 import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 
 /**
  * Configuration used by cassandra storm components.
  */
 public class CassandraConf implements Serializable {
-    
-    public static final String CASSANDRA_USERNAME           = 
"cassandra.username";
-    public static final String CASSANDRA_PASSWORD           = 
"cassandra.password";
-    public static final String CASSANDRA_KEYSPACE           = 
"cassandra.keyspace";
-    public static final String CASSANDRA_CONSISTENCY_LEVEL  = 
"cassandra.output.consistencyLevel";
-    public static final String CASSANDRA_NODES              = 
"cassandra.nodes";
-    public static final String CASSANDRA_PORT               = "cassandra.port";
-    public static final String CASSANDRA_BATCH_SIZE_ROWS    = 
"cassandra.batch.size.rows";
-    public static final String CASSANDRA_RETRY_POLICY       = 
"cassandra.retryPolicy";
-    public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS  = 
"cassandra.reconnectionPolicy.baseDelayMs";
-    public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS   = 
"cassandra.reconnectionPolicy.maxDelayMs";
-    public static final String CASSANDRA_POOL_MAX_SIZE               = 
"cassandra.pool.max.size";
-    public static final String CASSANDRA_LOAD_BALANCING_POLICY       = 
"cassandra.loadBalancingPolicy";
-    public static final String CASSANDRA_DATACENTER_NAME             = 
"cassandra.datacenter.name";
-    public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL  = 
"cassandra.max.requests.per.con.local";
+
+    public static final String CASSANDRA_USERNAME = "cassandra.username";
+    public static final String CASSANDRA_PASSWORD = "cassandra.password";
+    public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace";
+    public static final String CASSANDRA_CONSISTENCY_LEVEL = 
"cassandra.output.consistencyLevel";
+    public static final String CASSANDRA_NODES = "cassandra.nodes";
+    public static final String CASSANDRA_PORT = "cassandra.port";
+    public static final String CASSANDRA_BATCH_SIZE_ROWS = 
"cassandra.batch.size.rows";
+    public static final String CASSANDRA_RETRY_POLICY = 
"cassandra.retryPolicy";
+    public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = 
"cassandra.reconnectionPolicy.baseDelayMs";
+    public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = 
"cassandra.reconnectionPolicy.maxDelayMs";
+    public static final String CASSANDRA_POOL_MAX_SIZE = 
"cassandra.pool.max.size";
+    public static final String CASSANDRA_LOAD_BALANCING_POLICY = 
"cassandra.loadBalancingPolicy";
+    public static final String CASSANDRA_DATACENTER_NAME = 
"cassandra.datacenter.name";
+    public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL = 
"cassandra.max.requests.per.con.local";
     public static final String CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE = 
"cassandra.max.requests.per.con.remote";
-    public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC      = 
"cassandra.heartbeat.interval.sec";
-    public static final String CASSANDRA_IDLE_TIMEOUT_SEC            = 
"cassandra.idle.timeout.sec";
-    public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS      = 
"cassandra.socket.read.timeout.millis";
-    public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS   = 
"cassandra.socket.connect.timeout.millis";
+    public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC = 
"cassandra.heartbeat.interval.sec";
+    public static final String CASSANDRA_IDLE_TIMEOUT_SEC = 
"cassandra.idle.timeout.sec";
+    public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS = 
"cassandra.socket.read.timeout.millis";
+    public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS = 
"cassandra.socket.connect.timeout.millis";
 
     /**
      * The authorized cassandra username.
@@ -79,7 +72,7 @@ public class CassandraConf implements Serializable {
     /**
      * List of contacts nodes.
      */
-    private String[] nodes = {"localhost"};
+    private String[] nodes = { "localhost" };
 
     /**
      * The port used to connect to nodes.
@@ -93,7 +86,7 @@ public class CassandraConf implements Serializable {
     /**
      * The maximal numbers of rows per batch.
      */
-    private int batchSizeRows       = 100;
+    private int batchSizeRows = 100;
 
     /**
      * The retry policy to use for the new cluster.
@@ -154,22 +147,49 @@ public class CassandraConf implements Serializable {
         this.username = (String) Utils.get(conf, CASSANDRA_USERNAME, null);
         this.password = (String) Utils.get(conf, CASSANDRA_PASSWORD, null);
         this.keyspace = get(conf, CASSANDRA_KEYSPACE);
-        this.consistencyLevel = ConsistencyLevel.valueOf((String) 
Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
-        this.nodes    = ((String) Utils.get(conf, CASSANDRA_NODES, 
"localhost")).split(",");
+        this.consistencyLevel =
+            ConsistencyLevel.valueOf((String) Utils.get(conf, 
CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
+        this.nodes = ((String) Utils.get(conf, CASSANDRA_NODES, 
"localhost")).split(",");
         this.batchSizeRows = 
ObjectReader.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100);
         this.port = ObjectReader.getInt(conf.get(CASSANDRA_PORT), 9042);
         this.retryPolicyName = (String) Utils.get(conf, 
CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName());
         this.reconnectionPolicyBaseMs = 
getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L);
-        this.reconnectionPolicyMaxMs  = 
getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), 
TimeUnit.MINUTES.toMillis(1));
+        this.reconnectionPolicyMaxMs = 
getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), 
TimeUnit.MINUTES.toMillis(1));
         this.poolMaxQueueSize = getInt(conf.get(CASSANDRA_POOL_MAX_SIZE), 256);
         this.loadBalancingPolicyName = (String) Utils.get(conf, 
CASSANDRA_LOAD_BALANCING_POLICY, TokenAwarePolicy.class.getSimpleName());
-        this.datacenterName = (String)Utils.get(conf, 
CASSANDRA_DATACENTER_NAME, null);
+        this.datacenterName = (String) Utils.get(conf, 
CASSANDRA_DATACENTER_NAME, null);
         this.maxRequestPerConnectionLocal = 
getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL), 1024);
         this.maxRequestPerConnectionRemote = 
getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE), 256);
         this.heartbeatIntervalSeconds = 
getInt(conf.get(CASSANDRA_HEARTBEAT_INTERVAL_SEC), 30);
         this.idleTimeoutSeconds = getInt(conf.get(CASSANDRA_IDLE_TIMEOUT_SEC), 
60);
-        this.socketReadTimeoutMillis = 
getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), 
(long)SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
-        this.socketConnectTimeoutMillis = 
getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), 
(long)SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS);
+        this.socketReadTimeoutMillis =
+            getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), (long) 
SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
+        this.socketConnectTimeoutMillis =
+            getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), (long) 
SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS);
+    }
+
+    public static Integer getInt(Object o, Integer defaultValue) {
+        if (null == o) {
+            return defaultValue;
+        }
+        if (o instanceof Number) {
+            return ((Number) o).intValue();
+        } else if (o instanceof String) {
+            return Integer.parseInt((String) o);
+        }
+        throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
+    }
+
+    public static Long getLong(Object o, Long defaultValue) {
+        if (null == o) {
+            return defaultValue;
+        }
+        if (o instanceof Number) {
+            return ((Number) o).longValue();
+        } else if (o instanceof String) {
+            return Long.parseLong((String) o);
+        }
+        throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
     }
 
     public String getUsername() {
@@ -209,12 +229,15 @@ public class CassandraConf implements Serializable {
     }
 
     public RetryPolicy getRetryPolicy() {
-        if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName()))
+        if (this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName())) {
             return new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
-        if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName()))
+        }
+        if (this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName())) {
             return FallthroughRetryPolicy.INSTANCE;
-        if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName()))
+        }
+        if (this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName())) {
             return DefaultRetryPolicy.INSTANCE;
+        }
         throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName);
     }
 
@@ -266,56 +289,32 @@ public class CassandraConf implements Serializable {
 
     private <T> T get(Map<String, Object> conf, String key) {
         Object o = conf.get(key);
-        if(o == null) {
+        if (o == null) {
             throw new IllegalArgumentException("No '" + key + "' value found 
in configuration!");
         }
-        return (T)o;
-    }
-
-    public static Integer getInt(Object o, Integer defaultValue) {
-        if (null == o) {
-            return defaultValue;
-        }
-        if (o instanceof Number) {
-            return ((Number) o).intValue();
-        } else if (o instanceof String) {
-            return Integer.parseInt((String) o);
-        }
-        throw new IllegalArgumentException("Don't know how to convert " + o + 
" to int");
-    }
-
-    public static Long getLong(Object o, Long defaultValue) {
-        if (null == o) {
-            return defaultValue;
-        }
-        if (o instanceof Number) {
-            return ((Number) o).longValue();
-        } else if (o instanceof String) {
-            return Long.parseLong((String) o);
-        }
-        throw new IllegalArgumentException("Don't know how to convert " + o + 
" to long");
+        return (T) o;
     }
 
     @Override
     public String toString() {
         return Objects.toStringHelper(this)
-                .add("username", username)
-                .add("password", password)
-                .add("keyspace", keyspace)
-                .add("nodes", nodes)
-                .add("port", port)
-                .add("consistencyLevel", consistencyLevel)
-                .add("batchSizeRows", batchSizeRows)
-                .add("retryPolicyName", retryPolicyName)
-                .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs)
-                .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
-                .add("poolMaxQueueSize", poolMaxQueueSize)
-                .add("datacenterName", datacenterName)
-                .add("maxRequestPerConnectionLocal", 
maxRequestPerConnectionLocal)
-                .add("maxRequestPerConnectionRemote", 
maxRequestPerConnectionRemote)
-                .add("heartbeatIntervalSeconds", heartbeatIntervalSeconds)
-                .add("idleTimeoutSeconds", idleTimeoutSeconds)
-                .add("socketReadTimeoutMillis", socketReadTimeoutMillis)
-                .toString();
+                      .add("username", username)
+                      .add("password", password)
+                      .add("keyspace", keyspace)
+                      .add("nodes", nodes)
+                      .add("port", port)
+                      .add("consistencyLevel", consistencyLevel)
+                      .add("batchSizeRows", batchSizeRows)
+                      .add("retryPolicyName", retryPolicyName)
+                      .add("reconnectionPolicyBaseMs", 
reconnectionPolicyBaseMs)
+                      .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs)
+                      .add("poolMaxQueueSize", poolMaxQueueSize)
+                      .add("datacenterName", datacenterName)
+                      .add("maxRequestPerConnectionLocal", 
maxRequestPerConnectionLocal)
+                      .add("maxRequestPerConnectionRemote", 
maxRequestPerConnectionRemote)
+                      .add("heartbeatIntervalSeconds", 
heartbeatIntervalSeconds)
+                      .add("idleTimeoutSeconds", idleTimeoutSeconds)
+                      .add("socketReadTimeoutMillis", socketReadTimeoutMillis)
+                      .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
index 407d29c..cfbe5a5 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java
@@ -1,21 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.client;
 
 import com.datastax.driver.core.Cluster;
@@ -43,27 +37,27 @@ public class ClusterFactory extends BaseBeanFactory<Cluster> {
         CassandraConf cassandraConf = new CassandraConf(topoConf);
 
         Cluster.Builder cluster = Cluster.builder()
-                .withoutJMXReporting()
-                .withoutMetrics()
-                .addContactPoints(cassandraConf.getNodes())
-                .withPort(cassandraConf.getPort())
-                .withRetryPolicy(cassandraConf.getRetryPolicy())
-                .withReconnectionPolicy(new ExponentialReconnectionPolicy(
-                        cassandraConf.getReconnectionPolicyBaseMs(),
-                        cassandraConf.getReconnectionPolicyMaxMs()))
-                
.withLoadBalancingPolicy(cassandraConf.getLoadBalancingPolicy());
-        
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis((int)cassandraConf.getSocketReadTimeoutMillis());
-        
cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis((int)cassandraConf.getSocketConnectTimeoutMillis());
+                                         .withoutJMXReporting()
+                                         .withoutMetrics()
+                                         .addContactPoints(cassandraConf.getNodes())
+                                         .withPort(cassandraConf.getPort())
+                                         .withRetryPolicy(cassandraConf.getRetryPolicy())
+                                         .withReconnectionPolicy(new ExponentialReconnectionPolicy(
+                                             cassandraConf.getReconnectionPolicyBaseMs(),
+                                             cassandraConf.getReconnectionPolicyMaxMs()))
+                                         .withLoadBalancingPolicy(cassandraConf.getLoadBalancingPolicy());
+        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis((int) cassandraConf.getSocketReadTimeoutMillis());
+        cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis((int) cassandraConf.getSocketConnectTimeoutMillis());
 
         final String username = cassandraConf.getUsername();
         final String password = cassandraConf.getPassword();
 
-        if( StringUtils.isNotEmpty(username) && 
StringUtils.isNotEmpty(password)) {
+        if (StringUtils.isNotEmpty(username) && 
StringUtils.isNotEmpty(password)) {
             cluster.withAuthProvider(new PlainTextAuthProvider(username, 
password));
         }
 
         QueryOptions options = new QueryOptions()
-                .setConsistencyLevel(cassandraConf.getConsistencyLevel());
+            .setConsistencyLevel(cassandraConf.getConsistencyLevel());
         cluster.withQueryOptions(options);
 
         PoolingOptions poolOps = new PoolingOptions();

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
index 3bd80ed..7df59dd 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java
@@ -1,21 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.client;
 
 import com.datastax.driver.core.Session;

http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
----------------------------------------------------------------------
diff --git 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
index 412cb70..588a26a 100644
--- 
a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
+++ 
b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java
@@ -1,21 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.cassandra.client;
 
 import java.util.Map;

Reply via email to