http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java index 9dbcd80..d2a73d4 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java @@ -19,14 +19,12 @@ package org.apache.storm.hbase.trident.state; import com.google.common.collect.Maps; - import java.io.IOException; import java.io.InterruptedIOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Get; @@ -61,13 +59,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HBaseMapState<T> implements IBackingMap<T> { - private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class); - - private int partitionNum; - - @SuppressWarnings("rawtypes") private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap(); + private static Logger LOG = LoggerFactory.getLogger(HBaseMapState.class); static { DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer()); @@ -75,6 +69,7 @@ public class HBaseMapState<T> implements IBackingMap<T> { DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer()); } + private int partitionNum; private Options<T> options; private Serializer<T> serializer; private HTable table; @@ -82,8 +77,8 @@ public class HBaseMapState<T> implements IBackingMap<T> { /** * Constructor. * - * @param options HBase State options. - * @param map topology config map. + * @param options HBase State options. + * @param map topology config map. * @param partitionNum the number of partition. */ public HBaseMapState(final Options<T> options, Map<String, Object> map, int partitionNum) { @@ -92,7 +87,7 @@ public class HBaseMapState<T> implements IBackingMap<T> { this.partitionNum = partitionNum; final Configuration hbConfig = HBaseConfiguration.create(); - Map<String, Object> conf = (Map<String, Object>)map.get(options.configKey); + Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey); if (conf == null) { LOG.info("HBase configuration not found using key '" + options.configKey + "'"); LOG.info("Using HBase config from first hbase-site.xml found on classpath."); @@ -114,19 +109,6 @@ public class HBaseMapState<T> implements IBackingMap<T> { } - - public static class Options<T> implements Serializable { - - public Serializer<T> serializer = null; - public int cacheSize = 5000; - public String globalKey = "$HBASE_STATE_GLOBAL$"; - public String configKey = "hbase.config"; - public String tableName; - public String columnFamily; - public TridentHBaseMapMapper mapMapper; - } - - @SuppressWarnings("rawtypes") public static StateFactory opaque() { Options<OpaqueValue> options = new Options<OpaqueValue>(); @@ -159,57 +141,6 @@ public class HBaseMapState<T> implements IBackingMap<T> { return new Factory(StateType.NON_TRANSACTIONAL, opts); } - - protected static class Factory implements StateFactory { - private StateType stateType; - private Options options; - - @SuppressWarnings({"rawtypes", "unchecked"}) - public Factory(StateType stateType, Options options) { - this.stateType = stateType; - this.options = options; - - if (this.options.serializer == null) { - this.options.serializer = DEFAULT_SERIALZERS.get(stateType); - } - - if (this.options.serializer == null) { - throw new RuntimeException("Serializer should be specified for type: " + stateType); - } - - if (this.options.mapMapper == null) { - throw new RuntimeException("MapMapper should be specified for type: " + stateType); - } - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions); - IBackingMap state = new HBaseMapState(options, conf, partitionIndex); - - if (options.cacheSize > 0) { - state = new CachedMap(state, options.cacheSize); - } - - MapState mapState; - switch (stateType) { - case NON_TRANSACTIONAL: - mapState = NonTransactionalMap.build(state); - break; - case OPAQUE: - mapState = OpaqueMap.build(state); - break; - case TRANSACTIONAL: - mapState = TransactionalMap.build(state); - break; - default: - throw new IllegalArgumentException("Unknown state type: " + stateType); - } - return new SnapshottableMap(mapState, new Values(options.globalKey)); - } - - } - @Override public List<T> multiGet(List<List<Object>> keys) { List<Get> gets = new ArrayList<Get>(); @@ -249,7 +180,7 @@ public class HBaseMapState<T> implements IBackingMap<T> { byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i)); String qualifier = this.options.mapMapper.qualifier(keys.get(i)); LOG.info("Partiton: {}, Key: {}, Value: {}", - new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))}); + new Object[]{ this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i))) }); Put put = new Put(hbaseKey); T val = values.get(i); put.add(this.options.columnFamily.getBytes(), @@ -268,4 +199,65 @@ public class HBaseMapState<T> implements IBackingMap<T> { throw new FailedException("IOException while writing to HBase", e); } } + + public static class Options<T> implements Serializable { + + public Serializer<T> serializer = null; + public int cacheSize = 5000; + public String globalKey = "$HBASE_STATE_GLOBAL$"; + public String configKey = "hbase.config"; + public String tableName; + public String columnFamily; + public TridentHBaseMapMapper mapMapper; + } + + protected static class Factory implements StateFactory { + private StateType stateType; + private Options options; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public Factory(StateType stateType, Options options) { + this.stateType = stateType; + this.options = options; + + if (this.options.serializer == null) { + this.options.serializer = DEFAULT_SERIALZERS.get(stateType); + } + + if (this.options.serializer == null) { + throw new RuntimeException("Serializer should be specified for type: " + stateType); + } + + if (this.options.mapMapper == null) { + throw new RuntimeException("MapMapper should be specified for type: " + stateType); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + LOG.info("Preparing HBase State for partition {} of {}.", partitionIndex + 1, numPartitions); + IBackingMap state = new HBaseMapState(options, conf, partitionIndex); + + if (options.cacheSize > 0) { + state = new CachedMap(state, options.cacheSize); + } + + MapState mapState; + switch (stateType) { + case NON_TRANSACTIONAL: + mapState = NonTransactionalMap.build(state); + break; + case OPAQUE: + mapState = OpaqueMap.build(state); + break; + case TRANSACTIONAL: + mapState = TransactionalMap.build(state); + break; + default: + throw new IllegalArgumentException("Unknown state type: " + stateType); + } + return new SnapshottableMap(mapState, new Values(options.globalKey)); + } + + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java index 4d95eb5..e3c8755 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseQuery.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.state; -import org.apache.storm.tuple.Values; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseQueryFunction; import org.apache.storm.trident.tuple.TridentTuple; - -import java.util.List; +import org.apache.storm.tuple.Values; public class HBaseQuery extends BaseQueryFunction<HBaseState, List<Values>> { http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java index 7b32eae..a216402 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java @@ -1,44 +1,41 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.state; -import org.apache.storm.Config; -import org.apache.storm.topology.FailedException; -import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.storm.Config; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper; import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.hbase.common.HBaseClient; import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.storm.topology.FailedException; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HBaseState implements State { @@ -57,49 +54,10 @@ public class HBaseState implements State { this.numPartitions = numPartitions; } - public static class Options implements Serializable { - private TridentHBaseMapper mapper; - private Durability durability = Durability.SKIP_WAL; - private HBaseProjectionCriteria projectionCriteria; - private HBaseValueMapper rowToStormValueMapper; - private String configKey; - private String tableName; - - public Options withDurability(Durability durability) { - this.durability = durability; - return this; - } - - public Options withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) { - this.projectionCriteria = projectionCriteria; - return this; - } - - public Options withConfigKey(String configKey) { - this.configKey = configKey; - return this; - } - - public Options withTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public Options withRowToStormValueMapper(HBaseValueMapper rowToStormValueMapper) { - this.rowToStormValueMapper = rowToStormValueMapper; - return this; - } - - public Options withMapper(TridentHBaseMapper mapper) { - this.mapper = mapper; - return this; - } - } - protected void prepare() { final Configuration hbConfig = HBaseConfiguration.create(); Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey); - if(conf == null){ + if (conf == null) { LOG.info("HBase configuration not found using key '" + options.configKey + "'"); LOG.info("Using HBase config from first hbase-site.xml found on classpath."); } else { @@ -156,7 +114,7 @@ public class HBaseState implements State { try { Result[] results = hBaseClient.batchGet(gets); - for(int i = 0; i < results.length; i++) { + for (int i = 0; i < results.length; i++) { Result result = results[i]; TridentTuple tuple = tridentTuples.get(i); List<Values> values = options.rowToStormValueMapper.toValues(tuple, result); @@ -168,4 +126,43 @@ public class HBaseState implements State { } return batchRetrieveResult; } + + public static class Options implements Serializable { + private TridentHBaseMapper mapper; + private Durability durability = Durability.SKIP_WAL; + private HBaseProjectionCriteria projectionCriteria; + private HBaseValueMapper rowToStormValueMapper; + private String configKey; + private String tableName; + + public Options withDurability(Durability durability) { + this.durability = durability; + return this; + } + + public Options withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) { + this.projectionCriteria = projectionCriteria; + return this; + } + + public Options withConfigKey(String configKey) { + this.configKey = configKey; + return this; + } + + public Options withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Options withRowToStormValueMapper(HBaseValueMapper rowToStormValueMapper) { + this.rowToStormValueMapper = rowToStormValueMapper; + return this; + } + + public Options withMapper(TridentHBaseMapper mapper) { + this.mapper = mapper; + return this; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java index 08e9dc3..99b87b5 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseStateFactory.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.state; +import java.util.Map; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; -import java.util.Map; - public class HBaseStateFactory implements StateFactory { private HBaseState.Options options; @@ -33,7 +27,7 @@ public class HBaseStateFactory implements StateFactory { @Override public State makeState(Map<String, Object> map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { - HBaseState state = new HBaseState(map , partitionIndex, numPartitions, options); + HBaseState state = new HBaseState(map, partitionIndex, numPartitions, options); state.prepare(); return state; } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java index 57ca844..6340abc 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseUpdater.java @@ -1,29 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.state; +import java.util.List; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import java.util.List; - -public class HBaseUpdater extends BaseStateUpdater<HBaseState> { +public class HBaseUpdater extends BaseStateUpdater<HBaseState> { @Override public void updateState(HBaseState hBaseState, List<TridentTuple> tuples, TridentCollector collector) { http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java index 702a790..086b477 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java @@ -1,23 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.windowing; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -31,32 +35,20 @@ import org.apache.storm.trident.windowing.WindowsStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - /** * This class stores entries into hbase instance of the given configuration. - * */ public class HBaseWindowsStore implements WindowsStore { - private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class); public static final String UTF_8 = "utf-8"; - + private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class); private final ThreadLocal<HTable> threadLocalHtable; private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer; private final Queue<HTable> htables = new ConcurrentLinkedQueue<>(); private final byte[] family; private final byte[] qualifier; - public HBaseWindowsStore(final Map<String, Object> topoConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) { + public HBaseWindowsStore(final Map<String, Object> topoConf, final Configuration config, final String tableName, byte[] family, + byte[] qualifier) { this.family = family; this.qualifier = qualifier; @@ -73,7 +65,7 @@ public class HBaseWindowsStore implements WindowsStore { } }; - threadLocalWindowKryoSerializer = new ThreadLocal<WindowKryoSerializer>(){ + threadLocalWindowKryoSerializer = new ThreadLocal<WindowKryoSerializer>() { @Override protected WindowKryoSerializer initialValue() { return new WindowKryoSerializer(topoConf); @@ -111,7 +103,7 @@ public class HBaseWindowsStore implements WindowsStore { throw new RuntimeException(e); } - if(result.isEmpty()) { + if (result.isEmpty()) { return null; } @@ -136,11 +128,11 @@ public class HBaseWindowsStore implements WindowsStore { } List<Object> values = new ArrayList<>(); - for (int i=0; i<results.length; i++) { + for (int i = 0; i < results.length; i++) { Result result = results[i]; - if(result.isEmpty()) { + if (result.isEmpty()) { LOG.error("Got empty result for key [{}]", keys.get(i)); - throw new RuntimeException("Received empty result for key: "+keys.get(i)); + throw new RuntimeException("Received empty result for key: " + keys.get(i)); } Object resultObject = windowKryoSerializer().deserialize(result.getValue(family, qualifier)); values.add(resultObject); @@ -200,8 +192,8 @@ public class HBaseWindowsStore implements WindowsStore { WindowsStore.Entry.nonNullCheckForKey(key); WindowsStore.Entry.nonNullCheckForValue(value); - if(value == null) { - throw new IllegalArgumentException("Invalid value of null with key: "+key); + if (value == null) { + throw new IllegalArgumentException("Invalid value of null with key: " + key); } Put put = new Put(effectiveKey(key)); put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(value)); @@ -217,7 +209,8 @@ public class HBaseWindowsStore implements WindowsStore { List<Put> list = new ArrayList<>(); for (Entry entry : entries) { Put put = new Put(effectiveKey(entry.key)); - put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), windowKryoSerializer().serializeToByteBuffer(entry.value)); + put.addColumn(family, ByteBuffer.wrap(qualifier), System.currentTimeMillis(), + windowKryoSerializer().serializeToByteBuffer(entry.value)); list.add(put); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java index f0c8805..9981811 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java @@ -1,33 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hbase.trident.windowing; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.storm.trident.windowing.WindowsStore; import org.apache.storm.trident.windowing.WindowsStoreFactory; -import java.util.Map; - /** * Factory to create {@link HBaseWindowsStore} instances. - * */ public class HBaseWindowsStoreFactory implements WindowsStoreFactory { private final Map<String, Object> config; http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java index c4fa7c1..24ac8c5 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java @@ -18,11 +18,7 @@ package org.apache.storm.hbase.state; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; - import com.google.common.primitives.UnsignedBytes; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -34,7 +30,6 @@ import java.util.NavigableSet; import java.util.TreeMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; @@ -52,27 +47,30 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; + public class HBaseClientTestUtil { private HBaseClientTestUtil() { } public static HBaseClient mockedHBaseClient() throws Exception { return mockedHBaseClient(new ConcurrentSkipListMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>>( - UnsignedBytes.lexicographicalComparator())); + UnsignedBytes.lexicographicalComparator())); } public static HBaseClient mockedHBaseClient( - ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> internalMap) - throws Exception { + ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> internalMap) + throws Exception { HBaseClient mockClient = mock(HBaseClient.class); Mockito.doNothing().when(mockClient).close(); Mockito.when(mockClient.constructGetRequests(any(byte[].class), any(HBaseProjectionCriteria.class))) - .thenCallRealMethod(); + .thenCallRealMethod(); Mockito.when(mockClient.constructMutationReq(any(byte[].class), any(ColumnList.class), any(Durability.class))) - .thenCallRealMethod(); + .thenCallRealMethod(); Mockito.when(mockClient.exists(any(Get.class))).thenAnswer(new ExistsAnswer(internalMap)); Mockito.when(mockClient.batchGet(any(List.class))).thenAnswer(new BatchGetAnswer(internalMap)); @@ -84,8 +82,8 @@ public class HBaseClientTestUtil { static class BuildCellsHelper { public static void addMatchingColumnFamilies(byte[] rowKey, Map<byte[], NavigableSet<byte[]>> familyMap, - NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap, - List<Cell> cells) { + NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap, + List<Cell> cells) { for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { byte[] columnFamily = entry.getKey(); @@ -101,9 +99,9 @@ public class HBaseClientTestUtil { } public static void addMatchingQualifiers(byte[] rowKey, byte[] columnFamily, - Map.Entry<byte[], NavigableSet<byte[]>> qualifierSet, - NavigableMap<byte[], byte[]> qualifierToValueMap, - List<Cell> cells) { + Map.Entry<byte[], NavigableSet<byte[]>> qualifierSet, + NavigableMap<byte[], byte[]> qualifierToValueMap, + List<Cell> cells) { for (byte[] qualifier : qualifierSet.getValue()) { byte[] value = qualifierToValueMap.get(qualifier); if (value != null) { @@ -113,7 +111,7 @@ public class HBaseClientTestUtil { } public static void addAllColumnFamilies(byte[] rowKey, NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap, - List<Cell> cells) { + List<Cell> cells) { for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> entry : cfToQualifierToValueMap.entrySet()) { byte[] columnFamily = entry.getKey(); addAllQualifiers(rowKey, columnFamily, entry.getValue(), cells); @@ -121,7 +119,7 @@ public class HBaseClientTestUtil { } public static void addAllQualifiers(byte[] rowKey, byte[] columnFamily, - NavigableMap<byte[], byte[]> qualifierToValueMap, List<Cell> cells) { + NavigableMap<byte[], byte[]> qualifierToValueMap, List<Cell> cells) { for (Map.Entry<byte[], byte[]> entry2 : qualifierToValueMap.entrySet()) { byte[] qualifier = entry2.getKey(); byte[] value = entry2.getValue(); @@ -149,7 +147,7 @@ public class HBaseClientTestUtil { byte[] rowKey = get.getRow(); NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfToQualifierToValueMap = - mockMap.get(rowKey); + mockMap.get(rowKey); if (cfToQualifierToValueMap != null) { Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap(); @@ -310,7 +308,7 @@ public class HBaseClientTestUtil { byte[] endKey = (byte[]) args[1]; final ConcurrentNavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], byte[]>>> subMap = - mockMap.subMap(startKey, true, endKey, false); + mockMap.subMap(startKey, true, endKey, false); final List<Result> results = buildResults(subMap); @@ -353,7 +351,7 @@ public class HBaseClientTestUtil { @Override public Result[] next(int nbRows) throws IOException { List<Result> bulkResult = new ArrayList<>(); - for (int i = 0 ; i < nbRows ; i++) { + for (int i = 0; i < nbRows; i++) { Result result = next(); if (result == null) { break; http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java index aa965d1..250e337 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java @@ -16,13 +16,7 @@ package org.apache.storm.hbase.state; -import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import com.google.common.primitives.UnsignedBytes; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; - import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; @@ -42,6 +35,11 @@ import org.apache.storm.state.Serializer; import org.junit.Before; import org.junit.Test; +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * Test for HBaseKeyValueStateIterator. */ @@ -81,8 +79,8 @@ public class HBaseKeyValueStateIteratorTest { applyPendingStateToHBase(chunkMap); HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = - new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes()); @@ -111,8 +109,8 @@ public class HBaseKeyValueStateIteratorTest { applyPendingStateToHBase(chunkMap); HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = - new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); // keys shouldn't appear twice @@ -134,8 +132,8 @@ public class HBaseKeyValueStateIteratorTest { NavigableMap<byte[], byte[]> pendingCommit = getBinaryTreeMap(); HBaseKeyValueStateIterator<byte[], byte[]> kvIterator = - new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), - pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); assertFalse(kvIterator.hasNext()); } @@ -170,7 +168,7 @@ public class HBaseKeyValueStateIteratorTest { mutations.add(new Delete(getRowKeyForStateKey(rowKey))); } else { List<Mutation> mutationsForRow = prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily, - Collections.singletonMap(STATE_QUALIFIER, value)); + Collections.singletonMap(STATE_QUALIFIER, value)); mutations.addAll(mutationsForRow); } } @@ -204,7 +202,7 @@ public class HBaseKeyValueStateIteratorTest { } private void mutateRow(byte[] rowKey, byte[] columnFamily, Map<byte[], byte[]> map) - throws Exception { + throws Exception { mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); } http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java index bd38e57..b7c562f 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java @@ -18,11 +18,10 @@ package org.apache.storm.hbase.state; -import org.apache.storm.Config; -import org.junit.Test; - import java.util.HashMap; import java.util.Map; +import org.apache.storm.Config; +import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -38,7 +37,7 @@ public class HBaseKeyValueStateProviderTest { HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); Map<String, String> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + - " \"tableName\": \"table\", \"columnFamily\": \"cf\"}"); + " \"tableName\": \"table\", \"columnFamily\": \"cf\"}"); try { HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); @@ -53,7 +52,7 @@ public class HBaseKeyValueStateProviderTest { HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); Map<String, String> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + - " \"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": \"cf\"}"); + " \"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": \"cf\"}"); try { HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); @@ -68,7 +67,7 @@ public class HBaseKeyValueStateProviderTest { HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); Map<String, String> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + - " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"}"); + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"}"); try { HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); @@ -83,8 +82,8 @@ public class HBaseKeyValueStateProviderTest { HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); Map<String, String> stormConf = new HashMap<>(); stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + - " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"," + - " \"columnFamily\": \"columnFamily\"}"); + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"," + + " \"columnFamily\": \"columnFamily\"}"); HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); assertEquals("String", config.keyClass); http://git-wip-us.apache.org/repos/asf/storm/blob/880d14f1/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java index 7827283..bf40409 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java @@ -19,17 +19,14 @@ package org.apache.storm.hbase.state; import com.google.common.primitives.UnsignedBytes; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.storm.hbase.common.HBaseClient; import org.apache.storm.state.DefaultStateSerializer; import org.junit.Before; import org.junit.Test; -import java.util.Map; -import java.util.NavigableMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -49,7 +46,7 @@ public class HBaseKeyValueStateTest { mockMap = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); mockClient = HBaseClientTestUtil.mockedHBaseClient(mockMap); keyValueState = new HBaseKeyValueState<>(mockClient, COLUMN_FAMILY, NAMESPACE, - new DefaultStateSerializer<String>(), new DefaultStateSerializer<String>()); + new DefaultStateSerializer<String>(), new DefaultStateSerializer<String>()); } @Test @@ -80,38 +77,38 @@ public class HBaseKeyValueStateTest { keyValueState.put("b", "2"); keyValueState.prepareCommit(1); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{null, null, null}, getValues()); + assertArrayEquals(new String[]{ null, null, null }, getValues()); keyValueState.put("a", "1"); keyValueState.put("b", "2"); keyValueState.prepareCommit(1); keyValueState.commit(1); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{"1", "2", null}, getValues()); + assertArrayEquals(new String[]{ "1", "2", null }, getValues()); keyValueState.put("c", "3"); assertEquals("2", keyValueState.delete("b")); assertEquals("3", keyValueState.delete("c")); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.prepareCommit(2); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.commit(2); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); keyValueState.put("b", "2"); keyValueState.prepareCommit(3); keyValueState.put("c", "3"); - assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + assertArrayEquals(new String[]{ "1", "2", "3" }, getValues()); keyValueState.rollback(); - assertArrayEquals(new String[]{"1", null, null}, getValues()); + assertArrayEquals(new String[]{ "1", null, null }, getValues()); } private String[] getValues() { return new String[]{ - keyValueState.get("a"), - keyValueState.get("b"), - keyValueState.get("c") + keyValueState.get("a"), + keyValueState.get("b"), + keyValueState.get("c") }; } } \ No newline at end of file
