[
https://issues.apache.org/jira/browse/STORM-609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264068#comment-14264068
]
ASF GitHub Bot commented on STORM-609:
--------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/365#discussion_r22443267
--- Diff:
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
---
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.tuple.Values;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
+import storm.trident.state.JSONNonTransactionalSerializer;
+import storm.trident.state.JSONOpaqueSerializer;
+import storm.trident.state.JSONTransactionalSerializer;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.IBackingMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+public class RedisMapState<T> implements IBackingMap<T> {
+ private static final Logger logger =
LoggerFactory.getLogger(RedisMapState.class);
+
+ private static final EnumMap<StateType, Serializer>
DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+ StateType.NON_TRANSACTIONAL, new
JSONNonTransactionalSerializer(),
+ StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+ StateType.OPAQUE, new JSONOpaqueSerializer()
+ ));
+
+ public static class DefaultKeyFactory implements KeyFactory {
+ public String build(List<Object> key) {
+ if (key.size() != 1)
+ throw new RuntimeException("Default KeyFactory does not
support compound keys");
+ return (String) key.get(0);
+ }
+ };
+
+ public static class Options<T> implements Serializable {
+ public int localCacheSize = 1000;
+ public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+ KeyFactory keyFactory = null;
+ public Serializer<T> serializer = null;
+ public String hkey = null;
+ }
+
+ public static interface KeyFactory extends Serializable {
+ String build(List<Object> key);
+ }
+
+ /**
+ * OpaqueTransactional for redis.
+ * */
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig) {
+ return opaque(jedisPoolConfig, new Options());
+ }
+
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig,
String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return opaque(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig,
KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return opaque(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig,
Options<OpaqueValue> opts) {
+ return new Factory(jedisPoolConfig, StateType.OPAQUE, opts);
+ }
+
+ /**
+ * Transactional for redis.
+ * */
+ public static StateFactory transactional(JedisPoolConfig
jedisPoolConfig) {
+ return transactional(jedisPoolConfig, new Options());
+ }
+
+ public static StateFactory transactional(JedisPoolConfig
jedisPoolConfig, String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return transactional(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory transactional(JedisPoolConfig
jedisPoolConfig, KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return transactional(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory transactional(JedisPoolConfig
jedisPoolConfig, Options<TransactionalValue> opts) {
+ return new Factory(jedisPoolConfig, StateType.TRANSACTIONAL, opts);
+ }
+
+ /**
+ * NonTransactional for redis.
+ * */
+ public static StateFactory nonTransactional(JedisPoolConfig
jedisPoolConfig) {
+ return nonTransactional(jedisPoolConfig, new Options());
+ }
+
+ public static StateFactory nonTransactional(JedisPoolConfig
jedisPoolConfig, String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return nonTransactional(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory nonTransactional(JedisPoolConfig
jedisPoolConfig, KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return nonTransactional(jedisPoolConfig, opts);
+ }
+
+ public static StateFactory nonTransactional(JedisPoolConfig
jedisPoolConfig, Options<Object> opts) {
+ return new Factory(jedisPoolConfig, StateType.NON_TRANSACTIONAL,
opts);
+ }
+
+ protected static class Factory implements StateFactory {
+ // TODO : serialize redis.clients.jedis.JedisPoolConfig
+ public static final redis.clients.jedis.JedisPoolConfig
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+ JedisPoolConfig jedisPoolConfig;
+
+ StateType type;
+ Serializer serializer;
+ KeyFactory keyFactory;
+ Options options;
+
+ public Factory(JedisPoolConfig jedisPoolConfig, StateType type,
Options options) {
+ this.jedisPoolConfig = jedisPoolConfig;
+ this.type = type;
+ this.options = options;
+
+ this.keyFactory = options.keyFactory;
+ if (this.keyFactory == null) {
+ this.keyFactory = new DefaultKeyFactory();
+ }
+ this.serializer = options.serializer;
+ if (this.serializer == null) {
+ this.serializer = DEFAULT_SERIALIZERS.get(type);
+ if (this.serializer == null) {
+ throw new RuntimeException("Couldn't find serializer
for state type: " + type);
+ }
+ }
+ }
+
+ public State makeState(@SuppressWarnings("rawtypes") Map conf,
IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+
jedisPoolConfig.getHost(),
+
jedisPoolConfig.getPort(),
+
jedisPoolConfig.getTimeout(),
+
jedisPoolConfig.getPassword(),
+
jedisPoolConfig.getDatabase());
+ RedisMapState state = new RedisMapState(jedisPool, options,
serializer, keyFactory);
+ CachedMap c = new CachedMap(state, options.localCacheSize);
+
+ MapState ms;
+ if (type == StateType.NON_TRANSACTIONAL) {
+ ms = NonTransactionalMap.build(c);
+
+ } else if (type == StateType.OPAQUE) {
+ ms = OpaqueMap.build(c);
+
+ } else if (type == StateType.TRANSACTIONAL) {
+ ms = TransactionalMap.build(c);
+
+ } else {
+ throw new RuntimeException("Unknown state type: " + type);
+ }
+
+ return new SnapshottableMap(ms, new Values(options.globalKey));
+ }
+ }
+
+ private JedisPool jedisPool;
+ private Options options;
+ private Serializer serializer;
+ private KeyFactory keyFactory;
+
+ public RedisMapState(JedisPool jedisPool, Options options,
+ Serializer<T> serializer,
KeyFactory keyFactory) {
+ this.jedisPool = jedisPool;
+ this.options = options;
+ this.serializer = serializer;
+ this.keyFactory = keyFactory;
+ }
+
+ public List<T> multiGet(List<List<Object>> keys) {
+ if (keys.size() == 0) {
+ return Collections.emptyList();
+ }
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ String[] stringKeys = buildKeys(keys);
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+ List<String> values = jedis.mget(stringKeys);
+ return deserializeValues(keys, values);
+ } finally {
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
+ }
+ }
+ } else {
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+ Map<String, String> keyValue =
jedis.hgetAll(this.options.hkey);
+ List<String> values = buildValuesFromMap(keys, keyValue);
+ return deserializeValues(keys, values);
+ } finally {
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
+ }
+ }
+ }
+ }
+
+ private List<String> buildValuesFromMap(List<List<Object>> keys,
Map<String, String> keyValue) {
+ List<String> values = new ArrayList<String>(keys.size());
+ for (List<Object> key : keys) {
+ String strKey = keyFactory.build(key);
+ String value = keyValue.get(strKey);
+ values.add(value);
+ }
+ return values;
+ }
+
+ private List<T> deserializeValues(List<List<Object>> keys,
List<String> values) {
+ List<T> result = new ArrayList<T>(keys.size());
+ for (String value : values) {
+ if (value != null) {
+ result.add((T) serializer.deserialize(value.getBytes()));
+ } else {
+ result.add(null);
+ }
+ }
+ return result;
+ }
+
+ private String[] buildKeys(List<List<Object>> keys) {
+ String[] stringKeys = new String[keys.size()];
+ int index = 0;
+ for (List<Object> key : keys)
+ stringKeys[index++] = keyFactory.build(key);
+ return stringKeys;
+ }
+
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
+ if (keys.size() == 0) {
+ return;
+ }
+
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+ String[] keyValue = buildKeyValuesList(keys, vals);
+ jedis.mset(keyValue);
+ } finally {
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
+ }
+ }
+ } else {
+ Jedis jedis = jedisPool.getResource();
+ try {
+ Pipeline pl = jedis.pipelined();
--- End diff --
@dashengju We can make buildKeyValuesMap() and use hmset instead of
Pipeline & multiple hset.
> add storm-redis to storm external
> ---------------------------------
>
> Key: STORM-609
> URL: https://issues.apache.org/jira/browse/STORM-609
> Project: Apache Storm
> Issue Type: New Feature
> Affects Versions: 0.10.0
> Reporter: DashengJu
> Assignee: DashengJu
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)