This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ecc1220 [BEAM-7075] Create Redis embedded server on @BeforeClass and
simplify tests
new 0e9ab8d Merge pull request #8304: [BEAM-7075] Create Redis embedded
server on @BeforeClass and simplify tests
ecc1220 is described below
commit ecc122089ac18b2cb33d9aae1d31a4789fe4eec2
Author: Ismaël Mejía <[email protected]>
AuthorDate: Mon Apr 15 00:43:13 2019 +0200
[BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests
---
.../org/apache/beam/sdk/io/redis/RedisIOTest.java | 278 +++++++--------------
1 file changed, 90 insertions(+), 188 deletions(-)
diff --git
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 585c238..320e515 100644
---
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.sdk.io.redis;
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
@@ -31,11 +32,8 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
@@ -44,246 +42,150 @@ import redis.embedded.RedisServer;
/** Test on the Redis IO. */
public class RedisIOTest {
- private static final String REDIS_HOST = "::1";
+ private static final String REDIS_HOST = "localhost";
- @Rule public TestPipeline writePipeline = TestPipeline.create();
- @Rule public TestPipeline readPipeline = TestPipeline.create();
+ @Rule public TestPipeline p = TestPipeline.create();
- private EmbeddedRedis embeddedRedis;
+ private static RedisServer server;
+ private static int port;
- @Before
- public void before() throws Exception {
- embeddedRedis = new EmbeddedRedis();
- }
+ private static Jedis client;
- @After
- public void after() throws Exception {
- embeddedRedis.close();
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ port = serverSocket.getLocalPort();
+ }
+ server = new RedisServer(port);
+ server.start();
+ client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
}
- private ArrayList<KV<String, String>> ingestData(String prefix, int numKeys)
{
- ArrayList<KV<String, String>> data = new ArrayList<>();
- for (int i = 0; i < numKeys; i++) {
- KV<String, String> kv = KV.of(prefix + "-key " + i, "value " + i);
- data.add(kv);
- }
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint("::1", embeddedRedis.getPort()));
- writePipeline.run();
- return data;
+ @AfterClass
+ public static void afterClass() {
+ client.close();
+ server.stop();
}
@Test
- public void testBulkRead() throws Exception {
- ArrayList<KV<String, String>> data = ingestData("bulkread", 100);
+ public void testRead() {
+ List<KV<String, String>> data = buildIncrementalData("bulkread", 10);
+ data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
PCollection<KV<String, String>> read =
- readPipeline.apply(
+ p.apply(
"Read",
RedisIO.read()
- .withEndpoint("::1", embeddedRedis.getPort())
+ .withEndpoint(REDIS_HOST, port)
.withKeyPattern("bulkread*")
.withBatchSize(10));
PAssert.that(read).containsInAnyOrder(data);
- readPipeline.run();
+ p.run();
}
@Test
- public void testWriteReadUsingDefaultAppendMethod() throws Exception {
- ArrayList<KV<String, String>> data = new ArrayList<>();
- for (int i = 0; i < 8000; i++) {
- KV<String, String> kv = KV.of("key " + i, "value " + i);
- data.add(kv);
- }
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()));
-
- writePipeline.run();
+ public void testReadWithKeyPattern() {
+ List<KV<String, String>> data = buildIncrementalData("pattern", 10);
+ data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
PCollection<KV<String, String>> read =
- readPipeline.apply(
- "Read",
- RedisIO.read()
- .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
- .withKeyPattern("key*"));
+ p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST,
port).withKeyPattern("pattern*"));
PAssert.that(read).containsInAnyOrder(data);
PCollection<KV<String, String>> readNotMatch =
- readPipeline.apply(
+ p.apply(
"ReadNotMatch",
- RedisIO.read()
- .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
- .withKeyPattern("foobar*"));
+ RedisIO.read().withEndpoint(REDIS_HOST,
port).withKeyPattern("foobar*"));
PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L);
- readPipeline.run();
+ p.run();
}
@Test
- public void testConfiguration() {
- RedisIO.Write writeOp = RedisIO.write().withEndpoint("test", 111);
- Assert.assertEquals(111, writeOp.connectionConfiguration().port());
- Assert.assertEquals("test", writeOp.connectionConfiguration().host());
- }
+ public void testWriteWithMethodSet() {
+ String key = "testWriteWithMethodSet";
+ client.set(key, "value");
- @Test
- public void testWriteReadUsingSetMethod() throws Exception {
- String key = "key";
- String value = "value";
String newValue = "newValue";
+ PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
+ write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.SET));
+ p.run();
- Jedis jedis =
- RedisConnectionConfiguration.create(REDIS_HOST,
embeddedRedis.getPort()).connect();
- jedis.set(key, value);
-
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(KV.of(key, newValue)));
- write.apply(
- RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withMethod(Method.SET));
-
- writePipeline.run();
-
- PCollection<KV<String, String>> read =
- readPipeline.apply(
- "Read",
- RedisIO.read().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withKeyPattern(key));
- PAssert.that(read).containsInAnyOrder(Collections.singletonList(KV.of(key,
newValue)));
-
- readPipeline.run();
+ assertEquals(newValue, client.get(key));
}
@Test
- public void testWriteReadUsingLpushMethod() throws Exception {
- String key = "key";
+ public void testWriteWithMethodLPush() {
+ String key = "testWriteWithMethodLPush";
String value = "value";
- String newValue = "newValue";
-
- Jedis jedis =
- RedisConnectionConfiguration.create(REDIS_HOST,
embeddedRedis.getPort()).connect();
- jedis.lpush(key, value);
-
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(KV.of(key, newValue)));
- write.apply(
- RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withMethod(Method.LPUSH));
+ client.lpush(key, value);
- writePipeline.run();
+ String newValue = "newValue";
+ PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
+ write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.LPUSH));
+ p.run();
- List<String> values = jedis.lrange(key, 0, -1);
- Assert.assertEquals(newValue + value, String.join("", values));
+ List<String> values = client.lrange(key, 0, -1);
+ assertEquals(newValue + value, String.join("", values));
}
@Test
- public void testWriteReadUsingRpushMethod() throws Exception {
- String key = "key";
+ public void testWriteWithMethodRPush() {
+ String key = "testWriteWithMethodRPush";
String value = "value";
- String newValue = "newValue";
-
- Jedis jedis =
- RedisConnectionConfiguration.create(REDIS_HOST,
embeddedRedis.getPort()).connect();
- jedis.lpush(key, value);
+ client.lpush(key, value);
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(KV.of(key, newValue)));
- write.apply(
- RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withMethod(Method.RPUSH));
-
- writePipeline.run();
+ String newValue = "newValue";
+ PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
+ write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.RPUSH));
+ p.run();
- List<String> values = jedis.lrange(key, 0, -1);
- Assert.assertEquals(value + newValue, String.join("", values));
+ List<String> values = client.lrange(key, 0, -1);
+ assertEquals(value + newValue, String.join("", values));
}
@Test
- public void testWriteReadUsingSaddMethod() throws Exception {
- String key = "key";
-
- Jedis jedis =
- RedisConnectionConfiguration.create(REDIS_HOST,
embeddedRedis.getPort()).connect();
-
+ public void testWriteWithMethodSAdd() {
+ String key = "testWriteWithMethodSAdd";
List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0",
"5");
- List<KV<String, String>> kvs = Lists.newArrayList();
- for (String value : values) {
- kvs.add(KV.of(key, value));
- }
+ List<KV<String, String>> data = buildConstantKeyList(key, values);
- PCollection<KV<String, String>> write =
writePipeline.apply(Create.of(kvs));
- write.apply(
- RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withMethod(Method.SADD));
+ PCollection<KV<String, String>> write = p.apply(Create.of(data));
+ write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.SADD));
+ p.run();
- writePipeline.run();
-
- Set<String> expected = Sets.newHashSet(values);
- Set<String> members = jedis.smembers(key);
- Assert.assertEquals(expected, members);
+ Set<String> expected = new HashSet<>(values);
+ Set<String> members = client.smembers(key);
+ assertEquals(expected, members);
}
@Test
- public void testWriteUsingHLLMethod() throws Exception {
- String key = "key";
-
- Jedis jedis =
- RedisConnectionConfiguration.create(REDIS_HOST,
embeddedRedis.getPort()).connect();
-
- PCollection<KV<String, String>> write =
- writePipeline.apply(
- Create.of(
- KV.of(key, "0"),
- KV.of(key, "1"),
- KV.of(key, "2"),
- KV.of(key, "3"),
- KV.of(key, "2"),
- KV.of(key, "4"),
- KV.of(key, "0"),
- KV.of(key, "5")));
-
- write.apply(
- RedisIO.write().withEndpoint(REDIS_HOST,
embeddedRedis.getPort()).withMethod(Method.PFADD));
-
- writePipeline.run();
-
- long count = jedis.pfcount(key);
- Assert.assertEquals(6, count);
- }
+ public void testWriteWithMethodPFAdd() {
+ String key = "testWriteWithMethodPFAdd";
+ List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0",
"5");
+ List<KV<String, String>> data = buildConstantKeyList(key, values);
- @Test
- public void testReadBuildsCorrectly() {
- RedisIO.Read read = RedisIO.read().withEndpoint("test",
111).withAuth("pass").withTimeout(5);
- Assert.assertEquals("test", read.connectionConfiguration().host());
- Assert.assertEquals(111, read.connectionConfiguration().port());
- Assert.assertEquals("pass", read.connectionConfiguration().auth());
- Assert.assertEquals(5, read.connectionConfiguration().timeout());
- Assert.assertEquals(false, read.connectionConfiguration().ssl());
- }
+ PCollection<KV<String, String>> write = p.apply(Create.of(data));
+ write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.PFADD));
+ p.run();
- @Test
- public void testWriteBuildsCorrectly() {
- RedisIO.Write write = RedisIO.write().withEndpoint("test",
111).withAuth("pass").withTimeout(5);
- Assert.assertEquals("test", write.connectionConfiguration().host());
- Assert.assertEquals(111, write.connectionConfiguration().port());
- Assert.assertEquals("pass", write.connectionConfiguration().auth());
- Assert.assertEquals(5, write.connectionConfiguration().timeout());
- Assert.assertEquals(false, write.connectionConfiguration().ssl());
- Assert.assertEquals(Method.APPEND, write.method());
+ long count = client.pfcount(key);
+ assertEquals(6, count);
}
- /** Simple embedded Redis instance wrapper to control Redis server. */
- private static class EmbeddedRedis implements AutoCloseable {
-
- private final int port;
- private final RedisServer redisServer;
-
- public EmbeddedRedis() throws IOException {
- try (ServerSocket serverSocket = new ServerSocket(0)) {
- port = serverSocket.getLocalPort();
- }
- redisServer = new RedisServer(port);
- redisServer.start();
- }
-
- public int getPort() {
- return this.port;
+ private static List<KV<String, String>> buildConstantKeyList(String key,
List<String> values) {
+ List<KV<String, String>> data = new ArrayList<>();
+ for (String value : values) {
+ data.add(KV.of(key, value));
}
+ return data;
+ }
- @Override
- public void close() {
- redisServer.stop();
+ private List<KV<String, String>> buildIncrementalData(String keyPrefix, int
size) {
+ List<KV<String, String>> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ data.add(KV.of(keyPrefix + i, String.valueOf(i)));
}
+ return data;
}
}