Fixing stylecheck problems with storm-cassandra
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a2d131f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a2d131f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a2d131f Branch: refs/heads/master Commit: 1a2d131f99e62823bf15fa958cc676a51efda10c Parents: e3f5b13 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 22:44:56 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:22:36 2018 -0400 ---------------------------------------------------------------------- external/storm-cassandra/pom.xml | 2 +- .../AbstractExecutionResultHandler.java | 34 ++-- .../cassandra/BaseExecutionResultHandler.java | 32 ++-- .../storm/cassandra/CassandraContext.java | 28 ++- .../cassandra/DynamicStatementBuilder.java | 38 ++-- .../storm/cassandra/ExecutionResultHandler.java | 27 ++- .../storm/cassandra/Murmur3StreamGrouping.java | 87 ++++----- .../storm/cassandra/bolt/BaseCassandraBolt.java | 42 ++-- .../bolt/BatchCassandraWriterBolt.java | 80 ++++---- .../cassandra/bolt/CassandraWriterBolt.java | 34 ++-- .../cassandra/bolt/GroupingBatchBuilder.java | 29 ++- .../bolt/PairBatchStatementTuples.java | 23 +-- .../cassandra/bolt/PairStatementTuple.java | 26 +-- .../storm/cassandra/client/CassandraConf.java | 191 +++++++++---------- .../storm/cassandra/client/ClusterFactory.java | 46 ++--- .../storm/cassandra/client/SimpleClient.java | 20 +- .../cassandra/client/SimpleClientProvider.java | 20 +- .../cassandra/client/impl/DefaultClient.java | 27 +-- .../cassandra/context/BaseBeanFactory.java | 25 ++- .../storm/cassandra/context/BeanFactory.java | 20 +- .../storm/cassandra/context/WorkerCtx.java | 32 ++-- .../storm/cassandra/executor/AsyncExecutor.java | 98 +++++----- .../executor/AsyncExecutorProvider.java | 22 +-- .../cassandra/executor/AsyncResultHandler.java | 27 +-- .../executor/AsyncResultSetHandler.java | 23 +-- .../executor/ExecutionResultCollector.java | 36 ++-- .../executor/impl/BatchAsyncResultHandler.java | 32 ++-- .../executor/impl/SingleAsyncResultHandler.java | 30 ++- .../query/AyncCQLResultSetValuesMapper.java | 25 +-- .../query/BaseCQLStatementTupleMapper.java | 23 +-- .../query/CQLResultSetValuesMapper.java | 25 +-- .../cassandra/query/CQLStatementBuilder.java | 20 +- .../query/CQLStatementTupleMapper.java | 25 +-- .../apache/storm/cassandra/query/Column.java | 34 ++-- .../storm/cassandra/query/ContextQuery.java | 23 +-- .../apache/storm/cassandra/query/CqlMapper.java | 29 ++- .../cassandra/query/ObjectMapperOperation.java | 19 +- .../builder/BoundCQLStatementMapperBuilder.java | 33 ++-- .../ObjectMapperCqlStatementMapperBuilder.java | 19 +- .../SimpleCQLStatementMapperBuilder.java | 31 ++- .../impl/BatchCQLStatementTupleMapper.java | 29 ++- .../impl/BoundCQLStatementTupleMapper.java | 48 ++--- .../impl/ObjectMapperCqlStatementMapper.java | 27 ++- .../query/impl/PreparedStatementBinder.java | 40 ++-- .../query/impl/RoutingKeyGenerator.java | 25 +-- .../query/impl/SimpleCQLStatementMapper.java | 36 ++-- .../cassandra/query/selector/FieldSelector.java | 25 +-- .../trident/state/CassandraBackingMap.java | 49 ++--- .../trident/state/CassandraMapStateFactory.java | 29 ++- .../cassandra/trident/state/CassandraQuery.java | 25 +-- .../cassandra/trident/state/CassandraState.java | 91 ++++----- .../trident/state/CassandraStateFactory.java | 28 ++- .../trident/state/CassandraStateUpdater.java | 23 +-- .../trident/state/MapStateFactoryBuilder.java | 85 ++++----- .../state/NonTransactionalTupleStateMapper.java | 23 +-- .../trident/state/OpaqueTupleStateMapper.java | 31 ++- .../trident/state/SerializedStateMapper.java | 31 ++- .../trident/state/SimpleStateMapper.java | 26 +-- .../cassandra/trident/state/SimpleTuple.java | 38 ++-- .../cassandra/trident/state/StateMapper.java | 25 +-- .../state/TransactionalTupleStateMapper.java | 28 +-- .../TridentAyncCQLResultSetValuesMapper.java | 28 +-- .../state/TridentResultSetValuesMapper.java | 29 ++- .../apache/storm/cassandra/WeatherSpout.java | 25 +-- .../testtools/EmbeddedCassandraResource.java | 33 ++-- .../storm/cassandra/trident/MapStateTest.java | 103 +++++----- .../cassandra/trident/WeatherBatchSpout.java | 43 ++--- 67 files changed, 1011 insertions(+), 1449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml index d98824c..87471c8 100644 --- a/external/storm-cassandra/pom.xml +++ b/external/storm-cassandra/pom.xml @@ -132,7 +132,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>578</maxAllowedViolations> + <maxAllowedViolations>159</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java index 6510565..3b32749 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/AbstractExecutionResultHandler.java @@ -1,35 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.ReadTimeoutException; import com.datastax.driver.core.exceptions.UnavailableException; import com.datastax.driver.core.exceptions.WriteTimeoutException; +import java.util.List; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * Default interface to define strategies to apply when a query is either succeed or failed. * @@ -40,10 +32,10 @@ public abstract class AbstractExecutionResultHandler implements ExecutionResultH @Override public void onThrowable(Throwable t, OutputCollector collector, Tuple i) { - if( t instanceof QueryValidationException) { - this.onQueryValidationException((QueryValidationException)t, collector, i); + if (t instanceof QueryValidationException) { + this.onQueryValidationException((QueryValidationException) t, collector, i); } else if (t instanceof ReadTimeoutException) { - this.onReadTimeoutException((ReadTimeoutException)t, collector, i); + this.onReadTimeoutException((ReadTimeoutException) t, collector, i); } else if (t instanceof WriteTimeoutException) { this.onWriteTimeoutException((WriteTimeoutException) t, collector, i); } else if (t instanceof UnavailableException) { @@ -56,6 +48,6 @@ public abstract class AbstractExecutionResultHandler implements ExecutionResultH @Override public void onThrowable(Throwable t, OutputCollector collector, List<Tuple> tl) { - for(Tuple i : tl) onThrowable(t, collector, i); + for (Tuple i : tl) onThrowable(t, collector, i); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java index 59b6343..57ef351 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java @@ -1,31 +1,29 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; +import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import com.datastax.driver.core.exceptions.ReadTimeoutException; +import com.datastax.driver.core.exceptions.UnavailableException; +import com.datastax.driver.core.exceptions.WriteTimeoutException; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; -import com.datastax.driver.core.exceptions.*; import org.slf4j.LoggerFactory; /** - * Simple {@link ExecutionResultHandler} which fail the incoming tuple when an {@link com.datastax.driver.core.exceptions.DriverException} is thrown. + * Simple {@link ExecutionResultHandler} which fail the incoming tuple when an + * {@link com.datastax.driver.core.exceptions.DriverException} is thrown. * The exception is then automatically report to storm. * */ @@ -40,6 +38,7 @@ public class BaseExecutionResultHandler extends AbstractExecutionResultHandler { public void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple) { onDriverException(e, collector, tuple); } + /** * {@inheritDoc} */ @@ -47,6 +46,7 @@ public class BaseExecutionResultHandler extends AbstractExecutionResultHandler { public void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple) { onDriverException(e, collector, tuple); } + /** * {@inheritDoc} */ @@ -54,6 +54,7 @@ public class BaseExecutionResultHandler extends AbstractExecutionResultHandler { public void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple) { onDriverException(e, collector, tuple); } + /** * {@inheritDoc} */ @@ -61,6 +62,7 @@ public class BaseExecutionResultHandler extends AbstractExecutionResultHandler { public void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple) { onDriverException(e, collector, tuple); } + /** * {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java index e862755..49763d2 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/CassandraContext.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; import com.datastax.driver.core.Cluster; +import java.util.Map; import org.apache.storm.cassandra.client.CassandraConf; import org.apache.storm.cassandra.client.ClusterFactory; import org.apache.storm.cassandra.client.SimpleClient; @@ -30,8 +24,6 @@ import org.apache.storm.cassandra.context.WorkerCtx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * */ @@ -52,8 +44,9 @@ public class CassandraContext extends WorkerCtx implements SimpleClientProvider @Override public SimpleClient getClient(Map<String, Object> config) { SimpleClient client = getWorkerBean(SimpleClient.class, config); - if (client.isClose() ) + if (client.isClose()) { client = getWorkerBean(SimpleClient.class, config, true); + } return client; } @@ -76,13 +69,14 @@ public class CassandraContext extends WorkerCtx implements SimpleClientProvider public static final class ClientFactory extends BaseBeanFactory<SimpleClient> { private static final Logger LOG = LoggerFactory.getLogger(ClientFactory.class); + /** * {@inheritDoc} */ @Override protected SimpleClient make(Map<String, Object> topoConf) { Cluster cluster = this.context.getWorkerBean(Cluster.class, topoConf); - if( cluster.isClosed() ) { + if (cluster.isClosed()) { LOG.warn("Cluster is closed - trigger new initialization!"); cluster = this.context.getWorkerBean(Cluster.class, topoConf, true); } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java index 2bd7107..823a566 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/DynamicStatementBuilder.java @@ -1,40 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.querybuilder.BuiltStatement; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.storm.cassandra.query.CQLStatementBuilder; import org.apache.storm.cassandra.query.CQLStatementTupleMapper; import org.apache.storm.cassandra.query.ContextQuery; import org.apache.storm.cassandra.query.CqlMapper; -import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper; import org.apache.storm.cassandra.query.builder.BoundCQLStatementMapperBuilder; import org.apache.storm.cassandra.query.builder.SimpleCQLStatementMapperBuilder; +import org.apache.storm.cassandra.query.impl.BatchCQLStatementTupleMapper; import org.apache.storm.cassandra.query.selector.FieldSelector; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - public class DynamicStatementBuilder implements Serializable { private DynamicStatementBuilder() { @@ -84,12 +76,14 @@ public class DynamicStatementBuilder implements Serializable { public static final BatchCQLStatementTupleMapper loggedBatch(CQLStatementBuilder... builders) { return newBatchStatementBuilder(BatchStatement.Type.LOGGED, builders); } + /** * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#COUNTER} batch statement for the specified CQL statement builders. */ public static final BatchCQLStatementTupleMapper counterBatch(CQLStatementBuilder... builders) { return newBatchStatementBuilder(BatchStatement.Type.COUNTER, builders); } + /** * Creates a new {@link com.datastax.driver.core.BatchStatement.Type#UNLOGGED} batch statement for the specified CQL statement builders. */ @@ -99,7 +93,7 @@ public class DynamicStatementBuilder implements Serializable { private static BatchCQLStatementTupleMapper newBatchStatementBuilder(BatchStatement.Type type, CQLStatementBuilder[] builders) { List<CQLStatementTupleMapper> mappers = new ArrayList<>(builders.length); - for(CQLStatementBuilder b : Arrays.asList(builders)) + for (CQLStatementBuilder b : Arrays.asList(builders)) mappers.add(b.build()); return new BatchCQLStatementTupleMapper(type, mappers); } @@ -142,8 +136,8 @@ public class DynamicStatementBuilder implements Serializable { public static final FieldSelector[] fields(final String... fields) { int size = fields.length; List<FieldSelector> fl = new ArrayList<>(size); - for(int i = 0 ; i < size; i++) - fl.add(new FieldSelector(fields[i])); + for (int i = 0; i < size; i++) + fl.add(new FieldSelector(fields[i])); return fl.toArray(new FieldSelector[size]); } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java index 6113efb..0e2a4db 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/ExecutionResultHandler.java @@ -1,33 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.exceptions.QueryValidationException; import com.datastax.driver.core.exceptions.ReadTimeoutException; import com.datastax.driver.core.exceptions.UnavailableException; import com.datastax.driver.core.exceptions.WriteTimeoutException; - import java.io.Serializable; import java.util.List; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; /** * Default interface to define strategies to apply when a query is either succeed or failed. @@ -43,6 +35,7 @@ public interface ExecutionResultHandler extends Serializable { * @param tuple an input tuple. */ void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple); + /** * Invoked when a {@link com.datastax.driver.core.exceptions.ReadTimeoutException} is thrown. * @@ -60,6 +53,7 @@ public interface ExecutionResultHandler extends Serializable { * @param tuple an input tuple. */ void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple); + /** * Invoked when a {@link com.datastax.driver.core.exceptions.UnavailableException} is thrown. * @@ -69,6 +63,7 @@ public interface ExecutionResultHandler extends Serializable { * @param tuple an input tuple. */ void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple); + /** * Invoked when a query is executed with success. * This method is NOT responsible for acknowledging input tuple. http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java index 774955a..c172cb6 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java @@ -1,39 +1,31 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.grouping.CustomStreamGrouping; -import org.apache.storm.task.WorkerTopologyContext; -import org.apache.storm.topology.FailedException; -import org.apache.storm.tuple.Fields; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; - import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.topology.FailedException; +import org.apache.storm.tuple.Fields; /** * @@ -57,8 +49,8 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { * Creates a new {@link Murmur3StreamGrouping} instance. * @param partitionKeyNames {@link org.apache.storm.cassandra.Murmur3StreamGrouping#partitionKeyNames}. */ - public Murmur3StreamGrouping(String...partitionKeyNames) { - this( Arrays.asList(partitionKeyNames)); + public Murmur3StreamGrouping(String... partitionKeyNames) { + this(Arrays.asList(partitionKeyNames)); } /** @@ -69,6 +61,30 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { this.partitionKeyNames = partitionKeyNames; } + /** + * Computes the murmur3 hash for the specified values. + * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys + * https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java + * + * @param values the fields which are part of the (compose) partition key. + * @return the computed hash for input values. + * @throws java.io.IOException + */ + @VisibleForTesting + public static long hashes(List<Object> values) throws IOException { + byte[] keyBytes; + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(bos)) { + for (Object key : values) { + byte[] arr = ((String) key).getBytes("UTF-8"); + out.writeShort(arr.length); + out.write(arr, 0, arr.length); + out.writeByte(0); + } + out.flush(); + keyBytes = bos.toByteArray(); + } + return Hashing.murmur3_128().hashBytes(keyBytes).asLong(); + } /** * {@inheritDoc} @@ -90,7 +106,7 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { try { - int n = Math.abs( (int) hashes(getKeyValues(values)) % targetTasks.size() ); + int n = Math.abs((int) hashes(getKeyValues(values)) % targetTasks.size()); return Lists.newArrayList(targetTasks.get(n)); } catch (IOException e) { throw new FailedException(e); @@ -99,34 +115,9 @@ public class Murmur3StreamGrouping implements CustomStreamGrouping { private List<Object> getKeyValues(List<Object> values) { List<Object> keys = new ArrayList<>(); - for(Integer idx : partitionKeyIndexes) { + for (Integer idx : partitionKeyIndexes) { keys.add(values.get(idx)); } return keys; } - - /** - * Computes the murmur3 hash for the specified values. - * http://stackoverflow.com/questions/27212797/cassandra-hashing-algorithm-with-composite-keys - * https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/marshal/CompositeType.java - * - * @param values the fields which are part of the (compose) partition key. - * @return the computed hash for input values. - * @throws java.io.IOException - */ - @VisibleForTesting - public static long hashes(List<Object> values) throws IOException { - byte[] keyBytes; - try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(bos)) { - for(Object key : values) { - byte[] arr = ((String)key).getBytes("UTF-8"); - out.writeShort(arr.length); - out.write(arr, 0, arr.length); - out.writeByte(0); - } - out.flush(); - keyBytes = bos.toByteArray(); - } - return Hashing.murmur3_128().hashBytes(keyBytes).asLong(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java index f02fa26..946932f 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java @@ -1,26 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; import org.apache.storm.Config; import org.apache.storm.cassandra.BaseExecutionResultHandler; import org.apache.storm.cassandra.CassandraContext; @@ -42,9 +38,6 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - /** * A base cassandra bolt. * @@ -55,7 +48,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class); protected OutputCollector outputCollector; - + protected SimpleClientProvider clientProvider; protected SimpleClient client; protected Session session; @@ -66,7 +59,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { private CQLStatementTupleMapper mapper; private ExecutionResultHandler resultHandler; - transient private Map<String, Fields> outputsFields = new HashMap<>(); + transient private Map<String, Fields> outputsFields = new HashMap<>(); private Map<String, Object> cassandraConfig; /** @@ -77,6 +70,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { this.mapper = mapper; this.clientProvider = clientProvider; } + /** * Creates a new {@link CassandraWriterBolt} instance. * @param tupleMapper @@ -129,7 +123,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { * @param fields */ public BaseCassandraBolt withStreamOutputFields(String stream, Fields fields) { - if( stream == null || stream.length() == 0) throw new IllegalArgumentException("'stream' should not be null"); + if (stream == null || stream.length() == 0) throw new IllegalArgumentException("'stream' should not be null"); this.outputsFields.put(stream, fields); return this; } @@ -141,7 +135,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { * @param config */ public BaseCassandraBolt withCassandraConfig(Map<String, Object> config) { - if(config == null) { + if (config == null) { throw new IllegalArgumentException("config should not be null"); } cassandraConfig = config; @@ -149,7 +143,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { } protected ExecutionResultHandler getResultHandler() { - if(resultHandler == null) resultHandler = new BaseExecutionResultHandler(); + if (resultHandler == null) resultHandler = new BaseExecutionResultHandler(); return resultHandler; } @@ -157,7 +151,7 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { return mapper; } - abstract protected AsyncResultHandler<T> getAsyncHandler() ; + abstract protected AsyncResultHandler<T> getAsyncHandler(); protected AsyncExecutor<T> getAsyncExecutor() { return AsyncExecutorProvider.getLocal(session, getAsyncHandler()); @@ -182,8 +176,8 @@ public abstract class BaseCassandraBolt<T> extends BaseTickTupleAwareRichBolt { // outputsFields can be empty if this bolt acts like a sink in topology. if (!outputsFields.isEmpty()) { Fields fields = outputsFields.remove(Utils.DEFAULT_STREAM_ID); - if( fields != null) declarer.declare(fields); - for(Map.Entry<String, Fields> entry : outputsFields.entrySet()) { + if (fields != null) declarer.declare(fields); + for (Map.Entry<String, Fields> entry : outputsFields.entrySet()) { declarer.declareStream(entry.getKey(), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java index 292f36d..d87ecab 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java @@ -1,51 +1,42 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; +import com.datastax.driver.core.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.storm.Config; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Time; -import com.datastax.driver.core.Statement; -import org.apache.storm.cassandra.executor.AsyncResultHandler; -import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler; -import org.apache.storm.cassandra.query.CQLStatementTupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { - private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class); - public static final int DEFAULT_EMIT_FREQUENCY = 2; - + private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class); private LinkedBlockingQueue<Tuple> queue; - + private int tickFrequencyInSeconds; - + private long lastModifiedTimesMillis; private int batchMaxSize = 1000; @@ -72,6 +63,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { super(tupleMapper); this.tickFrequencyInSeconds = tickFrequencyInSeconds; } + /** * {@inheritDoc} */ @@ -85,7 +77,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { @Override protected AsyncResultHandler<List<Tuple>> getAsyncHandler() { - if( asyncResultHandler == null) { + if (asyncResultHandler == null) { asyncResultHandler = new BatchAsyncResultHandler(getResultHandler()); } return asyncResultHandler; @@ -96,12 +88,13 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { */ @Override protected void process(Tuple input) { - if( ! queue.offer(input) ) { + if (!queue.offer(input)) { LOG.info(logPrefix() + "The message queue is full, preparing batch statement..."); prepareAndExecuteStatement(); queue.add(input); } } + /** * {@inheritDoc} */ @@ -112,7 +105,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { public void prepareAndExecuteStatement() { int size = queue.size(); - if( size > 0 ) { + if (size > 0) { List<Tuple> inputs = new ArrayList<>(size); queue.drainTo(inputs); try { @@ -127,16 +120,18 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { int batchSize = 0; for (PairBatchStatementTuples batch : batchBuilder) { - LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConf.getKeyspace(), batch.getInputs().size()); + LOG.debug(logPrefix() + "Writing data to {} in batches of {} rows.", cassandraConf.getKeyspace(), + batch.getInputs().size()); getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs()); batchSize++; } int pending = getAsyncExecutor().getPendingTasksSize(); if (pending > batchSize) { - LOG.warn( logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize, pending); + LOG.warn(logPrefix() + "Currently pending tasks is superior to the number of submit batches({}) : {}", batchSize, + pending); } - + } catch (Throwable r) { LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements"); getAsyncHandler().failure(r, inputs); @@ -147,17 +142,19 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { private List<PairStatementTuple> buildStatement(List<Tuple> inputs) { List<PairStatementTuple> stmts = new ArrayList<>(inputs.size()); - for(Tuple t : inputs) { + for (Tuple t : inputs) { List<Statement> sl = getMapper().map(topoConfig, session, t); - for(Statement s : sl) - stmts.add(new PairStatementTuple(t, s) ); + for (Statement s : sl) + stmts.add(new PairStatementTuple(t, s)); } return stmts; } private void checkTimeElapsedSinceLastExec(int sinceLastModified) { - if(sinceLastModified > tickFrequencyInSeconds) - LOG.warn( logPrefix() + "The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", sinceLastModified, tickFrequencyInSeconds); + if (sinceLastModified > tickFrequencyInSeconds) { + LOG.warn(logPrefix() + "The time elapsed since last execution exceeded tick tuple frequency - {} > {} seconds", + sinceLastModified, tickFrequencyInSeconds); + } } private String logPrefix() { @@ -165,7 +162,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { } public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit unit) { - this.tickFrequencyInSeconds = (int)unit.toSeconds(time); + this.tickFrequencyInSeconds = (int) unit.toSeconds(time); return this; } @@ -178,6 +175,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { this.batchMaxSize = size; return this; } + /** * {@inheritDoc} */ @@ -187,7 +185,7 @@ public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds); return conf; } - + private int updateAndGetSecondsSinceLastModified() { long now = now(); int seconds = (int) (now - lastModifiedTimesMillis) / 1000; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java index 876f6f5..fc45b3f 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/CassandraWriterBolt.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.Statement; +import java.util.List; import org.apache.storm.cassandra.executor.AsyncResultHandler; import org.apache.storm.cassandra.executor.impl.SingleAsyncResultHandler; import org.apache.storm.cassandra.query.CQLStatementTupleMapper; - -import java.util.List; +import org.apache.storm.tuple.Tuple; public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> { @@ -44,7 +37,7 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> { */ @Override protected AsyncResultHandler<Tuple> getAsyncHandler() { - if( asyncResultHandler == null) { + if (asyncResultHandler == null) { asyncResultHandler = new SingleAsyncResultHandler(getResultHandler()); } return asyncResultHandler; @@ -56,8 +49,11 @@ public class CassandraWriterBolt extends BaseCassandraBolt<Tuple> { @Override protected void process(Tuple input) { List<Statement> statements = getMapper().map(topoConfig, session, input); - if (statements.size() == 1) getAsyncExecutor().execAsync(statements.get(0), input); - else getAsyncExecutor().execAsync(statements, input); + if (statements.size() == 1) { + getAsyncExecutor().execAsync(statements.get(0), input); + } else { + getAsyncExecutor().execAsync(statements, input); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java index fdafd50..3697f81 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/GroupingBatchBuilder.java @@ -1,39 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.BatchStatement; import com.google.common.base.Function; import com.google.common.collect.Iterables; - import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.storm.tuple.Tuple; /** - * + * */ public class GroupingBatchBuilder implements Iterable<PairBatchStatementTuples> { private int batchSizeRows; - + private List<PairStatementTuple> statements; /** @@ -50,7 +43,7 @@ public class GroupingBatchBuilder implements Iterable<PairBatchStatementTuples> return build().iterator(); } - private Iterable<PairBatchStatementTuples> build( ) { + private Iterable<PairBatchStatementTuples> build() { Iterable<List<PairStatementTuple>> partition = Iterables.partition(statements, batchSizeRows); return Iterables.transform(partition, new Function<List<PairStatementTuple>, PairBatchStatementTuples>() { @Override http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java index cef422e..e6eef06 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairBatchStatementTuples.java @@ -1,27 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.BatchStatement; - import java.util.List; +import org.apache.storm.tuple.Tuple; /** * Simple class to pair a list of tuples with a single batch statement. http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java index 0f501a3..d95e9d3 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/PairStatementTuple.java @@ -1,33 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.bolt; -import org.apache.storm.tuple.Tuple; import com.datastax.driver.core.Statement; +import org.apache.storm.tuple.Tuple; /** * Simple class to pair a tuple with a statement. */ public class PairStatementTuple { - + private final Tuple tuple; - + private final Statement statement; /** http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java index 19b7551..82f017f 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java @@ -1,68 +1,61 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.client; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.Builder; -import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.datastax.driver.core.policies.LoggingRetryPolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; -import com.datastax.driver.core.policies.TokenAwarePolicy; -import org.apache.commons.lang3.StringUtils; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.ObjectReader; -import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.LoggingRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; import com.google.common.base.Objects; - import java.io.Serializable; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; /** * Configuration used by cassandra storm components. */ public class CassandraConf implements Serializable { - - public static final String CASSANDRA_USERNAME = "cassandra.username"; - public static final String CASSANDRA_PASSWORD = "cassandra.password"; - public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace"; - public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.output.consistencyLevel"; - public static final String CASSANDRA_NODES = "cassandra.nodes"; - public static final String CASSANDRA_PORT = "cassandra.port"; - public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows"; - public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy"; - public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs"; - public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs"; - public static final String CASSANDRA_POOL_MAX_SIZE = "cassandra.pool.max.size"; - public static final String CASSANDRA_LOAD_BALANCING_POLICY = "cassandra.loadBalancingPolicy"; - public static final String CASSANDRA_DATACENTER_NAME = "cassandra.datacenter.name"; - public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL = "cassandra.max.requests.per.con.local"; + + public static final String CASSANDRA_USERNAME = "cassandra.username"; + public static final String CASSANDRA_PASSWORD = "cassandra.password"; + public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace"; + public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.output.consistencyLevel"; + public static final String CASSANDRA_NODES = "cassandra.nodes"; + public static final String CASSANDRA_PORT = "cassandra.port"; + public static final String CASSANDRA_BATCH_SIZE_ROWS = "cassandra.batch.size.rows"; + public static final String CASSANDRA_RETRY_POLICY = "cassandra.retryPolicy"; + public static final String CASSANDRA_RECONNECT_POLICY_BASE_MS = "cassandra.reconnectionPolicy.baseDelayMs"; + public static final String CASSANDRA_RECONNECT_POLICY_MAX_MS = "cassandra.reconnectionPolicy.maxDelayMs"; + public static final String CASSANDRA_POOL_MAX_SIZE = "cassandra.pool.max.size"; + public static final String CASSANDRA_LOAD_BALANCING_POLICY = "cassandra.loadBalancingPolicy"; + public static final String CASSANDRA_DATACENTER_NAME = "cassandra.datacenter.name"; + public static final String CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL = "cassandra.max.requests.per.con.local"; public static final String CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE = "cassandra.max.requests.per.con.remote"; - public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC = "cassandra.heartbeat.interval.sec"; - public static final String CASSANDRA_IDLE_TIMEOUT_SEC = "cassandra.idle.timeout.sec"; - public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS = "cassandra.socket.read.timeout.millis"; - public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS = "cassandra.socket.connect.timeout.millis"; + public static final String CASSANDRA_HEARTBEAT_INTERVAL_SEC = "cassandra.heartbeat.interval.sec"; + public static final String CASSANDRA_IDLE_TIMEOUT_SEC = "cassandra.idle.timeout.sec"; + public static final String CASSANDRA_SOCKET_READ_TIMEOUT_MS = "cassandra.socket.read.timeout.millis"; + public static final String CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS = "cassandra.socket.connect.timeout.millis"; /** * The authorized cassandra username. @@ -79,7 +72,7 @@ public class CassandraConf implements Serializable { /** * List of contacts nodes. */ - private String[] nodes = {"localhost"}; + private String[] nodes = { "localhost" }; /** * The port used to connect to nodes. @@ -93,7 +86,7 @@ public class CassandraConf implements Serializable { /** * The maximal numbers of rows per batch. */ - private int batchSizeRows = 100; + private int batchSizeRows = 100; /** * The retry policy to use for the new cluster. @@ -154,22 +147,49 @@ public class CassandraConf implements Serializable { this.username = (String) Utils.get(conf, CASSANDRA_USERNAME, null); this.password = (String) Utils.get(conf, CASSANDRA_PASSWORD, null); this.keyspace = get(conf, CASSANDRA_KEYSPACE); - this.consistencyLevel = ConsistencyLevel.valueOf((String) Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name())); - this.nodes = ((String) Utils.get(conf, CASSANDRA_NODES, "localhost")).split(","); + this.consistencyLevel = + ConsistencyLevel.valueOf((String) Utils.get(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name())); + this.nodes = ((String) Utils.get(conf, CASSANDRA_NODES, "localhost")).split(","); this.batchSizeRows = ObjectReader.getInt(conf.get(CASSANDRA_BATCH_SIZE_ROWS), 100); this.port = ObjectReader.getInt(conf.get(CASSANDRA_PORT), 9042); this.retryPolicyName = (String) Utils.get(conf, CASSANDRA_RETRY_POLICY, DefaultRetryPolicy.class.getSimpleName()); this.reconnectionPolicyBaseMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_BASE_MS), 100L); - this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1)); + this.reconnectionPolicyMaxMs = getLong(conf.get(CASSANDRA_RECONNECT_POLICY_MAX_MS), TimeUnit.MINUTES.toMillis(1)); this.poolMaxQueueSize = getInt(conf.get(CASSANDRA_POOL_MAX_SIZE), 256); this.loadBalancingPolicyName = (String) Utils.get(conf, CASSANDRA_LOAD_BALANCING_POLICY, TokenAwarePolicy.class.getSimpleName()); - this.datacenterName = (String)Utils.get(conf, CASSANDRA_DATACENTER_NAME, null); + this.datacenterName = (String) Utils.get(conf, CASSANDRA_DATACENTER_NAME, null); this.maxRequestPerConnectionLocal = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_LOCAL), 1024); this.maxRequestPerConnectionRemote = getInt(conf.get(CASSANDRA_MAX_REQUESTS_PER_CON_REMOTE), 256); this.heartbeatIntervalSeconds = getInt(conf.get(CASSANDRA_HEARTBEAT_INTERVAL_SEC), 30); this.idleTimeoutSeconds = getInt(conf.get(CASSANDRA_IDLE_TIMEOUT_SEC), 60); - this.socketReadTimeoutMillis = getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), (long)SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS); - this.socketConnectTimeoutMillis = getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), (long)SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS); + this.socketReadTimeoutMillis = + getLong(conf.get(CASSANDRA_SOCKET_READ_TIMEOUT_MS), (long) SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS); + this.socketConnectTimeoutMillis = + getLong(conf.get(CASSANDRA_SOCKET_CONNECT_TIMEOUT_MS), (long) SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS); + } + + public static Integer getInt(Object o, Integer defaultValue) { + if (null == o) { + return defaultValue; + } + if (o instanceof Number) { + return ((Number) o).intValue(); + } else if (o instanceof String) { + return Integer.parseInt((String) o); + } + throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); + } + + public static Long getLong(Object o, Long defaultValue) { + if (null == o) { + return defaultValue; + } + if (o instanceof Number) { + return ((Number) o).longValue(); + } else if (o instanceof String) { + return Long.parseLong((String) o); + } + throw new IllegalArgumentException("Don't know how to convert " + o + " to long"); } public String getUsername() { @@ -209,12 +229,15 @@ public class CassandraConf implements Serializable { } public RetryPolicy getRetryPolicy() { - if(this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName())) + if (this.retryPolicyName.equals(DowngradingConsistencyRetryPolicy.class.getSimpleName())) { return new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); - if(this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName())) + } + if (this.retryPolicyName.equals(FallthroughRetryPolicy.class.getSimpleName())) { return FallthroughRetryPolicy.INSTANCE; - if(this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName())) + } + if (this.retryPolicyName.equals(DefaultRetryPolicy.class.getSimpleName())) { return DefaultRetryPolicy.INSTANCE; + } throw new IllegalArgumentException("Unknown cassandra retry policy " + this.retryPolicyName); } @@ -266,56 +289,32 @@ public class CassandraConf implements Serializable { private <T> T get(Map<String, Object> conf, String key) { Object o = conf.get(key); - if(o == null) { + if (o == null) { throw new IllegalArgumentException("No '" + key + "' value found in configuration!"); } - return (T)o; - } - - public static Integer getInt(Object o, Integer defaultValue) { - if (null == o) { - return defaultValue; - } - if (o instanceof Number) { - return ((Number) o).intValue(); - } else if (o instanceof String) { - return Integer.parseInt((String) o); - } - throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); - } - - public static Long getLong(Object o, Long defaultValue) { - if (null == o) { - return defaultValue; - } - if (o instanceof Number) { - return ((Number) o).longValue(); - } else if (o instanceof String) { - return Long.parseLong((String) o); - } - throw new IllegalArgumentException("Don't know how to convert " + o + " to long"); + return (T) o; } @Override public String toString() { return Objects.toStringHelper(this) - .add("username", username) - .add("password", password) - .add("keyspace", keyspace) - .add("nodes", nodes) - .add("port", port) - .add("consistencyLevel", consistencyLevel) - .add("batchSizeRows", batchSizeRows) - .add("retryPolicyName", retryPolicyName) - .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs) - .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs) - .add("poolMaxQueueSize", poolMaxQueueSize) - .add("datacenterName", datacenterName) - .add("maxRequestPerConnectionLocal", maxRequestPerConnectionLocal) - .add("maxRequestPerConnectionRemote", maxRequestPerConnectionRemote) - .add("heartbeatIntervalSeconds", heartbeatIntervalSeconds) - .add("idleTimeoutSeconds", idleTimeoutSeconds) - .add("socketReadTimeoutMillis", socketReadTimeoutMillis) - .toString(); + .add("username", username) + .add("password", password) + .add("keyspace", keyspace) + .add("nodes", nodes) + .add("port", port) + .add("consistencyLevel", consistencyLevel) + .add("batchSizeRows", batchSizeRows) + .add("retryPolicyName", retryPolicyName) + .add("reconnectionPolicyBaseMs", reconnectionPolicyBaseMs) + .add("reconnectionPolicyMaxMs", reconnectionPolicyMaxMs) + .add("poolMaxQueueSize", poolMaxQueueSize) + .add("datacenterName", datacenterName) + .add("maxRequestPerConnectionLocal", maxRequestPerConnectionLocal) + .add("maxRequestPerConnectionRemote", maxRequestPerConnectionRemote) + .add("heartbeatIntervalSeconds", heartbeatIntervalSeconds) + .add("idleTimeoutSeconds", idleTimeoutSeconds) + .add("socketReadTimeoutMillis", socketReadTimeoutMillis) + .toString(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java index 407d29c..cfbe5a5 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.client; import com.datastax.driver.core.Cluster; @@ -43,27 +37,27 @@ public class ClusterFactory extends BaseBeanFactory<Cluster> { CassandraConf cassandraConf = new CassandraConf(topoConf); Cluster.Builder cluster = Cluster.builder() - .withoutJMXReporting() - .withoutMetrics() - .addContactPoints(cassandraConf.getNodes()) - .withPort(cassandraConf.getPort()) - .withRetryPolicy(cassandraConf.getRetryPolicy()) - .withReconnectionPolicy(new ExponentialReconnectionPolicy( - cassandraConf.getReconnectionPolicyBaseMs(), - cassandraConf.getReconnectionPolicyMaxMs())) - .withLoadBalancingPolicy(cassandraConf.getLoadBalancingPolicy()); - cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis((int)cassandraConf.getSocketReadTimeoutMillis()); - cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis((int)cassandraConf.getSocketConnectTimeoutMillis()); + .withoutJMXReporting() + .withoutMetrics() + .addContactPoints(cassandraConf.getNodes()) + .withPort(cassandraConf.getPort()) + .withRetryPolicy(cassandraConf.getRetryPolicy()) + .withReconnectionPolicy(new ExponentialReconnectionPolicy( + cassandraConf.getReconnectionPolicyBaseMs(), + cassandraConf.getReconnectionPolicyMaxMs())) + .withLoadBalancingPolicy(cassandraConf.getLoadBalancingPolicy()); + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis((int) cassandraConf.getSocketReadTimeoutMillis()); + cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis((int) cassandraConf.getSocketConnectTimeoutMillis()); final String username = cassandraConf.getUsername(); final String password = cassandraConf.getPassword(); - if( StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { + if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { cluster.withAuthProvider(new PlainTextAuthProvider(username, password)); } QueryOptions options = new QueryOptions() - .setConsistencyLevel(cassandraConf.getConsistencyLevel()); + .setConsistencyLevel(cassandraConf.getConsistencyLevel()); cluster.withQueryOptions(options); PoolingOptions poolOps = new PoolingOptions(); http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java index 3bd80ed..7df59dd 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClient.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.client; import com.datastax.driver.core.Session; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java index 412cb70..588a26a 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/SimpleClientProvider.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.client; import java.util.Map;
