This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ecc1220  [BEAM-7075] Create Redis embedded server on @BeforeClass and 
simplify tests
     new 0e9ab8d  Merge pull request #8304: [BEAM-7075] Create Redis embedded 
server on @BeforeClass and simplify tests
ecc1220 is described below

commit ecc122089ac18b2cb33d9aae1d31a4789fe4eec2
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Mon Apr 15 00:43:13 2019 +0200

    [BEAM-7075] Create Redis embedded server on @BeforeClass and simplify tests
---
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java  | 278 +++++++--------------
 1 file changed, 90 insertions(+), 188 deletions(-)

diff --git 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 585c238..320e515 100644
--- 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.sdk.io.redis;
 
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
@@ -31,11 +32,8 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
@@ -44,246 +42,150 @@ import redis.embedded.RedisServer;
 /** Test on the Redis IO. */
 public class RedisIOTest {
 
-  private static final String REDIS_HOST = "::1";
+  private static final String REDIS_HOST = "localhost";
 
-  @Rule public TestPipeline writePipeline = TestPipeline.create();
-  @Rule public TestPipeline readPipeline = TestPipeline.create();
+  @Rule public TestPipeline p = TestPipeline.create();
 
-  private EmbeddedRedis embeddedRedis;
+  private static RedisServer server;
+  private static int port;
 
-  @Before
-  public void before() throws Exception {
-    embeddedRedis = new EmbeddedRedis();
-  }
+  private static Jedis client;
 
-  @After
-  public void after() throws Exception {
-    embeddedRedis.close();
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    try (ServerSocket serverSocket = new ServerSocket(0)) {
+      port = serverSocket.getLocalPort();
+    }
+    server = new RedisServer(port);
+    server.start();
+    client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
   }
 
-  private ArrayList<KV<String, String>> ingestData(String prefix, int numKeys) 
{
-    ArrayList<KV<String, String>> data = new ArrayList<>();
-    for (int i = 0; i < numKeys; i++) {
-      KV<String, String> kv = KV.of(prefix + "-key " + i, "value " + i);
-      data.add(kv);
-    }
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint("::1", embeddedRedis.getPort()));
-    writePipeline.run();
-    return data;
+  @AfterClass
+  public static void afterClass() {
+    client.close();
+    server.stop();
   }
 
   @Test
-  public void testBulkRead() throws Exception {
-    ArrayList<KV<String, String>> data = ingestData("bulkread", 100);
+  public void testRead() {
+    List<KV<String, String>> data = buildIncrementalData("bulkread", 10);
+    data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
+
     PCollection<KV<String, String>> read =
-        readPipeline.apply(
+        p.apply(
             "Read",
             RedisIO.read()
-                .withEndpoint("::1", embeddedRedis.getPort())
+                .withEndpoint(REDIS_HOST, port)
                 .withKeyPattern("bulkread*")
                 .withBatchSize(10));
     PAssert.that(read).containsInAnyOrder(data);
-    readPipeline.run();
+    p.run();
   }
 
   @Test
-  public void testWriteReadUsingDefaultAppendMethod() throws Exception {
-    ArrayList<KV<String, String>> data = new ArrayList<>();
-    for (int i = 0; i < 8000; i++) {
-      KV<String, String> kv = KV.of("key " + i, "value " + i);
-      data.add(kv);
-    }
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()));
-
-    writePipeline.run();
+  public void testReadWithKeyPattern() {
+    List<KV<String, String>> data = buildIncrementalData("pattern", 10);
+    data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
 
     PCollection<KV<String, String>> read =
-        readPipeline.apply(
-            "Read",
-            RedisIO.read()
-                .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
-                .withKeyPattern("key*"));
+        p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, 
port).withKeyPattern("pattern*"));
     PAssert.that(read).containsInAnyOrder(data);
 
     PCollection<KV<String, String>> readNotMatch =
-        readPipeline.apply(
+        p.apply(
             "ReadNotMatch",
-            RedisIO.read()
-                .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
-                .withKeyPattern("foobar*"));
+            RedisIO.read().withEndpoint(REDIS_HOST, 
port).withKeyPattern("foobar*"));
     PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L);
 
-    readPipeline.run();
+    p.run();
   }
 
   @Test
-  public void testConfiguration() {
-    RedisIO.Write writeOp = RedisIO.write().withEndpoint("test", 111);
-    Assert.assertEquals(111, writeOp.connectionConfiguration().port());
-    Assert.assertEquals("test", writeOp.connectionConfiguration().host());
-  }
+  public void testWriteWithMethodSet() {
+    String key = "testWriteWithMethodSet";
+    client.set(key, "value");
 
-  @Test
-  public void testWriteReadUsingSetMethod() throws Exception {
-    String key = "key";
-    String value = "value";
     String newValue = "newValue";
+    PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.SET));
+    p.run();
 
-    Jedis jedis =
-        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
-    jedis.set(key, value);
-
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
-    write.apply(
-        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.SET));
-
-    writePipeline.run();
-
-    PCollection<KV<String, String>> read =
-        readPipeline.apply(
-            "Read",
-            RedisIO.read().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withKeyPattern(key));
-    PAssert.that(read).containsInAnyOrder(Collections.singletonList(KV.of(key, 
newValue)));
-
-    readPipeline.run();
+    assertEquals(newValue, client.get(key));
   }
 
   @Test
-  public void testWriteReadUsingLpushMethod() throws Exception {
-    String key = "key";
+  public void testWriteWithMethodLPush() {
+    String key = "testWriteWithMethodLPush";
     String value = "value";
-    String newValue = "newValue";
-
-    Jedis jedis =
-        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
-    jedis.lpush(key, value);
-
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
-    write.apply(
-        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.LPUSH));
+    client.lpush(key, value);
 
-    writePipeline.run();
+    String newValue = "newValue";
+    PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.LPUSH));
+    p.run();
 
