[ 
https://issues.apache.org/jira/browse/BEAM-4571?focusedWorklogId=134134&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134134
 ]

ASF GitHub Bot logged work on BEAM-4571:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Aug/18 12:59
            Start Date: 13/Aug/18 12:59
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #6045: [BEAM-4571] RedisIO 
support for write using SET operation
URL: https://github.com/apache/beam/pull/6045
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java 
b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 9e040c1df79..279ca46db2b 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -126,6 +126,7 @@ public static ReadAll readAll() {
   public static Write write() {
     return new AutoValue_RedisIO_Write.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .setMethod(Write.Method.APPEND)
         .build();
   }
 
@@ -334,9 +335,42 @@ public void processElement(ProcessContext context) {
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<KV<String, 
String>>, PDone> {
 
+    /** Determines the method used to insert data in Redis. */
+    public enum Method {
+
+      /**
+       * Use APPEND command. If key already exists and is a string, this 
command appends the value
+       * at the end of the string.
+       */
+      APPEND,
+
+      /** Use SET command. If key already holds a value, it is overwritten. */
+      SET,
+
+      /**
+       * Use LPUSH command. Insert value at the head of the list stored at 
key. If key does not
+       * exist, it is created as empty list before performing the push 
operations. When key holds a
+       * value that is not a list, an error is returned.
+       */
+      LPUSH,
+
+      /**
+       * Use RPUSH command. Insert value at the tail of the list stored at 
key. If key does not
+       * exist, it is created as empty list before performing the push 
operations. When key holds a
+       * value that is not a list, an error is returned.
+       */
+      RPUSH
+    }
+
     @Nullable
     abstract RedisConnectionConfiguration connectionConfiguration();
 
+    @Nullable
+    abstract Method method();
+
+    @Nullable
+    abstract Long expireTime();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -345,6 +379,10 @@ public void processElement(ProcessContext context) {
       abstract Builder setConnectionConfiguration(
           RedisConnectionConfiguration connectionConfiguration);
 
+      abstract Builder setMethod(Method method);
+
+      abstract Builder setExpireTime(Long expireTimeMillis);
+
       abstract Write build();
     }
 
@@ -373,6 +411,17 @@ public Write 
withConnectionConfiguration(RedisConnectionConfiguration connection
       return builder().setConnectionConfiguration(connection).build();
     }
 
+    public Write withMethod(Method method) {
+      checkArgument(method != null, "method can not be null");
+      return builder().setMethod(method).build();
+    }
+
+    public Write withExpireTime(Long expireTimeMillis) {
+      checkArgument(expireTimeMillis != null, "expireTimeMillis can not be 
null");
+      checkArgument(expireTimeMillis > 0, "expireTimeMillis can not be 
negative or 0");
+      return builder().setExpireTime(expireTimeMillis).build();
+    }
+
     @Override
     public PDone expand(PCollection<KV<String, String>> input) {
       checkArgument(connectionConfiguration() != null, 
"withConnectionConfiguration() is required");
@@ -411,7 +460,8 @@ public void startBundle() {
       @ProcessElement
       public void processElement(ProcessContext processContext) {
         KV<String, String> record = processContext.element();
-        pipeline.append(record.getKey(), record.getValue());
+
+        writeRecord(record);
 
         batchCount++;
 
@@ -421,6 +471,60 @@ public void processElement(ProcessContext processContext) {
         }
       }
 
+      private void writeRecord(KV<String, String> record) {
+        Method method = spec.method();
+        Long expireTime = spec.expireTime();
+
+        if (Method.APPEND == method) {
+          writeUsingAppendCommand(record, expireTime);
+        } else if (Method.SET == method) {
+          writeUsingSetCommand(record, expireTime);
+        } else if (Method.LPUSH == method || Method.RPUSH == method) {
+          writeUsingListCommand(record, method, expireTime);
+        }
+      }
+
+      private void writeUsingAppendCommand(KV<String, String> record, Long 
expireTime) {
+        String key = record.getKey();
+        String value = record.getValue();
+
+        pipeline.append(key, value);
+
+        setExpireTimeWhenRequired(key, expireTime);
+      }
+
+      private void writeUsingSetCommand(KV<String, String> record, Long 
expireTime) {
+        String key = record.getKey();
+        String value = record.getValue();
+
+        if (expireTime != null) {
+          pipeline.psetex(key, expireTime, value);
+        } else {
+          pipeline.set(key, value);
+        }
+      }
+
+      private void writeUsingListCommand(
+          KV<String, String> record, Method method, Long expireTime) {
+
+        String key = record.getKey();
+        String value = record.getValue();
+
+        if (Method.LPUSH == method) {
+          pipeline.lpush(key, value);
+        } else if (Method.RPUSH == method) {
+          pipeline.rpush(key, value);
+        }
+
+        setExpireTimeWhenRequired(key, expireTime);
+      }
+
+      private void setExpireTimeWhenRequired(String key, Long expireTime) {
+        if (expireTime != null) {
+          pipeline.pexpire(key, expireTime);
+        }
+      }
+
       @FinishBundle
       public void finishBundle() {
         pipeline.exec();
diff --git 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index a659e68bb21..26dc07ed0fe 100644
--- 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -31,11 +34,14 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import redis.clients.jedis.Jedis;
 import redis.embedded.RedisServer;
 
 /** Test on the Redis IO. */
 public class RedisIOTest {
 
+  private static final String REDIS_HOST = "::1";
+
   @Rule public TestPipeline writePipeline = TestPipeline.create();
   @Rule public TestPipeline readPipeline = TestPipeline.create();
 
@@ -52,32 +58,101 @@ public void after() throws Exception {
   }
 
   @Test
-  public void testWriteRead() throws Exception {
+  public void testWriteReadUsingDefaultAppendMethod() throws Exception {
     ArrayList<KV<String, String>> data = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
       KV<String, String> kv = KV.of("key " + i, "value " + i);
       data.add(kv);
     }
     PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint("::1", embeddedRedis.getPort()));
+    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()));
 
     writePipeline.run();
 
     PCollection<KV<String, String>> read =
         readPipeline.apply(
             "Read",
-            RedisIO.read().withEndpoint("::1", 
embeddedRedis.getPort()).withKeyPattern("key*"));
+            RedisIO.read()
+                .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
+                .withKeyPattern("key*"));
     PAssert.that(read).containsInAnyOrder(data);
 
     PCollection<KV<String, String>> readNotMatch =
         readPipeline.apply(
             "ReadNotMatch",
-            RedisIO.read().withEndpoint("::1", 
embeddedRedis.getPort()).withKeyPattern("foobar*"));
+            RedisIO.read()
+                .withEndpoint(REDIS_HOST, embeddedRedis.getPort())
+                .withKeyPattern("foobar*"));
     PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L);
 
     readPipeline.run();
   }
 
+  @Test
+  public void testWriteReadUsingSetMethod() throws Exception {
+    String key = "key";
+    String value = "value";
+    String newValue = "newValue";
+
+    Jedis jedis =
+        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
+    jedis.set(key, value);
+
+    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
+    write.apply(
+        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.SET));
+
+    writePipeline.run();
+
+    PCollection<KV<String, String>> read =
+        readPipeline.apply(
+            "Read",
+            RedisIO.read().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withKeyPattern(key));
+    PAssert.that(read).containsInAnyOrder(Collections.singletonList(KV.of(key, 
newValue)));
+
+    readPipeline.run();
+  }
+
+  @Test
+  public void testWriteReadUsingLpushMethod() throws Exception {
+    String key = "key";
+    String value = "value";
+    String newValue = "newValue";
+
+    Jedis jedis =
+        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
+    jedis.lpush(key, value);
+
+    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
+    write.apply(
+        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.LPUSH));
+
+    writePipeline.run();
+
+    List<String> values = jedis.lrange(key, 0, -1);
+    Assert.assertEquals(newValue + value, String.join("", values));
+  }
+
+  @Test
+  public void testWriteReadUsingRpushMethod() throws Exception {
+    String key = "key";
+    String value = "value";
+    String newValue = "newValue";
+
+    Jedis jedis =
+        RedisConnectionConfiguration.create(REDIS_HOST, 
embeddedRedis.getPort()).connect();
+    jedis.lpush(key, value);
+
+    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(KV.of(key, newValue)));
+    write.apply(
+        RedisIO.write().withEndpoint(REDIS_HOST, 
embeddedRedis.getPort()).withMethod(Method.RPUSH));
+
+    writePipeline.run();
+
+    List<String> values = jedis.lrange(key, 0, -1);
+    Assert.assertEquals(value + newValue, String.join("", values));
+  }
+
   @Test
   public void testReadBuildsCorrectly() {
     RedisIO.Read read = RedisIO.read().withEndpoint("test", 
111).withAuth("pass").withTimeout(5);
@@ -94,6 +169,7 @@ public void testWriteBuildsCorrectly() {
     Assert.assertEquals(111, write.connectionConfiguration().port());
     Assert.assertEquals("pass", write.connectionConfiguration().auth());
     Assert.assertEquals(5, write.connectionConfiguration().timeout());
+    Assert.assertEquals(Method.APPEND, write.method());
   }
 
   /** Simple embedded Redis instance wrapper to control Redis server. */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134134)
    Time Spent: 1h 40m  (was: 1.5h)

> RedisIO support for write using SET operation
> ---------------------------------------------
>
>                 Key: BEAM-4571
>                 URL: https://issues.apache.org/jira/browse/BEAM-4571
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-redis
>    Affects Versions: 2.4.0
>            Reporter: Jan Peuker
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>             Fix For: 2.7.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> At the moment, RedisIO only supports "append" operation when writing to 
> Redis. This improvement is to add support for "set" operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to