Fixing stylecheck problems with storm-hbase
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/880d14f1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/880d14f1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/880d14f1 Branch: refs/heads/master Commit: 880d14f1e7c6d450375b195f3aa5bc4045151fab Parents: 6ccf6a0 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:08:31 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:37:54 2018 -0400 ---------------------------------------------------------------------- external/storm-hbase/pom.xml | 2 +- .../storm/hbase/bolt/AbstractHBaseBolt.java | 42 ++--- .../org/apache/storm/hbase/bolt/HBaseBolt.java | 42 ++--- .../storm/hbase/bolt/HBaseLookupBolt.java | 161 +++++++++--------- .../storm/hbase/bolt/mapper/HBaseMapper.java | 27 ++- .../bolt/mapper/HBaseProjectionCriteria.java | 75 ++++----- .../hbase/bolt/mapper/HBaseValueMapper.java | 31 ++-- .../hbase/bolt/mapper/SimpleHBaseMapper.java | 55 +++--- .../apache/storm/hbase/common/ColumnList.java | 166 +++++++++---------- .../apache/storm/hbase/common/HBaseClient.java | 42 ++--- .../org/apache/storm/hbase/common/IColumn.java | 26 ++- .../org/apache/storm/hbase/common/ICounter.java | 25 ++- .../org/apache/storm/hbase/common/Utils.java | 83 +++++----- .../storm/hbase/state/HBaseKeyValueState.java | 36 ++-- .../hbase/state/HBaseKeyValueStateIterator.java | 42 ++--- .../hbase/state/HBaseKeyValueStateProvider.java | 31 ++-- .../mapper/SimpleTridentHBaseMapMapper.java | 2 +- .../mapper/SimpleTridentHBaseMapper.java | 45 +++-- .../trident/mapper/TridentHBaseMapper.java | 34 ++-- .../hbase/trident/state/HBaseMapState.java | 142 ++++++++-------- .../storm/hbase/trident/state/HBaseQuery.java | 24 +-- .../storm/hbase/trident/state/HBaseState.java | 125 +++++++------- .../hbase/trident/state/HBaseStateFactory.java | 24 +-- .../storm/hbase/trident/state/HBaseUpdater.java | 24 +-- .../trident/windowing/HBaseWindowsStore.java | 65 ++++---- .../windowing/HBaseWindowsStoreFactory.java | 24 +-- .../storm/hbase/state/HBaseClientTestUtil.java | 38 ++--- .../state/HBaseKeyValueStateIteratorTest.java | 28 ++-- .../state/HBaseKeyValueStateProviderTest.java | 15 +- .../hbase/state/HBaseKeyValueStateTest.java | 35 ++-- 30 files changed, 684 insertions(+), 827 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index 6d96620..e32f3c3 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -131,7 +131,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>371</maxAllowedViolations> + <maxAllowedViolations>100</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java index 9cf1935..0ff60b3 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/AbstractHBaseBolt.java @@ -1,38 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.bolt; -import org.apache.storm.Config; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.base.BaseRichBolt; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.lang.Validate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.storm.Config; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichBolt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - // TODO support more configuration options, for now we're defaulting to the hbase-*.xml files found on the classpath public abstract class AbstractHBaseBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class); @@ -55,15 +49,15 @@ public abstract class AbstractHBaseBolt extends BaseRichBolt { this.collector = collector; final Configuration hbConfig = HBaseConfiguration.create(); - Map<String, Object> conf = (Map<String, Object>)topoConf.get(this.configKey); - if(conf == null) { + Map<String, Object> conf = (Map<String, Object>) topoConf.get(this.configKey); + if (conf == null) { throw new IllegalArgumentException("HBase configuration not found using key '" + this.configKey + "'"); } - if(conf.get("hbase.rootdir") == null) { + if (conf.get("hbase.rootdir") == null) { LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults."); } - for(String key : conf.keySet()) { + for (String key : conf.keySet()) { hbConfig.set(key, String.valueOf(conf.get(key))); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java index bae7aeb..fa6acb5 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java @@ -1,46 +1,39 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.bolt; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.storm.hbase.bolt.mapper.HBaseMapper; +import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.BatchHelper; import org.apache.storm.utils.TupleUtils; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.storm.hbase.bolt.mapper.HBaseMapper; -import org.apache.storm.hbase.common.ColumnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.List; -import java.util.LinkedList; - /** * Basic bolt for writing to HBase. * * Note: Each HBaseBolt defined in a topology is tied to a specific table. - * */ -public class HBaseBolt extends AbstractHBaseBolt { +public class HBaseBolt extends AbstractHBaseBolt { private static final Logger LOG = LoggerFactory.getLogger(HBaseBolt.class); private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; @@ -87,7 +80,8 @@ public class HBaseBolt extends AbstractHBaseBolt { if (batchHelper.shouldHandle(tuple)) { byte[] rowKey = this.mapper.rowKey(tuple); ColumnList cols = this.mapper.columns(tuple); - List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL); + List<Mutation> mutations = + hBaseClient.constructMutationReq(rowKey, cols, writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); batchMutations.addAll(mutations); batchHelper.addBatch(tuple); } @@ -98,7 +92,7 @@ public class HBaseBolt extends AbstractHBaseBolt { batchHelper.ack(); batchMutations.clear(); } - } catch(Exception e){ + } catch (Exception e) { batchHelper.fail(e); batchMutations.clear(); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java index d20aab3..538d896 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java @@ -1,35 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hbase.bolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.TupleUtils; +package org.apache.storm.hbase.bolt; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Lists; - import java.util.Map; import java.util.concurrent.TimeUnit; - import org.apache.commons.lang.Validate; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; @@ -38,6 +26,10 @@ import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,82 +37,81 @@ import org.slf4j.LoggerFactory; * Basic bolt for querying from HBase. * * Note: Each HBaseBolt defined in a topology is tied to a specific table. - * */ public class HBaseLookupBolt extends AbstractHBaseBolt { - private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupBolt.class); + private static final Logger LOG = LoggerFactory.getLogger(HBaseLookupBolt.class); - private HBaseValueMapper rowToTupleMapper; - private HBaseProjectionCriteria projectionCriteria; - private transient LoadingCache<byte[], Result> cache; - private transient boolean cacheEnabled; + private HBaseValueMapper rowToTupleMapper; + private HBaseProjectionCriteria projectionCriteria; + private transient LoadingCache<byte[], Result> cache; + private transient boolean cacheEnabled; - public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper) { - super(tableName, mapper); - Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null"); - this.rowToTupleMapper = rowToTupleMapper; - } + public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper) { + super(tableName, mapper); + Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null"); + this.rowToTupleMapper = rowToTupleMapper; + } - public HBaseLookupBolt withConfigKey(String configKey) { - this.configKey = configKey; - return this; - } + public HBaseLookupBolt withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } - public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) { - this.projectionCriteria = projectionCriteria; - return this; - } + public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) { + this.projectionCriteria = projectionCriteria; + return this; + } - @Override - public void prepare(Map<String, Object> config, TopologyContext topologyContext, OutputCollector collector) { - super.prepare(config, topologyContext, collector); - cacheEnabled = Boolean.parseBoolean(config.getOrDefault("hbase.cache.enable", "false").toString()); - int cacheTTL = Integer.parseInt(config.getOrDefault("hbase.cache.ttl.seconds", "300").toString()); - int maxCacheSize = Integer.parseInt(config.getOrDefault("hbase.cache.size", "1000").toString()); - if (cacheEnabled) { - cache = Caffeine.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheTTL, TimeUnit.SECONDS) - .build(new CacheLoader<byte[], Result>() { + @Override + public void prepare(Map<String, Object> config, TopologyContext topologyContext, OutputCollector collector) { + super.prepare(config, topologyContext, collector); + cacheEnabled = Boolean.parseBoolean(config.getOrDefault("hbase.cache.enable", "false").toString()); + int cacheTTL = Integer.parseInt(config.getOrDefault("hbase.cache.ttl.seconds", "300").toString()); + int maxCacheSize = Integer.parseInt(config.getOrDefault("hbase.cache.size", "1000").toString()); + if (cacheEnabled) { + cache = Caffeine.newBuilder().maximumSize(maxCacheSize).expireAfterWrite(cacheTTL, TimeUnit.SECONDS) + .build(new CacheLoader<byte[], Result>() { - @Override - public Result load(byte[] rowKey) throws Exception { - Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); - if (LOG.isDebugEnabled()) { + @Override + public Result load(byte[] rowKey) throws Exception { + Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); + if (LOG.isDebugEnabled()) { LOG.debug("Cache miss for key:" + new String(rowKey)); - } - return hBaseClient.batchGet(Lists.newArrayList(get))[0]; - } + } + return hBaseClient.batchGet(Lists.newArrayList(get))[0]; + } - }); - } - } + }); + } + } - @Override - public void execute(Tuple tuple) { - if (TupleUtils.isTick(tuple)) { - collector.ack(tuple); - return; - } - byte[] rowKey = this.mapper.rowKey(tuple); - Result result = null; - try { - if (cacheEnabled) { - result = cache.get(rowKey); - } else { - Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); - result = hBaseClient.batchGet(Lists.newArrayList(get))[0]; - } - for (Values values : rowToTupleMapper.toValues(tuple, result)) { - this.collector.emit(tuple, values); - } - this.collector.ack(tuple); - } catch (Exception e) { - this.collector.reportError(e); - this.collector.fail(tuple); - } - } + @Override + public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + collector.ack(tuple); + return; + } + byte[] rowKey = this.mapper.rowKey(tuple); + Result result = null; + try { + if (cacheEnabled) { + result = cache.get(rowKey); + } else { + Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria); + result = hBaseClient.batchGet(Lists.newArrayList(get))[0]; + } + for (Values values : rowToTupleMapper.toValues(tuple, result)) { + this.collector.emit(tuple, values); + } + this.collector.ack(tuple); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(tuple); + } + } - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - rowToTupleMapper.declareOutputFields(outputFieldsDeclarer); - } + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + rowToTupleMapper.declareOutputFields(outputFieldsDeclarer); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java index 70016c5..bf3eca6 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseMapper.java @@ -1,31 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hbase.bolt.mapper; +package org.apache.storm.hbase.bolt.mapper; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.hbase.common.ColumnList; import java.io.Serializable; +import org.apache.storm.hbase.common.ColumnList; +import org.apache.storm.tuple.Tuple; /** - * Maps a <code>org.apache.storm.tuple.Tuple</code> object - * to a row in an HBase table. + * Maps a <code>org.apache.storm.tuple.Tuple</code> object to a row in an HBase table. */ public interface HBaseMapper extends Serializable { http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java index 7325c62..18322c6 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java @@ -1,61 +1,30 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.bolt.mapper; import com.google.common.collect.Lists; - import java.io.Serializable; import java.util.Arrays; import java.util.List; /** - * Allows the user to specify the projection criteria. - * If only columnFamily is specified all columns from that family will be returned. - * If a column is specified only that column from that family will be returned. - + * Allows the user to specify the projection criteria. If only columnFamily is specified all columns from that family will be returned. If a + * column is specified only that column from that family will be returned. */ public class HBaseProjectionCriteria implements Serializable { private List<byte[]> columnFamilies; private List<ColumnMetaData> columns; - public static class ColumnMetaData implements Serializable { - private byte[] columnFamily; - private byte[] qualifier; - - public ColumnMetaData(String columnFamily, String qualifier) { - this.columnFamily = columnFamily.getBytes(); - this.qualifier = qualifier.getBytes(); - } - - public ColumnMetaData(byte[] columnFamily, byte[] qualifier) { - this.columnFamily = Arrays.copyOf(columnFamily, columnFamily.length); - this.qualifier = Arrays.copyOf(qualifier, qualifier.length); - } - - public byte[] getColumnFamily() { - return columnFamily; - } - - public byte[] getQualifier() { - return qualifier; - } - } - public HBaseProjectionCriteria() { columnFamilies = Lists.newArrayList(); columns = Lists.newArrayList(); @@ -63,6 +32,7 @@ public class HBaseProjectionCriteria implements Serializable { /** * all columns from this family will be included as result of HBase lookup. + * * @param columnFamily * @return */ @@ -73,6 +43,7 @@ public class HBaseProjectionCriteria implements Serializable { /** * all columns from this family will be included as result of HBase lookup. + * * @param columnFamily * @return */ @@ -83,6 +54,7 @@ public class HBaseProjectionCriteria implements Serializable { /** * Only this column from the the columnFamily will be included as result of HBase lookup. + * * @param column * @return */ @@ -98,4 +70,27 @@ public class HBaseProjectionCriteria implements Serializable { public List<byte[]> getColumnFamilies() { return columnFamilies; } + + public static class ColumnMetaData implements Serializable { + private byte[] columnFamily; + private byte[] qualifier; + + public ColumnMetaData(String columnFamily, String qualifier) { + this.columnFamily = columnFamily.getBytes(); + this.qualifier = qualifier.getBytes(); + } + + public ColumnMetaData(byte[] columnFamily, byte[] qualifier) { + this.columnFamily = Arrays.copyOf(columnFamily, columnFamily.length); + this.qualifier = Arrays.copyOf(qualifier, qualifier.length); + } + + public byte[] getColumnFamily() { + return columnFamily; + } + + public byte[] getQualifier() { + return qualifier; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java index 38e879f..004cf03 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java @@ -1,42 +1,37 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.bolt.mapper; +import java.io.Serializable; +import java.util.List; +import org.apache.hadoop.hbase.client.Result; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Values; -import org.apache.hadoop.hbase.client.Result; - -import java.io.Serializable; -import java.util.List; public interface HBaseValueMapper extends Serializable { /** - * - * @param input tuple. + * @param input tuple. * @param result HBase lookup result instance. * @return list of values that should be emitted by the lookup bolt. + * * @throws Exception */ public List<Values> toValues(ITuple input, Result result) throws Exception; /** * declares the output fields for the lookup bolt. + * * @param declarer */ void declareOutputFields(OutputFieldsDeclarer declarer); http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java index 8747405..f37995c 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/SimpleHBaseMapper.java @@ -1,69 +1,66 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.bolt.mapper; +import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -import org.apache.storm.hbase.common.ColumnList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.storm.hbase.common.Utils.*; + +import static org.apache.storm.hbase.common.Utils.toBytes; +import static org.apache.storm.hbase.common.Utils.toLong; /** * */ public class SimpleHBaseMapper implements HBaseMapper { private static final Logger LOG = LoggerFactory.getLogger(SimpleHBaseMapper.class); - + private String rowKeyField; -// private String timestampField; + // private String timestampField; private byte[] columnFamily; private Fields columnFields; private Fields counterFields; - public SimpleHBaseMapper(){ + public SimpleHBaseMapper() { } - public SimpleHBaseMapper withRowKeyField(String rowKeyField){ + public SimpleHBaseMapper withRowKeyField(String rowKeyField) { this.rowKeyField = rowKeyField; return this; } - public SimpleHBaseMapper withColumnFields(Fields columnFields){ + public SimpleHBaseMapper withColumnFields(Fields columnFields) { this.columnFields = columnFields; return this; } - public SimpleHBaseMapper withCounterFields(Fields counterFields){ + public SimpleHBaseMapper withCounterFields(Fields counterFields) { this.counterFields = counterFields; return this; } - public SimpleHBaseMapper withColumnFamily(String columnFamily){ + public SimpleHBaseMapper withColumnFamily(String columnFamily) { this.columnFamily = columnFamily.getBytes(); return this; } -// public SimpleTridentHBaseMapper withTimestampField(String timestampField){ -// this.timestampField = timestampField; -// return this; -// } + // public SimpleTridentHBaseMapper withTimestampField(String timestampField){ + // this.timestampField = timestampField; + // return this; + // } @Override public byte[] rowKey(Tuple tuple) { @@ -74,14 +71,14 @@ public class SimpleHBaseMapper implements HBaseMapper { @Override public ColumnList columns(Tuple tuple) { ColumnList cols = new ColumnList(); - if(this.columnFields != null){ + if (this.columnFields != null) { // TODO timestamps - for(String field : this.columnFields){ + for (String field : this.columnFields) { cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field))); } } - if(this.counterFields != null){ - for(String field : this.counterFields){ + if (this.counterFields != null) { + for (String field : this.counterFields) { cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field))); } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java index 0abe1ad..045b2e5 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.common; import java.util.ArrayList; @@ -28,85 +23,33 @@ import java.util.List; * Standard columns have <i>column family</i> (required), <i>qualifier</i> (optional), * <i>timestamp</i> (optional), and a <i>value</i> (optional) values. * - * Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional), - * and an <i>increment</i> (optional, but recommended) values. - * - * Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code> - * methods. + * Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional), and an <i>increment</i> (optional, but recommended) + * values. * + * Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code> methods. */ public class ColumnList { - public static abstract class AbstractColumn { - byte[] family, qualifier; - - AbstractColumn(byte[] family, byte[] qualifier){ - this.family = family; - this.qualifier = qualifier; - } - - public byte[] getFamily() { - return family; - } - - public byte[] getQualifier() { - return qualifier; - } - - } - - public static class Column extends AbstractColumn { - byte[] value; - long ts = -1; - Column(byte[] family, byte[] qualifier, long ts, byte[] value){ - super(family, qualifier); - this.value = value; - this.ts = ts; - } - - public byte[] getValue() { - return value; - } - - public long getTs() { - return ts; - } - } - - public static class Counter extends AbstractColumn { - long incr = 0; - Counter(byte[] family, byte[] qualifier, long incr){ - super(family, qualifier); - this.incr = incr; - } - - public long getIncrement() { - return incr; - } - } - - private ArrayList<Column> columns; private ArrayList<Column> columnsToDelete; private ArrayList<Counter> counters; - - private ArrayList<Column> columns(){ - if(this.columns == null){ + private ArrayList<Column> columns() { + if (this.columns == null) { this.columns = new ArrayList<Column>(); } return this.columns; } - private ArrayList<Column> columnsToDelete(){ - if(this.columnsToDelete == null){ + private ArrayList<Column> columnsToDelete() { + if (this.columnsToDelete == null) { this.columnsToDelete = new ArrayList<Column>(); } return this.columnsToDelete; } - private ArrayList<Counter> counters(){ - if(this.counters == null){ + private ArrayList<Counter> counters() { + if (this.counters == null) { this.counters = new ArrayList<Counter>(); } return this.counters; @@ -121,30 +64,31 @@ public class ColumnList { * @param value * @return */ - public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value){ + public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value) { columns().add(new Column(family, qualifier, ts, value)); return this; } /** * Add a standard HBase column + * * @param family * @param qualifier * @param value * @return */ - public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){ + public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value) { columns().add(new Column(family, qualifier, -1, value)); return this; } /** - * Add a standard HBase column given an instance of a class that implements - * the <code>IColumn</code> interface. + * Add a standard HBase column given an instance of a class that implements the <code>IColumn</code> interface. + * * @param column * @return */ - public ColumnList addColumn(IColumn column){ + public ColumnList addColumn(IColumn column) { return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value()); } @@ -156,7 +100,7 @@ public class ColumnList { * @param incr * @return */ - public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){ + public ColumnList addCounter(byte[] family, byte[] qualifier, long incr) { counters().add(new Counter(family, qualifier, incr)); return this; } @@ -164,10 +108,11 @@ public class ColumnList { /** * Add an HBase counter column given an instance of a class that implements the * <code>ICounter</code> interface. + * * @param counter * @return */ - public ColumnList addCounter(ICounter counter){ + public ColumnList addCounter(ICounter counter) { return this.addCounter(counter.family(), counter.qualifier(), counter.increment()); } @@ -188,7 +133,7 @@ public class ColumnList { * * @return */ - public boolean hasColumns(){ + public boolean hasColumns() { return this.columns != null; } @@ -197,7 +142,7 @@ public class ColumnList { * * @return */ - public boolean hasColumnsToDelete(){ + public boolean hasColumnsToDelete() { return this.columnsToDelete != null; } @@ -206,7 +151,7 @@ public class ColumnList { * * @return */ - public boolean hasCounters(){ + public boolean hasCounters() { return this.counters != null; } @@ -215,7 +160,7 @@ public class ColumnList { * * @return */ - public List<Column> getColumns(){ + public List<Column> getColumns() { return this.columns; } @@ -230,10 +175,61 @@ public class ColumnList { /** * Get the list of counter definitions. + * * @return */ - public List<Counter> getCounters(){ + public List<Counter> getCounters() { return this.counters; } + public static abstract class AbstractColumn { + byte[] family, qualifier; + + AbstractColumn(byte[] family, byte[] qualifier) { + this.family = family; + this.qualifier = qualifier; + } + + public byte[] getFamily() { + return family; + } + + public byte[] getQualifier() { + return qualifier; + } + + } + + public static class Column extends AbstractColumn { + byte[] value; + long ts = -1; + + Column(byte[] family, byte[] qualifier, long ts, byte[] value) { + super(family, qualifier); + this.value = value; + this.ts = ts; + } + + public byte[] getValue() { + return value; + } + + public long getTs() { + return ts; + } + } + + public static class Counter extends AbstractColumn { + long incr = 0; + + Counter(byte[] family, byte[] qualifier, long incr) { + super(family, qualifier); + this.incr = incr; + } + + public long getIncrement() { + return incr; + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java index ad5160c..208e622 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java @@ -1,30 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hbase.common; import com.google.common.collect.Lists; - import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; @@ -51,7 +43,7 @@ public class HBaseClient implements Closeable { private HTable table; - public HBaseClient(Map<String, Object> map , final Configuration configuration, final String tableName) { + public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) { try { UserProvider provider = HBaseSecurityUtil.login(map, configuration); this.table = Utils.getTable(provider, configuration, tableName); @@ -69,16 +61,16 @@ public class HBaseClient implements Closeable { for (ColumnList.Column col : cols.getColumns()) { if (col.getTs() > 0) { put.add( - col.getFamily(), - col.getQualifier(), - col.getTs(), - col.getValue() + col.getFamily(), + col.getQualifier(), + col.getTs(), + col.getValue() ); } else { put.add( - col.getFamily(), - col.getQualifier(), - col.getValue() + col.getFamily(), + col.getQualifier(), + col.getValue() ); } } @@ -90,9 +82,9 @@ public class HBaseClient implements Closeable { inc.setDurability(durability); for (ColumnList.Counter cnt : cols.getCounters()) { inc.addColumn( - cnt.getFamily(), - cnt.getQualifier(), - cnt.getIncrement() + cnt.getFamily(), + cnt.getQualifier(), + cnt.getIncrement() ); } mutations.add(inc); http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java index 36f7e96..8f64c2e 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/IColumn.java @@ -1,30 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.common; /** - * Interface definition for classes that support being written to HBase as - * a regular column. - * + * Interface definition for classes that support being written to HBase as a regular column. */ public interface IColumn { byte[] family(); + byte[] qualifier(); + byte[] value(); + long timestamp(); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java index 43e3f60..ce9d84f 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ICounter.java @@ -1,29 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.common; /** - * Interface definition for classes that support being written to HBase as - * a counter column. - * + * Interface definition for classes that support being written to HBase as a counter column. */ public interface ICounter { byte[] family(); + byte[] qualifier(); + long increment(); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java index 981d4ff..0dca16e 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java @@ -1,22 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.common; +import java.io.IOException; +import java.math.BigDecimal; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.security.UserProvider; @@ -27,18 +25,14 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.math.BigDecimal; -import java.security.PrivilegedExceptionAction; - public class Utils { - private static final Logger LOG = LoggerFactory.getLogger(Utils.class); public static final String TOKEN_KIND_HBASE_AUTH_TOKEN = "HBASE_AUTH_TOKEN"; + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); - private Utils(){} + private Utils() {} public static HTable getTable(UserProvider provider, Configuration config, String tableName) - throws IOException, InterruptedException { + throws IOException, InterruptedException { UserGroupInformation ugi; if (provider != null) { ugi = provider.getCurrent().getUGI(); @@ -51,7 +45,7 @@ public class Utils { boolean foundHBaseAuthToken = false; for (Token<? extends TokenIdentifier> token : ugi.getTokens()) { LOG.debug("Token in UGI (delegation token): {} / {}", token.toString(), - token.decodeIdentifier().getUser()); + token.decodeIdentifier().getUser()); // token.getKind() = Text, Text is annotated by @Stringable // which ensures toString() implementation @@ -66,7 +60,7 @@ public class Utils { foundHBaseAuthToken = true; } else { LOG.warn("Found multiple HBASE_AUTH_TOKEN - will use already found token. " + - "Please enable DEBUG log level to track delegation tokens."); + "Please enable DEBUG log level to track delegation tokens."); } } } @@ -78,18 +72,19 @@ public class Utils { } return ugi.doAs(new PrivilegedExceptionAction<HTable>() { - @Override public HTable run() throws IOException { + @Override + public HTable run() throws IOException { return new HTable(config, tableName); } }); } - public static long toLong(Object obj){ + public static long toLong(Object obj) { long l = 0; - if(obj != null){ - if(obj instanceof Number){ - l = ((Number)obj).longValue(); - } else{ + if (obj != null) { + if (obj instanceof Number) { + l = ((Number) obj).longValue(); + } else { LOG.warn("Could not coerce {} to Long", obj.getClass().getName()); } } @@ -97,24 +92,24 @@ public class Utils { } public static byte[] toBytes(Object obj) { - if(obj == null) { - return null; - } else if(obj instanceof String){ - return ((String)obj).getBytes(); - } else if (obj instanceof Integer){ + if (obj == null) { + return null; + } else if (obj instanceof String) { + return ((String) obj).getBytes(); + } else if (obj instanceof Integer) { return Bytes.toBytes((Integer) obj); - } else if (obj instanceof Long){ - return Bytes.toBytes((Long)obj); - } else if (obj instanceof Short){ - return Bytes.toBytes((Short)obj); - } else if (obj instanceof Float){ - return Bytes.toBytes((Float)obj); - } else if (obj instanceof Double){ - return Bytes.toBytes((Double)obj); - } else if (obj instanceof Boolean){ - return Bytes.toBytes((Boolean)obj); - } else if (obj instanceof BigDecimal){ - return Bytes.toBytes((BigDecimal)obj); + } else if (obj instanceof Long) { + return Bytes.toBytes((Long) obj); + } else if (obj instanceof Short) { + return Bytes.toBytes((Short) obj); + } else if (obj instanceof Float) { + return Bytes.toBytes((Float) obj); + } else if (obj instanceof Double) { + return Bytes.toBytes((Double) obj); + } else if (obj instanceof Boolean) { + return Bytes.toBytes((Boolean) obj); + } else if (obj instanceof BigDecimal) { + return Bytes.toBytes((BigDecimal) obj); } else { LOG.error("Can't convert class to byte array: " + obj.getClass().getName()); return new byte[0]; http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java index dcbaf65..0da456c 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java @@ -20,7 +20,6 @@ package org.apache.storm.hbase.state; import com.google.common.collect.Maps; import com.google.common.primitives.UnsignedBytes; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -31,7 +30,6 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; - import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -51,14 +49,11 @@ import org.slf4j.LoggerFactory; * A Hbase based implementation that persists the state in HBase. */ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { - private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class); - - public static byte[] STATE_QUALIFIER = "s".getBytes(); public static final int ITERATOR_CHUNK_SIZE = 1000; - public static final NavigableMap<byte[], byte[]> EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap( - new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator())); - + new TreeMap<byte[], byte[]>(UnsignedBytes.lexicographicalComparator())); + private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class); + public static byte[] STATE_QUALIFIER = "s".getBytes(); private static byte[] COMMIT_TXID_KEY = "commit".getBytes(); private static byte[] PREPARE_TXID_KEY = "prepare".getBytes(); @@ -79,22 +74,22 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { /** * Constructor. * - * @param hbaseClient HBaseClient instance + * @param hbaseClient HBaseClient instance * @param columnFamily column family to store State - * @param namespace namespace + * @param namespace namespace */ public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) { this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer<K>(), - new DefaultStateSerializer<V>()); + new DefaultStateSerializer<V>()); } /** * Constructor. * - * @param hbaseClient HBaseClient instance - * @param columnFamily column family to store State - * @param namespace namespace - * @param keySerializer key serializer + * @param hbaseClient HBaseClient instance + * @param columnFamily column family to store State + * @param namespace namespace + * @param keySerializer key serializer * @param valueSerializer value serializer */ public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace, @@ -173,7 +168,7 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { } else { HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, - STATE_QUALIFIER); + STATE_QUALIFIER); criteria.addColumn(column); Get get = hbaseClient.constructGetRequests(getRowKeyForStateKey(columnKey), criteria); try { @@ -210,7 +205,8 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { @Override public Iterator<Map.Entry<K, V>> iterator() { return new HBaseKeyValueStateIterator<>(namespace, columnFamily, hbaseClient, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), encoder.getValueSerializer()); + pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), + encoder.getValueSerializer()); } @Override @@ -316,7 +312,7 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { if (committedTxid != null) { if (txid <= committedTxid) { throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid - + "' is already committed"); + + "' is already committed"); } } } @@ -375,7 +371,7 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { mutations.add(new Delete(getRowKeyForStateKey(rowKey))); } else { List<Mutation> mutationsForRow = prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily, - Collections.singletonMap(STATE_QUALIFIER, value)); + Collections.singletonMap(STATE_QUALIFIER, value)); mutations.addAll(mutationsForRow); } } @@ -402,7 +398,7 @@ public class HBaseKeyValueState<K, V> implements KeyValueState<K, V> { } private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map) - throws Exception { + throws Exception { mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java index 9b69aec..dbbd6d0 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java @@ -1,67 +1,57 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hbase.state; -import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; - import com.google.common.primitives.UnsignedBytes; - import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; - import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.storm.hbase.common.HBaseClient; import org.apache.storm.state.BaseBinaryStateIterator; import org.apache.storm.state.DefaultStateEncoder; import org.apache.storm.state.Serializer; - import org.apache.storm.state.StateEncoder; +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; + /** * An iterator over {@link HBaseKeyValueState}. */ public class HBaseKeyValueStateIterator<K, V> extends BaseBinaryStateIterator<K, V> { private final byte[] keyNamespace; - private byte[] cursorKey; private final byte[] endScanKey; private final byte[] columnFamily; private final HBaseClient hbaseClient; private final int chunkSize; private final StateEncoder<K, V, byte[], byte[]> encoder; - + private byte[] cursorKey; private Iterator<Map.Entry<byte[], byte[]>> cachedResultIterator; /** * Constructor. * - * @param namespace The namespace of State - * @param columnFamily The column family of state - * @param hbaseClient The instance of HBaseClient + * @param namespace The namespace of State + * @param columnFamily The column family of state + * @param hbaseClient The instance of HBaseClient * @param pendingPrepareIterator The iterator of pendingPrepare - * @param pendingCommitIterator The iterator of pendingCommit - * @param chunkSize The size of chunk to get entries from HBase - * @param keySerializer The serializer of key - * @param valueSerializer The serializer of value + * @param pendingCommitIterator The iterator of pendingCommit + * @param chunkSize The size of chunk to get entries from HBase + * @param keySerializer The serializer of key + * @param valueSerializer The serializer of value */ public HBaseKeyValueStateIterator(String namespace, byte[] columnFamily, HBaseClient hbaseClient, Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator, http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java index 763c387..89adae3 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java @@ -21,6 +21,9 @@ package org.apache.storm.hbase.state; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.procedure2.util.StringUtils; @@ -34,10 +37,6 @@ import org.apache.storm.task.TopologyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** * Provides {@link HBaseKeyValueState}. */ @@ -86,12 +85,12 @@ public class HBaseKeyValueStateProvider implements StateProvider { HBaseClient hbaseClient = new HBaseClient(hbaseConfMap, hbConfig, config.tableName); return new HBaseKeyValueState(hbaseClient, config.columnFamily, namespace, - getKeySerializer(stormConf, context, config), getValueSerializer(stormConf, context, config)); + getKeySerializer(stormConf, context, config), getValueSerializer(stormConf, context, config)); } private Configuration getHBaseConfigurationInstance(Map<String, Object> conf) { final Configuration hbConfig = HBaseConfiguration.create(); - for(String key : conf.keySet()) { + for (String key : conf.keySet()) { hbConfig.set(key, String.valueOf(conf.get(key))); } return hbConfig; @@ -99,11 +98,11 @@ public class HBaseKeyValueStateProvider implements StateProvider { private Map<String, Object> getHBaseConfigMap(Map<String, Object> stormConfMap, String hbaseConfigKey) { Map<String, Object> conf = (Map<String, Object>) stormConfMap.get(hbaseConfigKey); - if(conf == null) { + if (conf == null) { throw new IllegalArgumentException("HBase configuration not found using key '" + hbaseConfigKey + "'"); } - if(conf.get("hbase.rootdir") == null) { + if (conf.get("hbase.rootdir") == null) { LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults."); } return conf; @@ -153,14 +152,14 @@ public class HBaseKeyValueStateProvider implements StateProvider { @Override public String toString() { return "StateConfig{" + - "keyClass='" + keyClass + '\'' + - ", valueClass='" + valueClass + '\'' + - ", keySerializerClass='" + keySerializerClass + '\'' + - ", valueSerializerClass='" + valueSerializerClass + '\'' + - ", hbaseConfigKey='" + hbaseConfigKey + '\'' + - ", tableName='" + tableName + '\'' + - ", columnFamily='" + columnFamily + '\'' + - '}'; + "keyClass='" + keyClass + '\'' + + ", valueClass='" + valueClass + '\'' + + ", keySerializerClass='" + keySerializerClass + '\'' + + ", valueSerializerClass='" + valueSerializerClass + '\'' + + ", hbaseConfigKey='" + hbaseConfigKey + '\'' + + ", tableName='" + tableName + '\'' + + ", columnFamily='" + columnFamily + '\'' + + '}'; } } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java index 2262a79..86f38d7 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapMapper.java @@ -37,7 +37,7 @@ public class SimpleTridentHBaseMapMapper implements TridentHBaseMapMapper { bos.write(String.valueOf(key).getBytes()); } bos.close(); - } catch (IOException e){ + } catch (IOException e) { throw new RuntimeException("IOException creating HBase row key.", e); } return bos.toByteArray(); http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java index eda9a32..f7ec2c7 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/SimpleTridentHBaseMapper.java @@ -1,29 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.mapper; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.hbase.bolt.mapper.HBaseMapper; import org.apache.storm.hbase.common.ColumnList; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.trident.tuple.TridentTuple; import static org.apache.storm.hbase.common.Utils.toBytes; import static org.apache.storm.hbase.common.Utils.toLong; @@ -33,32 +26,32 @@ import static org.apache.storm.hbase.common.Utils.toLong; */ public class SimpleTridentHBaseMapper implements TridentHBaseMapper { private static final Logger LOG = LoggerFactory.getLogger(SimpleTridentHBaseMapper.class); - + private String rowKeyField; private byte[] columnFamily; private Fields columnFields; private Fields counterFields; - public SimpleTridentHBaseMapper(){ + public SimpleTridentHBaseMapper() { } - public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField){ + public SimpleTridentHBaseMapper withRowKeyField(String rowKeyField) { this.rowKeyField = rowKeyField; return this; } - public SimpleTridentHBaseMapper withColumnFields(Fields columnFields){ + public SimpleTridentHBaseMapper withColumnFields(Fields columnFields) { this.columnFields = columnFields; return this; } - public SimpleTridentHBaseMapper withCounterFields(Fields counterFields){ + public SimpleTridentHBaseMapper withCounterFields(Fields counterFields) { this.counterFields = counterFields; return this; } - public SimpleTridentHBaseMapper withColumnFamily(String columnFamily){ + public SimpleTridentHBaseMapper withColumnFamily(String columnFamily) { this.columnFamily = columnFamily.getBytes(); return this; } @@ -73,14 +66,14 @@ public class SimpleTridentHBaseMapper implements TridentHBaseMapper { @Override public ColumnList columns(TridentTuple tuple) { ColumnList cols = new ColumnList(); - if(this.columnFields != null){ + if (this.columnFields != null) { // TODO timestamps - for(String field : this.columnFields){ + for (String field : this.columnFields) { cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field))); } } - if(this.counterFields != null){ - for(String field : this.counterFields){ + if (this.counterFields != null) { + for (String field : this.counterFields) { cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field))); } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java index bb95497..dfdc2b2 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/mapper/TridentHBaseMapper.java @@ -19,31 +19,29 @@ package org.apache.storm.hbase.trident.mapper; -import org.apache.storm.tuple.Tuple; +import java.io.Serializable; import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.trident.tuple.TridentTuple; -import java.io.Serializable; /** - * Maps a <code>org.apache.storm.trident.tuple.TridentTuple</code> object - * to a row in an HBase table. + * Maps a <code>org.apache.storm.trident.tuple.TridentTuple</code> object to a row in an HBase table. */ public interface TridentHBaseMapper extends Serializable { - /** - * Given a tuple, return the HBase rowkey. - * - * @param tuple - * @return - */ - byte[] rowKey(TridentTuple tuple); + /** + * Given a tuple, return the HBase rowkey. + * + * @param tuple + * @return + */ + byte[] rowKey(TridentTuple tuple); - /** - * Given a tuple, return a list of HBase columns to insert. - * - * @param tuple - * @return - */ - ColumnList columns(TridentTuple tuple); + /** + * Given a tuple, return a list of HBase columns to insert. + * + * @param tuple + * @return + */ + ColumnList columns(TridentTuple tuple); }
