[ https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008352#comment-15008352 ]
ASF GitHub Bot commented on STORM-1075: --------------------------------------- Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45037220 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java --- @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import org.apache.storm.cassandra.BaseExecutionResultHandler; +import org.apache.storm.cassandra.CassandraContext; +import org.apache.storm.cassandra.ExecutionResultHandler; +import org.apache.storm.cassandra.client.CassandraConf; +import org.apache.storm.cassandra.client.SimpleClient; +import org.apache.storm.cassandra.client.SimpleClientProvider; +import org.apache.storm.cassandra.executor.AsyncExecutor; +import org.apache.storm.cassandra.executor.AsyncExecutorProvider; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * A base cassandra bolt. + * + * Default {@link backtype.storm.topology.base.BaseRichBolt} + */ +public abstract class BaseCassandraBolt<T> extends BaseRichBolt { + + private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class); + + protected OutputCollector outputCollector; + + protected SimpleClientProvider clientProvider; + protected SimpleClient client; + protected Session session; + protected Map stormConfig; + + protected CassandraConf cassandraConfConfig; + + private CQLStatementTupleMapper mapper; + private ExecutionResultHandler resultHandler; + + transient private Map<String, Fields> outputsFields = new HashMap<>(); + + /** + * Creates a new {@link CassandraWriterBolt} instance. 
+ * @param mapper + */ + public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) { + this.mapper = mapper; + this.clientProvider = clientProvider; + } + /** + * Creates a new {@link CassandraWriterBolt} instance. + * @param tupleMapper + */ + public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) { + this(tupleMapper, new CassandraContext()); + } + + /** + * {@inheritDoc} + */ + @Override + public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) { + this.outputCollector = outputCollector; + this.stormConfig = stormConfig; + this.cassandraConfConfig = new CassandraConf(stormConfig); + this.client = clientProvider.getClient(this.stormConfig); + try { + session = client.connect(); + } catch (NoHostAvailableException e) { + outputCollector.reportError(e); + } + } + + public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) { + this.resultHandler = resultHandler; + return this; + } + + public BaseCassandraBolt withOutputFields(Fields fields) { + this.outputsFields.put(null, fields); --- End diff -- It is not good practice to use null as a map key. It seems null is used here to set the output fields for the default stream. You should use backtype.storm.utils.Utils.DEFAULT_STREAM_ID as the key instead of null. > Storm Cassandra connector > ------------------------- > > Key: STORM-1075 > URL: https://issues.apache.org/jira/browse/STORM-1075 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Sriharsha Chintalapani > Assignee: Satish Duggana > -- This message was sent by Atlassian JIRA (v6.3.4#6332)