This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8e8c65c66 [STORM-3680] Upgrade Jedis Library (#3314)
8e8c65c66 is described below
commit 8e8c65c666814685970d8791e2a9ba3a46ae1129
Author: Steviep <[email protected]>
AuthorDate: Mon Dec 4 18:20:09 2023 +0900
[STORM-3680] Upgrade Jedis Library (#3314)
Upgrade Jedis Library to 5.x
---------
Co-authored-by: Richard Zowalla <[email protected]>
---
DEPENDENCY-LICENSES | 9 +-
LICENSE-binary | 2 +-
.../tools/Base64ToBinaryStateMigrationUtil.java | 3 +-
external/storm-elasticsearch/pom.xml | 2 +-
external/storm-redis/pom.xml | 13 +
.../apache/storm/redis/bolt/AbstractRedisBolt.java | 19 +-
.../apache/storm/redis/bolt/RedisFilterBolt.java | 16 +-
.../apache/storm/redis/bolt/RedisLookupBolt.java | 6 +-
.../apache/storm/redis/bolt/RedisStoreBolt.java | 6 +-
.../common/adapter/RedisCommandsAdapterJedis.java | 4 +-
.../adapter/RedisCommandsAdapterJedisCluster.java | 7 +-
.../storm/redis/common/commands/RedisCommands.java | 5 +-
.../common/container/JedisClusterContainer.java | 106 ++++-
...eContainer.java => JedisCommandsContainer.java} | 49 +-
.../container/JedisCommandsContainerBuilder.java | 19 +-
.../redis/common/container/JedisContainer.java | 127 ++++-
.../common/container/RedisClusterContainer.java | 9 +-
.../container/RedisCommandsContainerBuilder.java | 14 +-
.../storm/redis/state/RedisKeyValueState.java | 2 +-
.../redis/state/RedisKeyValueStateIterator.java | 4 +-
.../redis/trident/state/RedisClusterMapState.java | 7 +-
.../redis/trident/state/RedisClusterState.java | 9 +-
.../storm/redis/bolt/RedisFilterBoltTest.java | 509 +++++++++++++++++++++
.../state/RedisKeyValueStateIteratorTest.java | 4 +-
.../storm/redis/state/RedisKeyValueStateTest.java | 2 +-
.../apache/storm/redis/util/JedisTestHelper.java | 81 ++++
.../org/apache/storm/redis/util/StubTuple.java | 199 ++++++++
.../apache/storm/redis/util/TupleTestHelper.java | 45 ++
.../redis/util/outputcollector/EmittedTuple.java | 56 +++
.../util/outputcollector/StubOutputCollector.java | 94 ++++
integration-test/pom.xml | 1 -
pom.xml | 11 +-
.../storm/sql/redis/RedisDataSourcesProvider.java | 3 +-
33 files changed, 1311 insertions(+), 132 deletions(-)
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 57fb93e51..ffea76d71 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -15,6 +15,7 @@ List of third-party dependencies grouped by their license
type.
* Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org)
* Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5
- https://commons.apache.org/proper/commons-fileupload/)
* Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 -
https://commons.apache.org/proper/commons-lang/)
+ * Apache Commons Pool (org.apache.commons:commons-pool2:2.12.0 -
https://commons.apache.org/proper/commons-pool/)
* Apache Commons Text (org.apache.commons:commons-text:1.11.0 -
https://commons.apache.org/proper/commons-text)
* Apache Directory API ASN.1 API
(org.apache.directory.api:api-asn1-api:2.1.4 -
https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-api/)
* Apache Directory API ASN.1 BER
(org.apache.directory.api:api-asn1-ber:2.1.4 -
https://directory.apache.org/api-parent/api-asn1-parent/api-asn1-ber/)
@@ -26,8 +27,7 @@ List of third-party dependencies grouped by their license
type.
* Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j/log4j-core/)
* Apache Log4j SLF4J Binding
(org.apache.logging.log4j:log4j-slf4j-impl:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j/log4j-slf4j-impl/)
* Apache Log4j Web (org.apache.logging.log4j:log4j-web:2.21.1 -
https://logging.apache.org/log4j/2.x/log4j/log4j-web/)
- * Gson (com.google.code.gson:gson:2.8.9 -
https://github.com/google/gson/gson)
- * Gson (com.google.code.gson:gson:2.9.0 -
https://github.com/google/gson/gson)
+ * Gson (com.google.code.gson:gson:2.10.1 -
https://github.com/google/gson/gson)
* Maven Plugin Tools Java Annotations
(org.apache.maven.plugin-tools:maven-plugin-annotations:3.8.1 -
https://maven.apache.org/plugin-tools/maven-plugin-annotations)
* snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 -
https://github.com/xerial/snappy-java)
@@ -73,7 +73,6 @@ List of third-party dependencies grouped by their license
type.
* Apache Commons Logging (commons-logging:commons-logging:1.2 -
http://commons.apache.org/proper/commons-logging/)
* Apache Commons Math (org.apache.commons:commons-math3:3.6.1 -
http://commons.apache.org/proper/commons-math/)
* Apache Commons Net (commons-net:commons-net:3.9.0 -
https://commons.apache.org/proper/commons-net/)
- * Apache Commons Pool (org.apache.commons:commons-pool2:2.4.2 -
http://commons.apache.org/proper/commons-pool/)
* Apache Curator (org.apache.curator:apache-curator:2.12.0 -
http://curator.apache.org)
* Apache Derby Database Engine and Embedded JDBC Driver
(org.apache.derby:derby:10.14.1.0 - http://db.apache.org/derby/)
* Apache Geronimo JCache Spec 1.0
(org.apache.geronimo.specs:geronimo-jcache_1.0_spec:1.0-alpha-1 -
http://geronimo.apache.org/maven/specs/geronimo-jcache_1.0_spec/1.0-alpha-1)
@@ -215,7 +214,6 @@ List of third-party dependencies grouped by their license
type.
* Google Guice - Extensions - AssistedInject
(com.google.inject.extensions:guice-assistedinject:3.0 -
http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/)
* Google Guice - Extensions - Servlet
(com.google.inject.extensions:guice-servlet:4.0 -
https://github.com/google/guice/extensions-parent/guice-servlet)
* Graphite Integration for Metrics
(io.dropwizard.metrics:metrics-graphite:3.2.6 -
http://metrics.dropwizard.io/metrics-graphite/)
- * Gson (com.google.code.gson:gson:2.2.4 -
http://code.google.com/p/google-gson/)
* Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1
- http://code.google.com/p/guava-libraries/guava)
* Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 -
https://github.com/google/guava/guava)
* Guava: Google Core Libraries for Java
(com.google.guava:guava:32.1.3-jre - https://github.com/google/guava)
@@ -659,7 +657,7 @@ List of third-party dependencies grouped by their license
type.
* argparse4j (net.sourceforge.argparse4j:argparse4j:0.8.1 -
http://argparse4j.github.io)
* Checker Qual (org.checkerframework:checker-qual:3.37.0 -
https://checkerframework.org/)
* JCodings (org.jruby.jcodings:jcodings:1.0.55 -
http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
- * Jedis (redis.clients:jedis:2.9.0 -
https://github.com/xetorthio/jedis)
+ * Jedis (redis.clients:jedis:5.1.0 - https://github.com/redis/jedis)
* Joni (org.jruby.joni:joni:2.1.31 -
http://nexus.sonatype.org/oss-repository-hosting.html/joni)
* JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 -
http://pholser.github.io/jopt-simple)
* JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.36 -
http://www.slf4j.org)
@@ -682,6 +680,7 @@ List of third-party dependencies grouped by their license
type.
Public Domain
* AOP alliance (aopalliance:aopalliance:1.0 -
http://aopalliance.sourceforge.net)
+ * JSON in Java (org.json:json:20231013 -
https://github.com/douglascrockford/JSON-java)
Revised BSD
diff --git a/LICENSE-binary b/LICENSE-binary
index a6c05f4c8..3d10d5dac 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -656,7 +656,6 @@ The license texts of these dependencies can be found in the
licenses directory.
* Apache Commons FileUpload (commons-fileupload:commons-fileupload:1.5
- https://commons.apache.org/proper/commons-fileupload/)
* Apache Commons Lang (org.apache.commons:commons-lang3:3.13.0 -
https://commons.apache.org/proper/commons-lang/)
* Apache Commons Text (org.apache.commons:commons-text:1.11.0 -
https://commons.apache.org/proper/commons-text)
- * Gson (com.google.code.gson:gson:2.9.0 -
https://github.com/google/gson/gson)
* snappy-java (org.xerial.snappy:snappy-java:1.1.10.4 -
https://github.com/xerial/snappy-java)
Apache License
@@ -808,6 +807,7 @@ The license texts of these dependencies can be found in the
licenses directory.
* Google Guice - Extensions - AssistedInject
(com.google.inject.extensions:guice-assistedinject:3.0 -
http://code.google.com/p/google-guice/extensions-parent/guice-assistedinject/)
* Google Guice - Extensions - Servlet
(com.google.inject.extensions:guice-servlet:4.0 -
https://github.com/google/guice/extensions-parent/guice-servlet)
* Graphite Integration for Metrics
(io.dropwizard.metrics:metrics-graphite:3.2.6 -
http://metrics.dropwizard.io/metrics-graphite/)
+ * Gson (com.google.code.gson:gson:2.10.1 -
https://github.com/google/gson/gson)
* Guava: Google Core Libraries for Java (com.google.guava:guava:16.0.1
- http://code.google.com/p/guava-libraries/guava)
* Guava: Google Core Libraries for Java
(com.google.guava:guava:32.1.3-jre - https://github.com/google/guava)
* Guava InternalFutureFailureAccess and InternalFutures
(com.google.guava:failureaccess:1.0.1 -
https://github.com/google/guava/failureaccess)
diff --git
a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
index 9f060b806..9d0d4c113 100644
---
a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
+++
b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/tools/Base64ToBinaryStateMigrationUtil.java
@@ -35,8 +35,7 @@ import
org.apache.storm.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.RedisCommandsInstanceContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
public class Base64ToBinaryStateMigrationUtil {
private static final Logger LOG =
LoggerFactory.getLogger(Base64ToBinaryStateMigrationUtil.class);
diff --git a/external/storm-elasticsearch/pom.xml
b/external/storm-elasticsearch/pom.xml
index ba68ee7b8..df31db80b 100644
--- a/external/storm-elasticsearch/pom.xml
+++ b/external/storm-elasticsearch/pom.xml
@@ -125,7 +125,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
- <version>1.19.1</version>
+ <version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index b93d3d951..1cb7b6e06 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -78,6 +78,19 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
</dependency>
+ <!-- Test Containers -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index fd49f9180..0a7d293d1 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -15,12 +15,11 @@ package org.apache.storm.redis.bolt;
import java.util.Map;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import redis.clients.jedis.JedisCommands;
/**
* AbstractRedisBolt class is for users to implement custom bolts which makes
interaction with Redis.
@@ -45,7 +44,7 @@ import redis.clients.jedis.JedisCommands;
public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
protected OutputCollector collector;
- private transient JedisCommandsInstanceContainer container;
+ private transient JedisCommandsContainer container;
private JedisPoolConfig jedisPoolConfig;
private JedisClusterConfig jedisClusterConfig;
@@ -89,18 +88,10 @@ public abstract class AbstractRedisBolt extends
BaseTickTupleAwareRichBolt {
* Borrow JedisCommands instance from container.<p/>
* JedisCommands is an interface which contains single key operations.
* @return implementation of JedisCommands
- * @see JedisCommandsInstanceContainer#getInstance()
+ * @see JedisCommandsContainer
*/
- protected JedisCommands getInstance() {
- return this.container.getInstance();
- }
-
- /**
- * Return borrowed instance to container.
- * @param instance borrowed object
- */
- protected void returnInstance(JedisCommands instance) {
- this.container.returnInstance(instance);
+ protected JedisCommandsContainer getInstance() {
+ return this.container;
}
@Override
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
index a3f0ace8a..d9228d32d 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisFilterBolt.java
@@ -13,14 +13,16 @@
package org.apache.storm.redis.bolt;
import java.util.List;
+import java.util.Objects;
+
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisFilterMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.GeoCoordinate;
-import redis.clients.jedis.JedisCommands;
/**
* Basic bolt for querying from Redis and filters out if key/field doesn't
exist.
@@ -85,7 +87,7 @@ public class RedisFilterBolt extends AbstractRedisBolt {
String key = filterMapper.getKeyFromTuple(input);
boolean found;
- JedisCommands jedisCommand = null;
+ JedisCommandsContainer jedisCommand = null;
try {
jedisCommand = getInstance();
@@ -112,7 +114,13 @@ public class RedisFilterBolt extends AbstractRedisBolt {
case GEO:
List<GeoCoordinate> geopos =
jedisCommand.geopos(additionalKey, key);
- found = (geopos != null && geopos.size() > 0);
+ if (geopos == null || geopos.isEmpty()) {
+ found = false;
+ } else {
+ // If any entry is NOT null, then we have a match.
+ found = geopos.stream()
+ .anyMatch(Objects::nonNull);
+ }
break;
default:
@@ -127,8 +135,6 @@ public class RedisFilterBolt extends AbstractRedisBolt {
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
- } finally {
- returnInstance(jedisCommand);
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
index 4d903402a..07c79773c 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -15,12 +15,12 @@ package org.apache.storm.redis.bolt;
import java.util.List;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import redis.clients.jedis.JedisCommands;
/**
* Basic bolt for querying from Redis and emits response as tuple.
@@ -70,7 +70,7 @@ public class RedisLookupBolt extends AbstractRedisBolt {
String key = lookupMapper.getKeyFromTuple(input);
Object lookupValue;
- JedisCommands jedisCommand = null;
+ JedisCommandsContainer jedisCommand = null;
try {
jedisCommand = getInstance();
@@ -116,8 +116,6 @@ public class RedisLookupBolt extends AbstractRedisBolt {
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
- } finally {
- returnInstance(jedisCommand);
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
index ce08a735d..065d9cbea 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -14,11 +14,11 @@ package org.apache.storm.redis.bolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainer;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
-import redis.clients.jedis.JedisCommands;
/**
* Basic bolt for writing to Redis.
@@ -66,7 +66,7 @@ public class RedisStoreBolt extends AbstractRedisBolt {
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);
- JedisCommands jedisCommand = null;
+ JedisCommandsContainer jedisCommand = null;
try {
jedisCommand = getInstance();
@@ -114,8 +114,6 @@ public class RedisStoreBolt extends AbstractRedisBolt {
} catch (Exception e) {
this.collector.reportError(e);
this.collector.fail(input);
- } finally {
- returnInstance(jedisCommand);
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
index d791bd865..a3f1f5a6b 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedis.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.util.Map;
import org.apache.storm.redis.common.commands.RedisCommands;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
/**
* Adapter class to make Jedis instance play with BinaryRedisCommands
interface.
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
index 15f5e0a88..9d2d96b19 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/adapter/RedisCommandsAdapterJedisCluster.java
@@ -23,14 +23,15 @@ import java.io.IOException;
import java.util.Map;
import org.apache.storm.redis.common.commands.RedisCommands;
import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
+
/**
* Adapter class to make JedisCluster instance play with BinaryRedisCommands
interface.
*/
public class RedisCommandsAdapterJedisCluster implements RedisCommands,
Closeable {
- private JedisCluster jedisCluster;
+ private final JedisCluster jedisCluster;
public RedisCommandsAdapterJedisCluster(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
index 58835b1ab..5a7170eb2 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/commands/RedisCommands.java
@@ -19,8 +19,9 @@
package org.apache.storm.redis.common.commands;
import java.util.Map;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
/**
* This interface represents Jedis methods exhaustively which are used on
storm-redis.
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
index ca2a2f44e..5220c1c7a 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -12,16 +12,16 @@
package org.apache.storm.redis.common.container;
-import java.io.IOException;
+import java.util.List;
+import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
/**
* Container for managing JedisCluster.
* <p/>
* Note that JedisCluster doesn't need to be pooled since it's thread-safe and
it stores pools internally.
*/
-public class JedisClusterContainer implements JedisCommandsInstanceContainer {
+public class JedisClusterContainer implements JedisCommandsContainer {
private JedisCluster jedisCluster;
@@ -33,20 +33,94 @@ public class JedisClusterContainer implements
JedisCommandsInstanceContainer {
this.jedisCluster = jedisCluster;
}
- /**
- * {@inheritDoc}
- */
@Override
- public JedisCommands getInstance() {
- return this.jedisCluster;
+ public Boolean exists(final String key) {
+ return jedisCluster.exists(key);
+ }
+
+ @Override
+ public String get(final String key) {
+ return jedisCluster.get(key);
+ }
+
+ @Override
+ public String hget(final String key, final String field) {
+ return jedisCluster.hget(key, field);
+ }
+
+ @Override
+ public Long geoadd(final String key, final double longitude, final double
latitude, final String member) {
+ return jedisCluster.geoadd(key, longitude, latitude, member);
+ }
+
+ @Override
+ public List<GeoCoordinate> geopos(final String key, final String...
members) {
+ return jedisCluster.geopos(key, members);
+ }
+
+ @Override
+ public Boolean hexists(final String key, final String field) {
+ return jedisCluster.hexists(key, field);
+ }
+
+ @Override
+ public Long hset(final String key, final String field, final String value)
{
+ return jedisCluster.hset(key, field, value);
+ }
+
+ @Override
+ public String lpop(final String key) {
+ return jedisCluster.lpop(key);
+ }
+
+ @Override
+ public Long pfadd(final String key, final String... elements) {
+ return jedisCluster.pfadd(key, elements);
+ }
+
+ @Override
+ public long pfcount(final String key) {
+ return jedisCluster.pfcount(key);
+ }
+
+ @Override
+ public Long rpush(final String key, final String... string) {
+ return jedisCluster.rpush(key, string);
+ }
+
+ @Override
+ public Long sadd(final String key, final String... member) {
+ return jedisCluster.sadd(key, member);
+ }
+
+ @Override
+ public Long scard(final String key) {
+ return jedisCluster.scard(key);
+ }
+
+ @Override
+ public String set(final String key, final String value) {
+ return jedisCluster.set(key, value);
+ }
+
+ @Override
+ public Boolean sismember(final String key, final String member) {
+ return jedisCluster.sismember(key, member);
+ }
+
+ @Override
+ public Long zadd(final String key, final double score, final String
member) {
+ return jedisCluster.zadd(key, score, member);
+ }
+
+ @Override
+ public Long zrank(final String key, final String member) {
+ return jedisCluster.zrank(key, member);
}
- /**
- * {@inheritDoc}
- */
@Override
- public void returnInstance(JedisCommands jedisCommands) {
- // do nothing
+ public Double zscore(final String key, final String member) {
+ return jedisCluster.zscore(key, member);
}
/**
@@ -54,10 +128,6 @@ public class JedisClusterContainer implements
JedisCommandsInstanceContainer {
*/
@Override
public void close() {
- try {
- this.jedisCluster.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ this.jedisCluster.close();
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
similarity index 51%
rename from
external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
rename to
external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
index 49a3373fe..38b1321b7 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainer.java
@@ -13,23 +13,48 @@
package org.apache.storm.redis.common.container;
import java.io.Closeable;
-import redis.clients.jedis.JedisCommands;
+import java.util.List;
+import redis.clients.jedis.GeoCoordinate;
/**
* Interfaces for containers which stores instances implementing JedisCommands.
*/
-public interface JedisCommandsInstanceContainer extends Closeable {
- /**
- * Borrows instance from container.
- * @return instance which implements JedisCommands
- */
- JedisCommands getInstance();
+public interface JedisCommandsContainer extends Closeable {
+ Boolean exists(String key);
- /**
- * Returns instance to container.
- * @param jedisCommands borrowed instance
- */
- void returnInstance(JedisCommands jedisCommands);
+ String get(String key);
+
+ String hget(String key, String field);
+
+ Long geoadd(String key, double longitude, double latitude, String member);
+
+ List<GeoCoordinate> geopos(String key, String... members);
+
+ Boolean hexists(String key, String field);
+
+ Long hset(String key, String field, String value);
+
+ String lpop(String key);
+
+ Long pfadd(String key, String... elements);
+
+ long pfcount(String key);
+
+ Long rpush(String key, String... string);
+
+ Long sadd(String key, String... member);
+
+ Long scard(String key);
+
+ String set(String key, String value);
+
+ Boolean sismember(String key, String member);
+
+ Long zadd(String key, double score, String member);
+
+ Long zrank(String key, String member);
+
+ Double zscore(String key, String member);
/**
* Release Container.
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
index 86f0f0912..9370e3421 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -12,8 +12,10 @@
package org.apache.storm.redis.common.container;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
@@ -22,18 +24,17 @@ import redis.clients.jedis.JedisPool;
*/
public class JedisCommandsContainerBuilder {
- // FIXME: We're using default config since it cannot be serialized
- // We still needs to provide some options externally
- public static final redis.clients.jedis.JedisPoolConfig
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
/**
* Builds container for single Redis environment.
* @param config configuration for JedisPool
* @return container for single Redis environment
*/
- public static JedisCommandsInstanceContainer build(JedisPoolConfig config)
{
+ public static JedisCommandsContainer build(JedisPoolConfig config) {
+ // FIXME: We're using default config since it cannot be serialized
+ // We still needs to provide some options externally
JedisPool jedisPool =
- new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(),
config.getPort(), config.getTimeout(), config.getPassword(),
+ new JedisPool(new redis.clients.jedis.JedisPoolConfig(),
config.getHost(), config.getPort(),
+ config.getTimeout(), config.getPassword(),
config.getDatabase());
return new JedisContainer(jedisPool);
}
@@ -43,10 +44,12 @@ public class JedisCommandsContainerBuilder {
* @param config configuration for JedisCluster
* @return container for Redis Cluster environment
*/
- public static JedisCommandsInstanceContainer build(JedisClusterConfig
config) {
+ public static JedisCommandsContainer build(JedisClusterConfig config) {
+ // FIXME: We're using default config since it cannot be serialized
+ // We still needs to provide some options externally
JedisCluster jedisCluster =
new JedisCluster(config.getNodes(), config.getTimeout(),
config.getTimeout(), config.getMaxRedirections(), config.getPassword(),
- DEFAULT_POOL_CONFIG);
+ new GenericObjectPoolConfig<>());
return new JedisClusterContainer(jedisCluster);
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
index 1ebb5508a..1eed610dc 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -14,15 +14,19 @@ package org.apache.storm.redis.common.container;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.commands.JedisCommands;
/**
- * Container for managing Jedis instances.
+ * Adapter for providing a unified interface for running commands over both
Jedis and JedisCluster instances.
*/
-public class JedisContainer implements JedisCommandsInstanceContainer {
+public class JedisContainer implements JedisCommandsContainer {
private static final Logger LOG =
LoggerFactory.getLogger(JedisContainer.class);
private JedisPool jedisPool;
@@ -35,35 +39,114 @@ public class JedisContainer implements
JedisCommandsInstanceContainer {
this.jedisPool = jedisPool;
}
+ private <T> T runCommand(Function<JedisCommands, T> command) {
+ final JedisCommands jedisCommands = jedisPool.getResource();
+ try {
+ return command.apply(jedisCommands);
+ } finally {
+ try {
+ ((Closeable) jedisCommands).close();
+ } catch (IOException e) {
+ LOG.error("Failed to close (return) instance to pool");
+ }
+ }
+ }
+
/**
* {@inheritDoc}
*/
@Override
- public JedisCommands getInstance() {
- return jedisPool.getResource();
+ public void close() {
+ jedisPool.close();
}
- /**
- * {@inheritDoc}
- */
@Override
- public void returnInstance(JedisCommands jedisCommands) {
- if (jedisCommands == null) {
- return;
- }
+ public Boolean exists(final String key) {
+ return runCommand((jedisCommands) -> jedisCommands.exists(key));
+ }
- try {
- ((Closeable) jedisCommands).close();
- } catch (IOException e) {
- LOG.error("Failed to close (return) instance to pool");
- }
+ @Override
+ public String get(final String key) {
+ return runCommand((jedisCommands) -> jedisCommands.get(key));
}
- /**
- * {@inheritDoc}
- */
@Override
- public void close() {
- jedisPool.close();
+ public String hget(final String key, final String field) {
+ return runCommand((jedisCommands) -> jedisCommands.hget(key, field));
+ }
+
+ @Override
+ public Long geoadd(final String key, final double longitude, final double
latitude, final String member) {
+ return runCommand((jedisCommands) -> jedisCommands.geoadd(key,
longitude, latitude, member));
+ }
+
+ @Override
+ public List<GeoCoordinate> geopos(final String key, final String...
members) {
+ return runCommand((jedisCommands) -> jedisCommands.geopos(key,
members));
+ }
+
+ @Override
+ public Boolean hexists(final String key, final String field) {
+ return runCommand((jedisCommands) -> jedisCommands.hexists(key,
field));
+ }
+
+ @Override
+ public Long hset(final String key, final String field, final String value)
{
+ return runCommand((jedisCommands) -> jedisCommands.hset(key, field,
value));
+ }
+
+ @Override
+ public String lpop(final String key) {
+ return runCommand((jedisCommands) -> jedisCommands.lpop(key));
+ }
+
+ @Override
+ public Long pfadd(final String key, final String... elements) {
+ return runCommand((jedisCommands) -> jedisCommands.pfadd(key,
elements));
+ }
+
+ @Override
+ public long pfcount(final String key) {
+ return runCommand((jedisCommands) -> jedisCommands.pfcount(key));
+ }
+
+ @Override
+ public Long rpush(final String key, final String... string) {
+ return runCommand((jedisCommands) -> jedisCommands.rpush(key, string));
+ }
+
+ @Override
+ public Long sadd(final String key, final String... member) {
+ return runCommand((jedisCommands) -> jedisCommands.sadd(key, member));
+ }
+
+ @Override
+ public Long scard(final String key) {
+ return runCommand((jedisCommands) -> jedisCommands.scard(key));
+ }
+
+ @Override
+ public String set(final String key, final String value) {
+ return runCommand((jedisCommands) -> jedisCommands.set(key, value));
+ }
+
+ @Override
+ public Boolean sismember(final String key, final String member) {
+ return runCommand((jedisCommands) -> jedisCommands.sismember(key,
member));
+ }
+
+ @Override
+ public Long zadd(final String key, final double score, final String
member) {
+ return runCommand((jedisCommands) -> jedisCommands.zadd(key, score,
member));
+ }
+
+ @Override
+ public Long zrank(final String key, final String member) {
+ return runCommand((jedisCommands) -> jedisCommands.zrank(key, member));
+ }
+
+ @Override
+ public Double zscore(final String key, final String member) {
+ return runCommand((jedisCommands) -> jedisCommands.zscore(key,
member));
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
index 04c05b876..600d4fec6 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisClusterContainer.java
@@ -18,7 +18,6 @@
package org.apache.storm.redis.common.container;
-import java.io.IOException;
import org.apache.storm.redis.common.adapter.RedisCommandsAdapterJedisCluster;
import org.apache.storm.redis.common.commands.RedisCommands;
import redis.clients.jedis.JedisCluster;
@@ -60,11 +59,7 @@ public class RedisClusterContainer implements
RedisCommandsInstanceContainer {
* {@inheritDoc}
*/
@Override
- public void close() throws IOException {
- try {
- this.jedisCluster.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ public void close() {
+ this.jedisCluster.close();
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
index 81201efd9..b369e1ca0 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/RedisCommandsContainerBuilder.java
@@ -18,6 +18,7 @@
package org.apache.storm.redis.common.container;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import redis.clients.jedis.JedisCluster;
@@ -28,10 +29,6 @@ import redis.clients.jedis.JedisPool;
*/
public class RedisCommandsContainerBuilder {
- // FIXME: We're using default config since it cannot be serialized
- // We still needs to provide some options externally
- public static final redis.clients.jedis.JedisPoolConfig
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
/**
* Builds container for single Redis environment.
*
@@ -39,8 +36,11 @@ public class RedisCommandsContainerBuilder {
* @return container for single Redis environment
*/
public static RedisCommandsInstanceContainer build(JedisPoolConfig config)
{
+ // FIXME: We're using default config since it cannot be serialized
+ // We still need to provide some options externally
JedisPool jedisPool =
- new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(),
config.getPort(), config.getTimeout(), config.getPassword(),
+ new JedisPool(new redis.clients.jedis.JedisPoolConfig(),
config.getHost(), config.getPort(),
+ config.getTimeout(), config.getPassword(),
config.getDatabase());
return new RedisContainer(jedisPool);
}
@@ -52,8 +52,10 @@ public class RedisCommandsContainerBuilder {
* @return container for Redis Cluster environment
*/
public static RedisCommandsInstanceContainer build(JedisClusterConfig
config) {
+ // FIXME: We're using default config since it cannot be serialized
+ // We still need to provide some options externally
JedisCluster jedisCluster =
- new JedisCluster(config.getNodes(), config.getTimeout(),
config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
+ new JedisCluster(config.getNodes(), config.getTimeout(),
config.getMaxRedirections(), new GenericObjectPoolConfig<>());
return new RedisClusterContainer(jedisCluster);
}
}
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 2a11d962a..880aa5373 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -35,7 +35,7 @@ import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
/**
* A redis based implementation that persists the state in Redis.
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
index 19d2b9194..e6b282714 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
@@ -22,8 +22,8 @@ import org.apache.storm.state.BaseBinaryStateIterator;
import org.apache.storm.state.DefaultStateEncoder;
import org.apache.storm.state.Serializer;
import org.apache.storm.state.StateEncoder;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
/**
* An iterator over {@link RedisKeyValueState}.
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index dddee1518..ae634d686 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -15,6 +15,8 @@ package org.apache.storm.redis.trident.state;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.task.IMetricsContext;
@@ -31,6 +33,7 @@ import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.SnapshottableMap;
import org.apache.storm.trident.state.map.TransactionalMap;
import org.apache.storm.tuple.Values;
+import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisCluster;
/**
@@ -277,7 +280,7 @@ public class RedisClusterMapState<T> extends
AbstractRedisMapState<T> {
* RedisClusterMapState.Factory provides Redis Cluster environment version
of StateFactory.
*/
protected static class Factory implements StateFactory {
- public static final redis.clients.jedis.JedisPoolConfig
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+ public static final GenericObjectPoolConfig<Connection>
DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>();
JedisClusterConfig jedisClusterConfig;
@@ -316,7 +319,7 @@ public class RedisClusterMapState<T> extends
AbstractRedisMapState<T> {
*/
@Override
public State makeState(Map<String, Object> conf, IMetricsContext
metrics, int partitionIndex, int numPartitions) {
- JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
+ final JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
jedisClusterConfig.getTimeout(),
jedisClusterConfig.getTimeout(),
jedisClusterConfig.getMaxRedirections(),
diff --git
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index c7f018995..e052dd1b6 100644
---
a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++
b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -13,10 +13,13 @@
package org.apache.storm.redis.trident.state;
import java.util.Map;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
+import redis.clients.jedis.Connection;
import redis.clients.jedis.JedisCluster;
/**
@@ -74,9 +77,9 @@ public class RedisClusterState implements State {
* @see StateFactory
*/
public static class Factory implements StateFactory {
- public static final redis.clients.jedis.JedisPoolConfig
DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+ public static final GenericObjectPoolConfig<Connection>
DEFAULT_POOL_CONFIG = new GenericObjectPoolConfig<>();
- private JedisClusterConfig jedisClusterConfig;
+ private final JedisClusterConfig jedisClusterConfig;
/**
* Constructor.
@@ -92,7 +95,7 @@ public class RedisClusterState implements State {
*/
@Override
public State makeState(Map<String, Object> conf, IMetricsContext
metrics, int partitionIndex, int numPartitions) {
- JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
+ final JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
jedisClusterConfig.getTimeout(),
jedisClusterConfig.getTimeout(),
jedisClusterConfig.getMaxRedirections(),
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
new file mode 100644
index 000000000..dc80361a6
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/bolt/RedisFilterBoltTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.redis.bolt;
+
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisFilterMapper;
+import org.apache.storm.redis.util.JedisTestHelper;
+import org.apache.storm.redis.util.StubTuple;
+import org.apache.storm.redis.util.TupleTestHelper;
+import org.apache.storm.redis.util.outputcollector.EmittedTuple;
+import org.apache.storm.redis.util.outputcollector.StubOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.GEO;
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HASH;
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.HYPER_LOG_LOG;
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SET;
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.SORTED_SET;
+import static
org.apache.storm.redis.common.mapper.RedisDataTypeDescription.RedisDataType.STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+@Testcontainers
+class RedisFilterBoltTest {
+
+ @Container
+ public GenericContainer container = new
GenericContainer("redis:7.2.3-alpine")
+ .withExposedPorts(6379);
+
+ private JedisTestHelper jedisHelper;
+
+ private JedisPoolConfig.Builder configBuilder;
+ private StubOutputCollector outputCollector;
+ private TopologyContext topologyContext;
+
+ @BeforeEach
+ void setup() {
+ configBuilder = new JedisPoolConfig.Builder();
+ configBuilder
+ .setHost(container.getHost())
+ .setPort(container.getFirstMappedPort())
+ .setTimeout(10);
+
+ outputCollector = new StubOutputCollector();
+ topologyContext = mock(TopologyContext.class);
+ jedisHelper = new JedisTestHelper(container);
+ }
+
+ @AfterEach
+ void cleanup() {
+ verifyNoMoreInteractions(topologyContext);
+ jedisHelper.close();
+ }
+
+ /**
+ * Smoke test the exists check when the key is NOT found.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_exists_keyNotFound() {
+ // Define input key
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does not exist in redis
+ jedisHelper.delete(inputKey);
+ assertFalse(jedisHelper.exists(inputKey), "Sanity check key should not
exist");
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(STRING);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the exists check when the key IS found.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_exists_keyFound() {
+ // Define input key
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.set(inputKey, "some-value");
+ assertTrue(jedisHelper.exists(inputKey), "Sanity check key exists.");
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(STRING);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Smoke test the sismember check when the key is NOT found in the set.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_sismember_notMember() {
+ // Define input key
+ final String setKey = "ThisIsMySet";
+ final String inputKey = "ThisIsMyKey";
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(SET, setKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the sismember check when the key IS found in the set.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_sismember_isMember() {
+ // Define input key
+ final String setKey = "ThisIsMySet";
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.smember(setKey, inputKey);
+ assertTrue(jedisHelper.sismember(setKey, inputKey), "Sanity check,
should be a member");
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(SET, setKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Smoke test the hexists check when the key is NOT found in the hash.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_hexists_notMember() {
+ // Define input key
+ final String hashKey = "ThisIsMyHash";
+ final String inputKey = "ThisIsMyKey";
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(HASH, hashKey);
+
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the hexists check when the key IS found.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_hexists_isMember() {
+ // Define input key
+ final String hashKey = "ThisIsMyHash";
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.hset(hashKey, inputKey, "value");
+ assertTrue(jedisHelper.hexists(hashKey, inputKey), "Sanity check,
should be a member");
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(HASH, hashKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Smoke test the zrank check when the key is NOT found in the set.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_zrank_notMember() {
+ // Define input key
+ final String setKey = "ThisIsMySetKey";
+ final String inputKey = "ThisIsMyKey";
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(SORTED_SET, setKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the zrank check when the key IS found.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_zrank_isMember() {
+ // Define input key
+ final String setKey = "ThisIsMySetKey";
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.zrank(setKey, 2, inputKey);
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(SORTED_SET, setKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Smoke test the pfcount check when the key is NOT found in the set.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_pfcount_notMember() {
+ // Define input key
+ final String inputKey = "ThisIsMyKey";
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(HYPER_LOG_LOG);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the pfcount check when the key IS found.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_pfcount_isMember() {
+ // Define input key
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.pfadd(inputKey, "my value");
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(HYPER_LOG_LOG);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Smoke test the geopos check when the key is NOT found in the set.
+ * Expectation is tuple is acked, and nothing is emitted.
+ */
+ @Test
+ void smokeTest_geopos_notMember() {
+ // Define input key
+ final String geoKey = "ThisIsMyGeoKey";
+ final String inputKey = "ThisIsMyKey";
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(GEO, geoKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify the bolt filtered the input tuple.
+ verifyTupleFiltered();
+ }
+
+ /**
+ * Smoke test the geopos check when the key IS found.
+ * Expectation is tuple is acked, and tuple is emitted.
+ */
+ @Test
+ void smokeTest_geopos_isMember() {
+ // Define input key
+ final String geoKey = "ThisIsMyGeoKey";
+ final String inputKey = "ThisIsMyKey";
+
+ // Ensure key does exist in redis
+ jedisHelper.geoadd(geoKey, 139.731992, 35.709026, inputKey);
+
+ // Create an input tuple
+ final Map<String, Object> values = new HashMap<>();
+ values.put("key", inputKey);
+ values.put("value", "ThisIsMyValue");
+ final Tuple tuple = new StubTuple(values);
+
+ final JedisPoolConfig config = configBuilder.build();
+ final TestMapper mapper = new TestMapper(GEO, geoKey);
+
+ final RedisFilterBolt bolt = new RedisFilterBolt(config, mapper);
+ bolt.prepare(new HashMap<>(), topologyContext, new
OutputCollector(outputCollector));
+ bolt.process(tuple);
+
+ // Verify Tuple passed through the bolt
+ verifyTuplePassed(tuple);
+ }
+
+ /**
+ * Utility method to help verify that a tuple passed through the
RedisFilterBolt properly.
+ * @param expectedTuple The tuple we expected to pass through the bolt.
+ */
+ private void verifyTuplePassed(final Tuple expectedTuple) {
+ // Verify no errors or failed tuples
+ assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have
no reported errors");
+ assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have
no failed tuples");
+
+ // We should have a single acked tuple
+ assertEquals(1, outputCollector.getAckedTuples().size(), "Should have
a single acked tuple");
+
+ // We should have a single emitted tuple.
+ assertEquals(1, outputCollector.getEmittedTuples().size(), "Should
have a single emitted tuple");
+
+ // Verify the tuple is what we expected
+ final EmittedTuple emittedTuple =
outputCollector.getEmittedTuples().get(0);
+ assertEquals("default", emittedTuple.getStreamId());
+ TupleTestHelper.verifyAnchors(emittedTuple, expectedTuple);
+ TupleTestHelper.verifyEmittedTuple(emittedTuple,
expectedTuple.getValues());
+ }
+
+ /**
+ * Utility method to help verify that no tuples were passed through the
RedisFilterBolt.
+ */
+ private void verifyTupleFiltered() {
+ // Verify no errors or failed tuples
+ assertTrue(outputCollector.getReportedErrors().isEmpty(), "Should have
no reported errors");
+ assertTrue(outputCollector.getFailedTuples().isEmpty(), "Should have
no failed tuples");
+
+ // We should have a single acked tuple
+ assertEquals(1, outputCollector.getAckedTuples().size(), "Should have
a single acked tuple");
+
+ // We should have no emitted tuple.
+ assertTrue(outputCollector.getEmittedTuples().isEmpty(), "Should have
no emitted tuples");
+ }
+
+ /**
+ * Test Implementation.
+ */
+ private static class TestMapper implements RedisFilterMapper {
+ private final RedisDataTypeDescription.RedisDataType dataType;
+ private final String additionalKey;
+
+ private TestMapper(final RedisDataTypeDescription.RedisDataType
dataType) {
+ this(dataType, null);
+ }
+
+ private TestMapper(final RedisDataTypeDescription.RedisDataType
dataType, final String additionalKey) {
+ this.dataType = dataType;
+ this.additionalKey = additionalKey;
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("key",
"value"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return new RedisDataTypeDescription(dataType, additionalKey);
+ }
+
+ @Override
+ public String getKeyFromTuple(final ITuple tuple) {
+ return tuple.getStringByField("key");
+ }
+
+ @Override
+ public String getValueFromTuple(final ITuple tuple) {
+ return tuple.getStringByField("value");
+ }
+ }
+}
\ No newline at end of file
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
index 212504e65..00f4a0eb4 100644
---
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
@@ -24,8 +24,8 @@ import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.Serializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index e6ee68887..f82ef6fa4 100644
---
a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
-import redis.clients.util.SafeEncoder;
+import redis.clients.jedis.util.SafeEncoder;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
new file mode 100644
index 000000000..9c357493f
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/JedisTestHelper.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.Objects;
+import org.testcontainers.containers.GenericContainer;
+import redis.clients.jedis.Jedis;
+
+/**
+ * Utility class for helping interact with a Redis service in tests.
+ *
+ * <p>Wraps a single {@link Jedis} connection created against the supplied container.
+ * The helper is {@link AutoCloseable}, so tests can manage it with try-with-resources
+ * or call {@link #close()} explicitly.
+ */
+public class JedisTestHelper implements AutoCloseable {
+    private final Jedis jedis;
+
+    /**
+     * Constructor.
+     * @param container Container instance to create a redis client against.
+     * @throws NullPointerException if {@code container} is null.
+     */
+    public JedisTestHelper(final GenericContainer<?> container) {
+        Objects.requireNonNull(container, "container");
+
+        jedis = new Jedis(
+            container.getHost(),
+            container.getFirstMappedPort()
+        );
+    }
+
+    /**
+     * Deletes the given key.
+     * @param key key to delete.
+     */
+    public void delete(final String key) {
+        jedis.del(key);
+    }
+
+    /**
+     * Adds a geospatial member to the given key.
+     * @param key key of the geospatial index.
+     * @param longitude longitude of the member.
+     * @param latitude latitude of the member.
+     * @param value member to add.
+     */
+    public void geoadd(final String key, final double longitude, final double latitude, final String value) {
+        jedis.geoadd(key, longitude, latitude, value);
+    }
+
+    /**
+     * Checks whether a field exists in a hash.
+     * @param hash key of the hash.
+     * @param key field within the hash.
+     * @return the result of the underlying {@code hexists} call.
+     */
+    public boolean hexists(final String hash, final String key) {
+        return jedis.hexists(hash, key);
+    }
+
+    /**
+     * Sets a field in a hash.
+     * @param hash key of the hash.
+     * @param key field within the hash.
+     * @param value value to store.
+     */
+    public void hset(final String hash, final String key, final String value) {
+        jedis.hset(hash, key, value);
+    }
+
+    /**
+     * Checks whether a key exists.
+     * @param key key to look up.
+     * @return the result of the underlying {@code exists} call.
+     */
+    public boolean exists(final String key) {
+        return jedis.exists(key);
+    }
+
+    /**
+     * Adds a value to the HyperLogLog stored at the given key.
+     * @param key key of the HyperLogLog.
+     * @param value element to add.
+     */
+    public void pfadd(final String key, final String value) {
+        jedis.pfadd(key, value);
+    }
+
+    /**
+     * Stores a string value under the given key.
+     * @param key key to write.
+     * @param value value to store.
+     */
+    public void set(final String key, final String value) {
+        jedis.set(key, value);
+    }
+
+    /**
+     * Adds a member to a set. Despite the method name, this issues {@code sadd},
+     * i.e. it writes the member rather than reading it.
+     * @param set key of the set.
+     * @param value member to add.
+     */
+    public void smember(final String set, final String value) {
+        jedis.sadd(set, value);
+    }
+
+    /**
+     * Checks whether a value is a member of a set.
+     * @param set key of the set.
+     * @param value member to look up.
+     * @return the result of the underlying {@code sismember} call.
+     */
+    public boolean sismember(final String set, final String value) {
+        return jedis.sismember(set, value);
+    }
+
+    /**
+     * Adds a scored member to a sorted set. Despite the method name, this issues
+     * {@code zadd}, i.e. it writes the member rather than querying its rank.
+     * @param set key of the sorted set.
+     * @param score score of the member.
+     * @param value member to add.
+     */
+    public void zrank(final String set, final double score, final String value) {
+        jedis.zadd(set, score, value);
+    }
+
+    /**
+     * Closes the underlying Jedis connection.
+     */
+    @Override
+    public void close() {
+        jedis.close();
+    }
+}
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java
new file mode 100644
index 000000000..ec1e3e942
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/StubTuple.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.org.apache.commons.lang.NotImplementedException;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+
+
+/**
+ * Partial Implementation of the Tuple interface for tests.
+ *
+ * <p>The tuple is backed by an immutable snapshot of a field-name to value map. Only the
+ * by-field accessors, {@link #size()}, {@link #contains(String)}, {@link #getFields()},
+ * {@link #select(Fields)} and {@link #getValues()} are supported; positional accessors and
+ * source/context metadata throw {@link NotImplementedException}.
+ */
+public class StubTuple implements Tuple {
+
+    final Map<String, Object> values;
+
+    /**
+     * Constructor.
+     * @param values field name to value mapping; copied, so later changes to the argument are not seen.
+     * @throws NullPointerException if {@code values} is null.
+     */
+    public StubTuple(final Map<String, Object> values) {
+        this.values = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(values)));
+    }
+
+    private static NotImplementedException notImplemented() {
+        return new NotImplementedException("Not implemented");
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    @Override
+    public boolean contains(final String field) {
+        return values.containsKey(field);
+    }
+
+    @Override
+    public Fields getFields() {
+        return new Fields(values.keySet().toArray(new String[0]));
+    }
+
+    @Override
+    public int fieldIndex(final String field) {
+        throw notImplemented();
+    }
+
+    /**
+     * Returns the values of the selected fields, in selector order.
+     * A field that is not present in this tuple contributes {@code null},
+     * mirroring {@link #getValueByField(String)}.
+     *
+     * @param selector fields to pick.
+     * @return a new list holding one entry per selected field.
+     * @throws NullPointerException if {@code selector} is null.
+     */
+    @Override
+    public List<Object> select(final Fields selector) {
+        Objects.requireNonNull(selector, "selector");
+        final List<Object> selected = new ArrayList<>(selector.size());
+        for (final String field : selector) {
+            selected.add(values.get(field));
+        }
+        return selected;
+    }
+
+    @Override
+    public Object getValue(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public String getString(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Integer getInteger(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Long getLong(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Boolean getBoolean(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Short getShort(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Byte getByte(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Double getDouble(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Float getFloat(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public byte[] getBinary(final int i) {
+        throw notImplemented();
+    }
+
+    @Override
+    public Object getValueByField(final String field) {
+        return values.get(field);
+    }
+
+    /**
+     * Returns the {@code toString()} form of the value stored under the field.
+     *
+     * @param field field name.
+     * @return string form of the stored value.
+     * @throws NullPointerException if the field is absent or maps to null; the message names the field.
+     */
+    @Override
+    public String getStringByField(final String field) {
+        // Same exception type as before, but with a message that identifies the missing field.
+        return Objects.requireNonNull(values.get(field), "No value for field: " + field).toString();
+    }
+
+    @Override
+    public Integer getIntegerByField(final String field) {
+        return (Integer) values.get(field);
+    }
+
+    @Override
+    public Long getLongByField(final String field) {
+        return (Long) values.get(field);
+    }
+
+    @Override
+    public Boolean getBooleanByField(final String field) {
+        return (Boolean) values.get(field);
+    }
+
+    @Override
+    public Short getShortByField(final String field) {
+        return (Short) values.get(field);
+    }
+
+    @Override
+    public Byte getByteByField(final String field) {
+        return (Byte) values.get(field);
+    }
+
+    @Override
+    public Double getDoubleByField(final String field) {
+        return (Double) values.get(field);
+    }
+
+    @Override
+    public Float getFloatByField(final String field) {
+        return (Float) values.get(field);
+    }
+
+    @Override
+    public byte[] getBinaryByField(final String field) {
+        return (byte[]) values.get(field);
+    }
+
+    /**
+     * Returns a fresh, mutable copy of the stored values.
+     */
+    @Override
+    public List<Object> getValues() {
+        return new ArrayList<>(values.values());
+    }
+
+    @Override
+    public GlobalStreamId getSourceGlobalStreamId() {
+        throw notImplemented();
+    }
+
+    @Override
+    public String getSourceComponent() {
+        throw notImplemented();
+    }
+
+    @Override
+    public int getSourceTask() {
+        throw notImplemented();
+    }
+
+    @Override
+    public String getSourceStreamId() {
+        throw notImplemented();
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        throw notImplemented();
+    }
+
+    @Override
+    public GeneralTopologyContext getContext() {
+        throw notImplemented();
+    }
+}
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
new file mode 100644
index 000000000..0580ec18c
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/TupleTestHelper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.storm.redis.util.outputcollector.EmittedTuple;
+import org.apache.storm.tuple.Tuple;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Shared assertions for tests that inspect tuples captured by a stub output collector.
+ */
+public class TupleTestHelper {
+
+    /**
+     * Asserts that the emitted tuple has exactly one anchor and that it equals the expected one.
+     *
+     * @param emittedTuple captured emit to inspect; must not be null.
+     * @param expectedAnchor anchor the emit is expected to carry; must not be null.
+     */
+    public static void verifyAnchors(final EmittedTuple emittedTuple, final Tuple expectedAnchor) {
+        Objects.requireNonNull(emittedTuple);
+        Objects.requireNonNull(expectedAnchor);
+
+        final List<Tuple> actualAnchors = emittedTuple.getAnchors();
+        assertEquals(1, actualAnchors.size(), "Should have a single anchor");
+
+        final Tuple actualAnchor = actualAnchors.get(0);
+        assertNotNull(actualAnchor, "Should be non-null");
+        assertEquals(expectedAnchor, actualAnchor);
+    }
+
+    /**
+     * Asserts that the emitted tuple holds exactly the expected values.
+     * The size is compared first, then the full contents.
+     *
+     * @param emittedTuple captured emit to inspect; must not be null.
+     * @param expectedValues values the emit is expected to hold; must not be null.
+     */
+    public static void verifyEmittedTuple(final EmittedTuple emittedTuple, final List<Object> expectedValues) {
+        Objects.requireNonNull(emittedTuple);
+        Objects.requireNonNull(expectedValues);
+
+        final List<Object> actualValues = emittedTuple.getTuple();
+        assertEquals(expectedValues.size(), actualValues.size());
+        assertEquals(expectedValues, actualValues);
+    }
+}
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
new file mode 100644
index 000000000..0196db92b
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/EmittedTuple.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util.outputcollector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Used with StubOutputCollector for testing.
+ *
+ * <p>Immutable record of a single emit: the target stream, the emitted values and the
+ * anchors the emit was tied to.
+ */
+public class EmittedTuple {
+    private final String streamId;
+    private final List<Object> tuple;
+    private final List<Tuple> anchors;
+
+    /**
+     * Constructor.
+     * @param streamId stream the tuple was emitted to.
+     * @param tuple emitted values; stored as given, not copied.
+     * @param anchors anchors of the emit; copied. {@code null} is treated as "no anchors"
+     *                so that emits made without anchors can be recorded as well.
+     */
+    public EmittedTuple(final String streamId, final List<Object> tuple, final Collection<Tuple> anchors) {
+        this.streamId = streamId;
+        this.tuple = tuple;
+        // A null collection previously caused a NullPointerException in the ArrayList copy constructor.
+        this.anchors = anchors == null ? new ArrayList<>() : new ArrayList<>(anchors);
+    }
+
+    /**
+     * @return stream the tuple was emitted to.
+     */
+    public String getStreamId() {
+        return streamId;
+    }
+
+    /**
+     * @return the emitted values, as passed to the constructor.
+     */
+    public List<Object> getTuple() {
+        return tuple;
+    }
+
+    /**
+     * @return unmodifiable view of the anchors; empty when the emit had none.
+     */
+    public List<Tuple> getAnchors() {
+        return Collections.unmodifiableList(anchors);
+    }
+
+    @Override
+    public String toString() {
+        return "EmittedTuple{"
+            + "streamId='" + streamId + '\''
+            + ", tuple=" + tuple
+            + ", anchors=" + anchors
+            + '}';
+    }
+}
diff --git
a/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
new file mode 100644
index 000000000..3fec79de1
--- /dev/null
+++
b/external/storm-redis/src/test/java/org/apache/storm/redis/util/outputcollector/StubOutputCollector.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.redis.util.outputcollector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Stub implementation for testing.
+ */
+public class StubOutputCollector implements IOutputCollector {
+
+ final List<EmittedTuple> emittedTuples = new ArrayList<>();
+ final List<Tuple> ackedTuples = new ArrayList<>();
+ final List<Tuple> failedTuples = new ArrayList<>();
+ final List<Throwable> reportedErrors = new ArrayList<>();
+
+ @Override
+ public List<Integer> emit(final String streamId, final Collection<Tuple>
anchors, final List<Object> tuple) {
+ emittedTuples.add(
+ new EmittedTuple(streamId, tuple, anchors)
+ );
+
+ // Dummy value.
+ return Collections.singletonList(1);
+ }
+
+ @Override
+ public void emitDirect(final int taskId, final String streamId, final
Collection<Tuple> anchors, final List<Object> tuple) {
+ throw new RuntimeException("Not implemented yet!");
+ }
+
+ @Override
+ public void ack(final Tuple input) {
+ ackedTuples.add(input);
+ }
+
+ @Override
+ public void fail(final Tuple input) {
+ failedTuples.add(input);
+ }
+
+ @Override
+ public void resetTimeout(final Tuple input) {
+ throw new RuntimeException("Not implemented yet!");
+ }
+
+ @Override
+ public void flush() {
+ throw new RuntimeException("Not implemented yet!");
+ }
+
+ @Override
+ public void reportError(final Throwable error) {
+ reportedErrors.add(error);
+ }
+
+ public List<EmittedTuple> getEmittedTuples() {
+ return Collections.unmodifiableList(emittedTuples);
+ }
+
+ public List<Throwable> getReportedErrors() {
+ return Collections.unmodifiableList(reportedErrors);
+ }
+
+ public List<Tuple> getFailedTuples() {
+ return Collections.unmodifiableList(failedTuples);
+ }
+
+ public List<Tuple> getAckedTuples() {
+ return Collections.unmodifiableList(ackedTuples);
+ }
+
+ public void reset() {
+ emittedTuples.clear();
+ ackedTuples.clear();
+ reportedErrors.clear();
+ failedTuples.clear();
+ }
+}
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index f5f708b36..7bc9528cc 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -84,7 +84,6 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>2.8.9</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
diff --git a/pom.xml b/pom.xml
index c1716ee15..5fba628ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,13 +128,13 @@
<hamcrest.version>2.2</hamcrest.version>
<elasticsearch.version>7.17.13</elasticsearch.version>
<calcite.version>1.16.0</calcite.version>
- <jedis.version>2.9.0</jedis.version>
+ <jedis.version>5.1.0</jedis.version>
<activemq.version>5.18.3</activemq.version>
<jackson.version>2.15.2</jackson.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
-
<storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
+ <testcontainers.version>1.19.1</testcontainers.version>
<!-- Java and clojure build lifecycle test properties are defined here
to avoid having to create a default profile -->
<java.unit.test.exclude.groups>PerformanceTest</java.unit.test.exclude.groups>
@@ -157,6 +157,7 @@
<rocksdb-jni-version>8.5.4</rocksdb-jni-version>
<json-smart.version>2.5.0</json-smart.version>
<byte-buddy.version>1.14.9</byte-buddy.version>
+ <gson.version>2.10.1</gson.version>
<!-- see intellij profile below... This fixes an annoyance with
intellij -->
<provided.scope>provided</provided.scope>
@@ -1150,6 +1151,12 @@
<artifactId>byte-buddy-agent</artifactId>
<version>${byte-buddy.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
index 421f84fca..b14b7f31c 100644
---
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -42,8 +42,7 @@ import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
-
-import redis.clients.util.JedisURIHelper;
+import redis.clients.jedis.util.JedisURIHelper;
/**
* Create a Redis sink based on the URI and properties. The URI has the format
of