[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145693 ]

ASF GitHub Bot logged work on BEAM-3446:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 15:14
            Start Date: 19/Sep/18 15:14
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java 
b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 279ca46db2b..57d0b77af1f 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -20,7 +20,10 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -34,6 +37,7 @@
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -109,6 +113,7 @@ public static Read read() {
     return new AutoValue_RedisIO_Read.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
         .setKeyPattern("*")
+        .setBatchSize(1000)
         .build();
   }
 
@@ -119,6 +124,7 @@ public static Read read() {
   public static ReadAll readAll() {
     return new AutoValue_RedisIO_ReadAll.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .setBatchSize(1000)
         .build();
   }
 
@@ -142,6 +148,8 @@ private RedisIO() {}
     @Nullable
     abstract String keyPattern();
 
+    abstract int batchSize();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -152,6 +160,8 @@ private RedisIO() {}
       @Nullable
       abstract Builder setKeyPattern(String keyPattern);
 
+      abstract Builder setBatchSize(int batchSize);
+
       abstract Read build();
     }
 
@@ -185,6 +195,10 @@ public Read 
withConnectionConfiguration(RedisConnectionConfiguration connection)
       return builder().setConnectionConfiguration(connection).build();
     }
 
+    public Read withBatchSize(int batchSize) {
+      return builder().setBatchSize(batchSize).build();
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
@@ -196,7 +210,11 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
       return input
           .apply(Create.of(keyPattern()))
-          
.apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
+          .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
+          .apply(
+              RedisIO.readAll()
+                  .withConnectionConfiguration(connectionConfiguration())
+                  .withBatchSize(batchSize()));
     }
   }
 
@@ -208,6 +226,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     @Nullable
     abstract RedisConnectionConfiguration connectionConfiguration();
 
+    abstract int batchSize();
+
     abstract ReadAll.Builder builder();
 
     @AutoValue.Builder
@@ -215,6 +235,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       @Nullable
       abstract ReadAll.Builder 
setConnectionConfiguration(RedisConnectionConfiguration connection);
 
+      abstract ReadAll.Builder setBatchSize(int batchSize);
+
       abstract ReadAll build();
     }
 
@@ -243,25 +265,27 @@ public ReadAll 
withConnectionConfiguration(RedisConnectionConfiguration connecti
       return builder().setConnectionConfiguration(connection).build();
     }
 
+    public ReadAll withBatchSize(int batchSize) {
+      return builder().setBatchSize(batchSize).build();
+    }
+
     @Override
     public PCollection<KV<String, String>> expand(PCollection<String> input) {
       checkArgument(connectionConfiguration() != null, 
"withConnectionConfiguration() is required");
 
       return input
-          .apply(ParDo.of(new ReadFn(connectionConfiguration())))
+          .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
           .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
           .apply(new Reparallelize());
     }
   }
 