-    List<String> values = jedis.lrange(key, 0, -1);
-    Assert.assertEquals(newValue + value, String.join("", values));
+    List<String> values = client.lrange(key, 0, -1);
+    assertEquals(newValue + value, String.join("", values));
   }
 
   @Test
-  public void testWriteReadUsingRpushMethod() throws Exception {
-    String key = "key";
+  public void testWriteWithMethodRPush() {
+    String key = "testWriteWithMethodRPush";
     String value = "value";
-    String newValue = "newValue";
-
-    Jedis jedis =
-        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
-    jedis.lpush(key, value);
+    client.lpush(key, value);
 
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
-    write.apply(
-        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.RPUSH));
-
-    writePipeline.run();
+    String newValue = "newValue";
+    PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.RPUSH));
+    p.run();
 
-    List<String> values = jedis.lrange(key, 0, -1);
-    Assert.assertEquals(value + newValue, String.join("", values));
+    List<String> values = client.lrange(key, 0, -1);
+    assertEquals(value + newValue, String.join("", values));
   }
 
   @Test
-  public void testWriteReadUsingSaddMethod() throws Exception {
-    String key = "key";
-
-    Jedis jedis =
-        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
-
+  public void testWriteWithMethodSAdd() {
+    String key = "testWriteWithMethodSAdd";
     List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", 
"5");
-    List<KV<String, String>> kvs = Lists.newArrayList();
-    for (String value : values) {
-      kvs.add(KV.of(key, value));
-    }
+    List<KV<String, String>> data = buildConstantKeyList(key, values);
 
-    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(kvs));
-    write.apply(
-        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.SADD));
+    PCollection<KV<String, String>> write = p.apply(Create.of(data));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.SADD));
+    p.run();
 
-    writePipeline.run();
-
-    Set<String> expected = Sets.newHashSet(values);
-    Set<String> members = jedis.smembers(key);
-    Assert.assertEquals(expected, members);
+    Set<String> expected = new HashSet<>(values);
+    Set<String> members = client.smembers(key);
+    assertEquals(expected, members);
   }
 
   @Test
-  public void testWriteUsingHLLMethod() throws Exception {
-    String key = "key";
-
-    Jedis jedis =
-        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
-
-    PCollection<KV<String, String>> write =
-        writePipeline.apply(
-            Create.of(
-                KV.of(key, "0"),
-                KV.of(key, "1"),
-                KV.of(key, "2"),
-                KV.of(key, "3"),
-                KV.of(key, "2"),
-                KV.of(key, "4"),
-                KV.of(key, "0"),
-                KV.of(key, "5")));
-
-    write.apply(
-        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.PFADD));
-
-    writePipeline.run();
-
-    long count = jedis.pfcount(key);
-    Assert.assertEquals(6, count);
-  }
+  public void testWriteWithMethodPFAdd() {
+    String key = "testWriteWithMethodPFAdd";
+    List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", 
"5");
+    List<KV<String, String>> data = buildConstantKeyList(key, values);
 
-  @Test
-  public void testReadBuildsCorrectly() {
-    RedisIO.Read read = RedisIO.read().withEndpoint("test", 
111).withAuth("pass").withTimeout(5);
-    Assert.assertEquals("test", read.connectionConfiguration().host());
-    Assert.assertEquals(111, read.connectionConfiguration().port());
-    Assert.assertEquals("pass", read.connectionConfiguration().auth());
-    Assert.assertEquals(5, read.connectionConfiguration().timeout());
-    Assert.assertEquals(false, read.connectionConfiguration().ssl());
-  }
+    PCollection<KV<String, String>> write = p.apply(Create.of(data));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.PFADD));
+    p.run();
 
-  @Test
-  public void testWriteBuildsCorrectly() {
-    RedisIO.Write write = RedisIO.write().withEndpoint("test", 
111).withAuth("pass").withTimeout(5);
-    Assert.assertEquals("test", write.connectionConfiguration().host());
-    Assert.assertEquals(111, write.connectionConfiguration().port());
-    Assert.assertEquals("pass", write.connectionConfiguration().auth());
-    Assert.assertEquals(5, write.connectionConfiguration().timeout());
-    Assert.assertEquals(false, write.connectionConfiguration().ssl());
-    Assert.assertEquals(Method.APPEND, write.method());
+    long count = client.pfcount(key);
+    assertEquals(6, count);
   }
 
-  /** Simple embedded Redis instance wrapper to control Redis server. */
-  private static class EmbeddedRedis implements AutoCloseable {
-
-    private final int port;
-    private final RedisServer redisServer;
-
-    public EmbeddedRedis() throws IOException {
-      try (ServerSocket serverSocket = new ServerSocket(0)) {
-        port = serverSocket.getLocalPort();
-      }
-      redisServer = new RedisServer(port);
-      redisServer.start();
-    }
-
-    public int getPort() {
-      return this.port;
+  private static List<KV<String, String>> buildConstantKeyList(String key, 
List<String> values) {
+    List<KV<String, String>> data = new ArrayList<>();
+    for (String value : values) {
+      data.add(KV.of(key, value));
     }
+    return data;
+  }
 
-    @Override
-    public void close() {
-      redisServer.stop();
+  private List<KV<String, String>> buildIncrementalData(String keyPrefix, int 
size) {
+    List<KV<String, String>> data = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      data.add(KV.of(keyPrefix + i, String.valueOf(i)));
     }
+    return data;
   }
 }

Reply via email to