This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new ecc1220  [BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests
     new 0e9ab8d  Merge pull request #8304: [BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests
ecc1220 is described below

commit ecc122089ac18b2cb33d9aae1d31a4789fe4eec2
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Mon Apr 15 00:43:13 2019 +0200

    [BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests
---
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 278 +++++++--------------
 1 file changed, 90 insertions(+), 188 deletions(-)

diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 585c238..320e515 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.sdk.io.redis;
 
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
@@ -31,11 +32,8 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
@@ -44,246 +42,150 @@ import
redis.embedded.RedisServer; /** Test on the Redis IO. */ public class RedisIOTest { - private static final String REDIS_HOST = "::1"; + private static final String REDIS_HOST = "localhost"; - @Rule public TestPipeline writePipeline = TestPipeline.create(); - @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TestPipeline p = TestPipeline.create(); - private EmbeddedRedis embeddedRedis; + private static RedisServer server; + private static int port; - @Before - public void before() throws Exception { - embeddedRedis = new EmbeddedRedis(); - } + private static Jedis client; - @After - public void after() throws Exception { - embeddedRedis.close(); + @BeforeClass + public static void beforeClass() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + port = serverSocket.getLocalPort(); + } + server = new RedisServer(port); + server.start(); + client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect(); } - private ArrayList<KV<String, String>> ingestData(String prefix, int numKeys) { - ArrayList<KV<String, String>> data = new ArrayList<>(); - for (int i = 0; i < numKeys; i++) { - KV<String, String> kv = KV.of(prefix + "-key " + i, "value " + i); - data.add(kv); - } - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(data)); - write.apply(RedisIO.write().withEndpoint("::1", embeddedRedis.getPort())); - writePipeline.run(); - return data; + @AfterClass + public static void afterClass() { + client.close(); + server.stop(); } @Test - public void testBulkRead() throws Exception { - ArrayList<KV<String, String>> data = ingestData("bulkread", 100); + public void testRead() { + List<KV<String, String>> data = buildIncrementalData("bulkread", 10); + data.forEach(kv -> client.set(kv.getKey(), kv.getValue())); + PCollection<KV<String, String>> read = - readPipeline.apply( + p.apply( "Read", RedisIO.read() - .withEndpoint("::1", embeddedRedis.getPort()) + .withEndpoint(REDIS_HOST, port) 
.withKeyPattern("bulkread*") .withBatchSize(10)); PAssert.that(read).containsInAnyOrder(data); - readPipeline.run(); + p.run(); } @Test - public void testWriteReadUsingDefaultAppendMethod() throws Exception { - ArrayList<KV<String, String>> data = new ArrayList<>(); - for (int i = 0; i < 8000; i++) { - KV<String, String> kv = KV.of("key " + i, "value " + i); - data.add(kv); - } - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(data)); - write.apply(RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort())); - - writePipeline.run(); + public void testReadWithKeyPattern() { + List<KV<String, String>> data = buildIncrementalData("pattern", 10); + data.forEach(kv -> client.set(kv.getKey(), kv.getValue())); PCollection<KV<String, String>> read = - readPipeline.apply( - "Read", - RedisIO.read() - .withEndpoint(REDIS_HOST, embeddedRedis.getPort()) - .withKeyPattern("key*")); + p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("pattern*")); PAssert.that(read).containsInAnyOrder(data); PCollection<KV<String, String>> readNotMatch = - readPipeline.apply( + p.apply( "ReadNotMatch", - RedisIO.read() - .withEndpoint(REDIS_HOST, embeddedRedis.getPort()) - .withKeyPattern("foobar*")); + RedisIO.read().withEndpoint(REDIS_HOST, port).withKeyPattern("foobar*")); PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L); - readPipeline.run(); + p.run(); } @Test - public void testConfiguration() { - RedisIO.Write writeOp = RedisIO.write().withEndpoint("test", 111); - Assert.assertEquals(111, writeOp.connectionConfiguration().port()); - Assert.assertEquals("test", writeOp.connectionConfiguration().host()); - } + public void testWriteWithMethodSet() { + String key = "testWriteWithMethodSet"; + client.set(key, "value"); - @Test - public void testWriteReadUsingSetMethod() throws Exception { - String key = "key"; - String value = "value"; String newValue = "newValue"; + PCollection<KV<String, String>> write = 
p.apply(Create.of(KV.of(key, newValue))); + write.apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(Method.SET)); + p.run(); - Jedis jedis = - RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); - jedis.set(key, value); - - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(KV.of(key, newValue))); - write.apply( - RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.SET)); - - writePipeline.run(); - - PCollection<KV<String, String>> read = - readPipeline.apply( - "Read", - RedisIO.read().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withKeyPattern(key)); - PAssert.that(read).containsInAnyOrder(Collections.singletonList(KV.of(key, newValue))); - - readPipeline.run(); + assertEquals(newValue, client.get(key)); } @Test - public void testWriteReadUsingLpushMethod() throws Exception { - String key = "key"; + public void testWriteWithMethodLPush() { + String key = "testWriteWithMethodLPush"; String value = "value"; - String newValue = "newValue"; - - Jedis jedis = - RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); - jedis.lpush(key, value); - - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(KV.of(key, newValue))); - write.apply( - RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.LPUSH)); + client.lpush(key, value); - writePipeline.run(); + String newValue = "newValue"; + PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, newValue))); + write.apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(Method.LPUSH)); + p.run(); - List<String> values = jedis.lrange(key, 0, -1); - Assert.assertEquals(newValue + value, String.join("", values)); + List<String> values = client.lrange(key, 0, -1); + assertEquals(newValue + value, String.join("", values)); } @Test - public void testWriteReadUsingRpushMethod() throws Exception { - String key = "key"; + public void 
testWriteWithMethodRPush() { + String key = "testWriteWithMethodRPush"; String value = "value"; - String newValue = "newValue"; - - Jedis jedis = - RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); - jedis.lpush(key, value); + client.lpush(key, value); - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(KV.of(key, newValue))); - write.apply( - RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.RPUSH)); - - writePipeline.run(); + String newValue = "newValue"; + PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, newValue))); + write.apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(Method.RPUSH)); + p.run(); - List<String> values = jedis.lrange(key, 0, -1); - Assert.assertEquals(value + newValue, String.join("", values)); + List<String> values = client.lrange(key, 0, -1); + assertEquals(value + newValue, String.join("", values)); } @Test - public void testWriteReadUsingSaddMethod() throws Exception { - String key = "key"; - - Jedis jedis = - RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); - + public void testWriteWithMethodSAdd() { + String key = "testWriteWithMethodSAdd"; List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5"); - List<KV<String, String>> kvs = Lists.newArrayList(); - for (String value : values) { - kvs.add(KV.of(key, value)); - } + List<KV<String, String>> data = buildConstantKeyList(key, values); - PCollection<KV<String, String>> write = writePipeline.apply(Create.of(kvs)); - write.apply( - RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.SADD)); + PCollection<KV<String, String>> write = p.apply(Create.of(data)); + write.apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(Method.SADD)); + p.run(); - writePipeline.run(); - - Set<String> expected = Sets.newHashSet(values); - Set<String> members = jedis.smembers(key); - 
Assert.assertEquals(expected, members); + Set<String> expected = new HashSet<>(values); + Set<String> members = client.smembers(key); + assertEquals(expected, members); } @Test - public void testWriteUsingHLLMethod() throws Exception { - String key = "key"; - - Jedis jedis = - RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); - - PCollection<KV<String, String>> write = - writePipeline.apply( - Create.of( - KV.of(key, "0"), - KV.of(key, "1"), - KV.of(key, "2"), - KV.of(key, "3"), - KV.of(key, "2"), - KV.of(key, "4"), - KV.of(key, "0"), - KV.of(key, "5"))); - - write.apply( - RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.PFADD)); - - writePipeline.run(); - - long count = jedis.pfcount(key); - Assert.assertEquals(6, count); - } + public void testWriteWithMethodPFAdd() { + String key = "testWriteWithMethodPFAdd"; + List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5"); + List<KV<String, String>> data = buildConstantKeyList(key, values); - @Test - public void testReadBuildsCorrectly() { - RedisIO.Read read = RedisIO.read().withEndpoint("test", 111).withAuth("pass").withTimeout(5); - Assert.assertEquals("test", read.connectionConfiguration().host()); - Assert.assertEquals(111, read.connectionConfiguration().port()); - Assert.assertEquals("pass", read.connectionConfiguration().auth()); - Assert.assertEquals(5, read.connectionConfiguration().timeout()); - Assert.assertEquals(false, read.connectionConfiguration().ssl()); - } + PCollection<KV<String, String>> write = p.apply(Create.of(data)); + write.apply(RedisIO.write().withEndpoint(REDIS_HOST, port).withMethod(Method.PFADD)); + p.run(); - @Test - public void testWriteBuildsCorrectly() { - RedisIO.Write write = RedisIO.write().withEndpoint("test", 111).withAuth("pass").withTimeout(5); - Assert.assertEquals("test", write.connectionConfiguration().host()); - Assert.assertEquals(111, write.connectionConfiguration().port()); - 
Assert.assertEquals("pass", write.connectionConfiguration().auth()); - Assert.assertEquals(5, write.connectionConfiguration().timeout()); - Assert.assertEquals(false, write.connectionConfiguration().ssl()); - Assert.assertEquals(Method.APPEND, write.method()); + long count = client.pfcount(key); + assertEquals(6, count); } - /** Simple embedded Redis instance wrapper to control Redis server. */ - private static class EmbeddedRedis implements AutoCloseable { - - private final int port; - private final RedisServer redisServer; - - public EmbeddedRedis() throws IOException { - try (ServerSocket serverSocket = new ServerSocket(0)) { - port = serverSocket.getLocalPort(); - } - redisServer = new RedisServer(port); - redisServer.start(); - } - - public int getPort() { - return this.port; + private static List<KV<String, String>> buildConstantKeyList(String key, List<String> values) { + List<KV<String, String>> data = new ArrayList<>(); + for (String value : values) { + data.add(KV.of(key, value)); } + return data; + } - @Override - public void close() { - redisServer.stop(); + private List<KV<String, String>> buildIncrementalData(String keyPrefix, int size) { + List<KV<String, String>> data = new ArrayList<>(); + for (int i = 0; i < size; i++) { + data.add(KV.of(keyPrefix + i, String.valueOf(i))); } + return data; } }