-  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
-  private static class ReadFn extends DoFn<String, KV<String, String>> {
-
-    private final RedisConnectionConfiguration connectionConfiguration;
+  abstract static class BaseReadFn<T> extends DoFn<String, T> {
+    protected final RedisConnectionConfiguration connectionConfiguration;
 
-    private transient Jedis jedis;
+    transient Jedis jedis;
 
-    public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
       this.connectionConfiguration = connectionConfiguration;
     }
 
@@ -270,6 +294,18 @@ public void setup() {
       jedis = connectionConfiguration.connect();
     }
 
+    @Teardown
+    public void teardown() {
+      jedis.close();
+    }
+  }
+
+  private static class ReadKeysWithPattern extends BaseReadFn<String> {
+
+    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception 
{
       ScanParams scanParams = new ScanParams();
@@ -280,28 +316,77 @@ public void processElement(ProcessContext processContext) 
throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+    @Nullable transient Multimap<BoundedWindow, String> bundles = null;
+    @Nullable AtomicInteger batchCount = null;
+    private final int batchSize;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      bundles = ArrayListMultimap.create();
+      batchCount = new AtomicInteger();
+    }
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+      super(connectionConfiguration);
+      this.batchSize = batchSize;
+    }
+
+    private int getBatchSize() {
+      return batchSize;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow 
window)
+        throws Exception {
+      String key = processContext.element();
+      bundles.put(window, key);
+      if (batchCount.incrementAndGet() > getBatchSize()) {
+        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+        for (BoundedWindow w : kvs.keySet()) {
+          for (KV<String, String> kv : kvs.get(w)) {
+            processContext.output(kv);
+          }
+        }
+      }
+    }
+
+    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+      Multimap<BoundedWindow, KV<String, String>> kvs = 
ArrayListMultimap.create();
+      for (BoundedWindow w : bundles.keySet()) {
+        String[] keys = new String[bundles.get(w).size()];
+        bundles.get(w).toArray(keys);
+        List<String> results = jedis.mget(keys);
+        for (int i = 0; i < results.size(); i++) {
+          if (results.get(i) != null) {
+            kvs.put(w, KV.of(keys[i], results.get(i)));
+          }
+        }
+      }
+      bundles = ArrayListMultimap.create();
+      batchCount.set(0);
+      return kvs;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+      for (BoundedWindow w : kvs.keySet()) {
+        for (KV<String, String> kv : kvs.get(w)) {
+          context.output(kv, w.maxTimestamp(), w);
+        }
+      }
     }
   }
 
diff --git 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 26dc07ed0fe..3280ab4820d 100644
--- 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -57,6 +57,32 @@ public void after() throws Exception {
     embeddedRedis.close();
   }
 
+  private ArrayList<KV<String, String>> ingestData(String prefix, int numKeys) 
{
+    ArrayList<KV<String, String>> data = new ArrayList<>();
+    for (int i = 0; i < numKeys; i++) {
+      KV<String, String> kv = KV.of(prefix + "-key " + i, "value " + i);
+      data.add(kv);
+    }
+    PCollection<KV<String, String>> write = 
writePipeline.apply(Create.of(data));
+    write.apply(RedisIO.write().withEndpoint("::1", embeddedRedis.getPort()));
+    writePipeline.run();
+    return data;
+  }
+
+  @Test
+  public void testBulkRead() throws Exception {
+    ArrayList<KV<String, String>> data = ingestData("bulkread", 100);
+    PCollection<KV<String, String>> read =
+        readPipeline.apply(
+            "Read",
+            RedisIO.read()
+                .withEndpoint("::1", embeddedRedis.getPort())
+                .withKeyPattern("bulkread*")
+                .withBatchSize(10));
+    PAssert.that(read).containsInAnyOrder(data);
+    readPipeline.run();
+  }
+
   @Test
   public void testWriteReadUsingDefaultAppendMethod() throws Exception {
     ArrayList<KV<String, String>> data = new ArrayList<>();
@@ -88,6 +114,13 @@ public void testWriteReadUsingDefaultAppendMethod() throws 
Exception {
     readPipeline.run();
   }
 
+  @Test
+  public void testConfiguration() {
+    RedisIO.Write writeOp = RedisIO.write().withEndpoint("test", 111);
+    Assert.assertEquals(111, writeOp.connectionConfiguration().port());
+    Assert.assertEquals("test", writeOp.connectionConfiguration().host());
+  }
+
   @Test
   public void testWriteReadUsingSetMethod() throws Exception {
     String key = "key";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145693)
    Time Spent: 6h 10m  (was: 6h)

> RedisIO non-prefix read operations
> ----------------------------------
>
>                 Key: BEAM-3446
>                 URL: https://issues.apache.org/jira/browse/BEAM-3446
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-redis
>            Reporter: Vinay varma
>            Assignee: Vinay varma
>            Priority: Major
>             Fix For: 2.8.0
>
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> The Read operation in RedisIO performs prefix-based lookups. While it can 
> also be used for exact key matches, the number of operations it issues 
> limits the throughput of the function.
> I suggest exposing the current readAll operation as readByPrefix and using 
> simpler operations for the readAll functionality.
> For example:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
>     processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to