[ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=125251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125251
 ]

ASF GitHub Bot logged work on BEAM-3446:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/18 21:25
            Start Date: 19/Jul/18 21:25
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r203876000
 
 

 ##########
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##########
 @@ -279,28 +314,73 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+    private int batchSize;
+    private List<String> bufferedKeys;
+    BoundedWindow window;
+    Instant lastMsg;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      bufferedKeys = new ArrayList<>();
+    }
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
+      super(connectionConfiguration);
+      this.batchSize = batchSize;
+    }
+
+    private int getBatchSize() {
+      return batchSize;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow window)
+        throws Exception {
+      String key = processContext.element();
+      bufferedKeys.add(key);
+      this.window = window;
+      this.lastMsg = processContext.timestamp();
+      if (bufferedKeys.size() > getBatchSize()) {
+        List<KV<String, String>> kvs = fetchAndFlush();
+        for (KV<String, String> kv : kvs) {
+          processContext.output(kv);
+        }
+      }
+    }
+
+    private List<KV<String, String>> fetchAndFlush() {
+      String[] keys = new String[bufferedKeys.size()];
+      bufferedKeys.toArray(keys);
+      List<String> results = jedis.mget(keys);
+      assert bufferedKeys.size() == results.size();
+      List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+      for (int i = 0; i < bufferedKeys.size(); i++) {
+        if (results.get(i) != null) {
+          kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+        }
+      }
+      bufferedKeys = new ArrayList<>();
+      return kvs;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      List<KV<String, String>> kvs = fetchAndFlush();
+      for (KV<String, String> kv : kvs) {
+        context.output(kv, lastMsg, window);
 
 Review comment:
   Oh, silly of me, I misread the motivation for keeping the window. You are 
right, it makes total sense. In that case it is probably a better idea to 
store the elements in a Map keyed by window, with the list of buffered elements 
as the value, and to emit them with window.maxTimestamp() (you don't need 
lastMsg), flushing a window once it has buffered enough elements. This is 
similar to what is done here, but with the count logic added: 
   
https://github.com/apache/beam/blob/70b653187d566da7eea2590f17a36bbf22ef8bed/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L825-L844
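
   A rough sketch of the per-window buffering described above, in plain Java 
so that it runs without Beam or Jedis on the classpath. Everything named here 
is hypothetical and not part of the PR: WindowedKeyBuffer, its Emitter 
interface, and the lookup function, which only stands in for a batched call 
such as jedis.mget. The type parameter W stands in for BoundedWindow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: buffers keys per window and looks them up in batches. */
final class WindowedKeyBuffer<W> {

  /** Receives one looked-up pair together with the window it was buffered under. */
  interface Emitter<W> {
    void emit(W window, String key, String value);
  }

  private final int batchSize;
  // Assumption: stands in for a batched read such as jedis.mget; it must return
  // one value per key, in order, with null for keys that do not exist.
  private final Function<List<String>, List<String>> lookup;
  private final Emitter<W> emitter;
  private final Map<W, List<String>> buffered = new HashMap<>();

  WindowedKeyBuffer(
      int batchSize, Function<List<String>, List<String>> lookup, Emitter<W> emitter) {
    this.batchSize = batchSize;
    this.lookup = lookup;
    this.emitter = emitter;
  }

  /** Per element: buffer the key under its window, flush that window when full. */
  void add(W window, String key) {
    List<String> keys = buffered.computeIfAbsent(window, w -> new ArrayList<>());
    keys.add(key);
    if (keys.size() >= batchSize) {
      flush(window, buffered.remove(window));
    }
  }

  /** End of bundle: flush whatever is still buffered, window by window. */
  void flushAll() {
    for (Map.Entry<W, List<String>> entry : buffered.entrySet()) {
      flush(entry.getKey(), entry.getValue());
    }
    buffered.clear();
  }

  private void flush(W window, List<String> keys) {
    List<String> values = lookup.apply(keys);
    for (int i = 0; i < keys.size(); i++) {
      String value = values.get(i);
      if (value != null) { // skip keys with no value, as fetchAndFlush does
        emitter.emit(window, keys.get(i), value);
      }
    }
  }
}
```

   In the DoFn, add would be called from processElement and flushAll from 
finishBundle, where the emitter could call 
context.output(KV.of(key, value), window.maxTimestamp(), window). Keying the 
buffer by window means no element is emitted into a window other than the one 
it arrived in, which a single shared window field cannot guarantee when a 
bundle spans several windows.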

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 125251)
    Time Spent: 3.5h  (was: 3h 20m)

> RedisIO non-prefix read operations
> ----------------------------------
>
>                 Key: BEAM-3446
>                 URL: https://issues.apache.org/jira/browse/BEAM-3446
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-redis
>            Reporter: Vinay varma
>            Assignee: Vinay varma
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> The read operation in RedisIO is for prefix-based lookups. While it can also 
> be used for exact key matches, the number of operations it issues limits the 
> throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using 
> simpler operations for the readAll functionality.
> For example:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
>     processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
